# -*- coding: utf-8 -*-
##------------------------------------------
## OpenVINO™ toolkit
## Face Mask Check
##
## model: face-detection-adas-0001
##        face_mask
##
## 2021.06.21 Masahiro Izutsu
##------------------------------------------
# Color Escape Code
GREEN = '\033[1;32m'
RED = '\033[1;31m'
NOCOLOR = '\033[0m'
YELLOW = '\033[1;33m'
# Constant definitions
WINDOW_WIDTH = 640
BOX_COLOR_OK = ( 0,255, 0)
BOX_COLOR_ER = ( 0, 0, 255)
LABEL_BG_COLOR_OK = ( 0, 180, 0) # greyish green background for text
LABEL_BG_COLOR_ER = ( 0, 0, 240) # greyish red background for text
TEXT_COLOR = (255, 255, 255) # white text
MODEL_DEF_FACE = './models/face-detection-adas-0001.xml'
MODEL_DEF_MASK = './models/face_mask.xml'
INPUT_DEF = './images/mask-test.jpg'
# Load OpenVINO modules
from openvino.inference_engine import IECore
from openvino.inference_engine import get_version
# Other imports
import sys
import imghdr
import cv2
import numpy as np
import argparse
import myfunction
import mylib
# Title and version information
title = 'Face Mask Check'
print(GREEN)
print('--- {} ---'.format(title))
print(cv2.__version__)
print("OpenVINO inference_engine:", get_version())
print(NOCOLOR)
# Parses arguments for the application
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--image', metavar = 'IMAGE_FILE', type=str,
default = INPUT_DEF,
help = 'Absolute path to image file or cam for camera stream.')
parser.add_argument('-m_dt', '--m_detector', type=str,
default = MODEL_DEF_FACE,
help = 'Detector Path to an .xml file with a trained model.'
'Default value is '+MODEL_DEF_FACE)
parser.add_argument('-m_mk', '--m_mask', type=str,
default = MODEL_DEF_MASK,
help = 'Face-mask Path to an .xml file with a trained model.'
'Default value is '+MODEL_DEF_MASK)
parser.add_argument('-d', '--device', default = 'CPU', type=str,
help = 'Optional. Specify a target device to infer on. CPU, GPU, FPGA, HDDL or MYRIAD is '
'acceptable. The demo will look for a suitable plugin for the device specified. '
'Default value is CPU')
parser.add_argument('-l', '--language', metavar = 'LANGUAGE',
default = 'jp',
help = 'Language.(jp/en) Default value is \'jp\'')
parser.add_argument('-t', '--title', metavar = 'TITLE',
default = 'y',
help = 'Program title flag.(y/n) Default value is \'y\'')
parser.add_argument('-s', '--speed', metavar = 'SPEED',
default = 'y',
                        help = 'Speed display flag.(y/n) Default value is \'y\'')
parser.add_argument('-o', '--out', metavar = 'IMAGE_OUT',
default = 'non',
help = 'Processed image file path. Default value is \'non\'')
return parser
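# Usage example (the script name is illustrative):
#   python3 face_mask.py -i images/mask-test.jpg -d CPU -l en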
# Display basic model information
def display_info(image, detector, mask, device, lang, input_blob, out_blob, input_blob_mask, out_blob_mask, titleflg, speedflg, outpath):
print(YELLOW + title + ': Starting application...' + NOCOLOR)
print(' - ' + YELLOW + 'Image File : ' + NOCOLOR, image)
print(' - ' + YELLOW + 'm_detect : ' + NOCOLOR, detector)
print(' - ' + YELLOW + 'm_mask : ' + NOCOLOR, mask)
print(' - ' + YELLOW + 'Device : ' + NOCOLOR, device)
print(' - ' + YELLOW + 'Language : ' + NOCOLOR, lang)
    print(' - ' + YELLOW + 'Input Blob1  : ' + NOCOLOR, input_blob)
    print(' - ' + YELLOW + 'Output Blob1 : ' + NOCOLOR, out_blob)
    print(' - ' + YELLOW + 'Input Blob2  : ' + NOCOLOR, input_blob_mask)
    print(' - ' + YELLOW + 'Output Blob2 : ' + NOCOLOR, out_blob_mask)
print(' - ' + YELLOW + 'Program Title: ' + NOCOLOR, titleflg)
print(' - ' + YELLOW + 'Speed flag : ' + NOCOLOR, speedflg)
print(' - ' + YELLOW + 'Processed out: ' + NOCOLOR, outpath)
# Determine the type of an input file
#   Returns: 'jpeg', 'png', ...  image file
#            'None'              not an image file (e.g. a video file)
#            'NotFound'          file does not exist
def is_pict(filename):
try:
imgtype = imghdr.what(filename)
except FileNotFoundError as e:
imgtype = 'NotFound'
return str(imgtype)
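# e.g. is_pict('face.jpg') -> 'jpeg', is_pict('movie.mp4') -> 'None', is_pict('missing.jpg') -> 'NotFound'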
# ** main function **
def main():
    # Japanese-capable font for label rendering
fontPIL = 'NotoSansCJK-Bold.ttc'
# Argument parsing and parameter setting
ARGS = parse_args().parse_args()
input_stream = ARGS.image
lang = ARGS.language
titleflg = ARGS.title
speedflg = ARGS.speed
if ARGS.image.lower() == "cam" or ARGS.image.lower() == "camera":
input_stream = 0
isstream = True
else:
filetype = is_pict(input_stream)
isstream = filetype == 'None'
if (filetype == 'NotFound'):
print(RED + "\ninput file Not found." + NOCOLOR)
quit()
model_detector = ARGS.m_detector
model_mask = ARGS.m_mask
device = ARGS.device
outpath = ARGS.out
    # Result labels (per language)
if (lang == 'jp'):
label = ('マスクをつけて!', 'マスク装着')
else:
        label = ('NOT wearing a Mask !!!', 'Wearing a Mask')
    # Load the model (face detection) face-detection-adas-0001
ie = IECore()
net = ie.read_network(model = model_detector, weights = model_detector[:-4] + '.bin')
exec_net = ie.load_network(network = net, device_name = device)
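    # read_network() loads the IR pair (.xml topology plus .bin weights, whose path is
    # derived above by swapping the '.xml' extension); load_network() compiles it for the device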
    # Input/output setup (face detection)
    input_key = list(net.input_info.keys())[0]    # input data key name
input_blob_name = net.input_info[input_key].name
output_blob_name = next(iter(net.outputs))
input_blob = net.input_info[input_blob_name].name
out_blob = next(iter(net.outputs))
n, c, h, w = net.input_info[input_blob].input_data.shape
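    # face-detection-adas-0001 takes NCHW input of shape [1, 3, 384, 672] (BGR image)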
    # Load the model (mask detection) face_mask
net_mask = ie.read_network(model = model_mask, weights = model_mask[:-4] + '.bin')
exec_net_mask = ie.load_network(network = net_mask, device_name=device)
    # Input/output setup (face mask)
    input_key_mask = list(net_mask.input_info.keys())[0]    # input data key name
    input_blob_name_mask = net_mask.input_info[input_key_mask].name
    output_blob_name_mask = next(iter(net_mask.outputs))
    input_blob_mask = net_mask.input_info[input_blob_name_mask].name
    out_blob_mask = next(iter(net_mask.outputs))
    n_mask, c_mask, h_mask, w_mask = net_mask.input_info[input_blob_mask].input_data.shape
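    # The input shape of the custom face_mask model is read from its IR file here,
    # since it depends on how the model was trained and converted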
    # Display information
display_info(input_stream, model_detector, model_mask, device, lang, input_blob, out_blob, input_blob_mask, out_blob_mask, titleflg, speedflg, outpath)
    # Prepare the input
    if (isstream):
        # Camera
cap = cv2.VideoCapture(input_stream)
ret, frame = cap.read()
loopflg = cap.isOpened()
else:
        # Read the image file
frame = cv2.imread(input_stream)
if frame is None:
print(RED + "\nUnable to read the input." + NOCOLOR)
quit()
        # Resize while keeping the aspect ratio
img_h, img_w = frame.shape[:2]
if (img_w > WINDOW_WIDTH):
height = round(img_h * (WINDOW_WIDTH / img_w))
frame = cv2.resize(frame, dsize = (WINDOW_WIDTH, height))
        loopflg = True    # loop once
    # Record the processed result: step 1
if (outpath != 'non'):
if (isstream):
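            # Note: CAP_PROP_FPS may return 0 for some cameras; substitute a fixed rate if so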
fps = int(cap.get(cv2.CAP_PROP_FPS))
out_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
out_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
outvideo = cv2.VideoWriter(outpath, fourcc, fps, (out_w, out_h))
    # Initialize measurements
fpsWithTick = mylib.fpsWithTick()
frame_count = 0
fps_total = 0
    fpsWithTick.get()    # start fps measurement
    # Main loop
while (loopflg):
if frame is None:
print(RED + "\nUnable to read the input." + NOCOLOR)
quit()
        # Convert to the model input format
        img = cv2.resize(frame, (w, h))       # resize
        img = img.transpose((2, 0, 1))        # HWC -> CHW
        img = np.expand_dims(img, axis=0)     # add batch dimension
        # Run inference
out = exec_net.infer(inputs={input_blob_name: img})
        # Extract only the needed data from the output
        out = out[output_blob_name]
        out = np.squeeze(out)    # drop all size-1 dimensions
        # Process each detected face region one by one
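        # Each detection row from face-detection-adas-0001 is
        # [image_id, label, conf, x_min, y_min, x_max, y_max], coordinates normalized to [0, 1]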
for detection in out:
            # Get the confidence value
confidence = float(detection[2])
            # Scale the bounding box coordinates to the input image
xmin = int(detection[3] * frame.shape[1])
ymin = int(detection[4] * frame.shape[0])
xmax = int(detection[5] * frame.shape[1])
ymax = int(detection[6] * frame.shape[0])
            # Draw the bounding box only when the confidence exceeds 0.5
if confidence > 0.5:
                # Clamp the face region to the frame; the min values in particular
                # cause errors downstream if left negative
                xmin = max(xmin, 0)
                ymin = max(ymin, 0)
                xmax = min(xmax, frame.shape[1])
                ymax = min(ymax, frame.shape[0])
                # Crop only the face region
                frame_face = frame[ymin:ymax, xmin:xmax]
                # Convert to the model input format
                img = cv2.resize(frame_face, (w_mask, h_mask))    # resize
                img = img.transpose((2, 0, 1))                    # HWC -> CHW
                img = np.expand_dims(img, axis=0)                 # add batch dimension
                # Run inference (use a separate name so the detection array being iterated is not shadowed)
                res_mask = exec_net_mask.infer(inputs={input_blob_name_mask: img})
                # Extract only the needed data from the output
                mask_out = res_mask[output_blob_name_mask]
                mask_out = np.squeeze(mask_out)    # drop size-1 dimensions
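                # Assumption: the custom face_mask model outputs a single score whose sign
                # encodes the result (negative = no mask); adjust the threshold for other models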
mask_flg = False
if mask_out < 0.0:
box_color = BOX_COLOR_ER
label_bgcolor = LABEL_BG_COLOR_ER
out_str = label[0]
else:
mask_flg = True
box_color = BOX_COLOR_OK
label_bgcolor = LABEL_BG_COLOR_OK
out_str = label[1]
label_text_color = TEXT_COLOR
                # Draw the bounding box (face region) and label
cv2.rectangle(frame, (xmin, ymin-20), (xmax, ymin), label_bgcolor, -1)
myfunction.cv2_putText(img = frame,
text = out_str,
org = (xmin+2, ymin-4),
fontFace = fontPIL,
fontScale = 12,
color = label_text_color,
mode = 0)
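                # myfunction.cv2_putText is a local helper that presumably draws the text
                # with a PIL TrueType font so that Japanese labels render correctly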
cv2.rectangle(frame, (xmin, ymin-20), (xmax, ymax), box_color, thickness = 1)
        # Calculate the FPS
fps = fpsWithTick.get()
st_fps = 'fps: {:>6.2f}'.format(fps)
if (speedflg == 'y'):
cv2.rectangle(frame, (10, 38), (95, 55), (90, 90, 90), -1)
cv2.putText(frame, st_fps, (15, 50), cv2.FONT_HERSHEY_DUPLEX, fontScale=0.4, color=(255, 255, 255), lineType=cv2.LINE_AA)
        # Draw the title
if (titleflg == 'y'):
cv2.putText(frame, title, (10, 30), cv2.FONT_HERSHEY_DUPLEX, fontScale=0.8, color=(200, 200, 0), lineType=cv2.LINE_AA)
        # Display the image
window_name = title + " (hit 'q' or 'esc' key to exit)"
cv2.imshow(window_name, frame)
        # Record the processed result: step 2
if (outpath != 'non'):
if (isstream):
outvideo.write(frame)
else:
cv2.imwrite(outpath, frame)
        # Exit on 'esc'/'q' or when the window is closed
breakflg = False
while(True):
key = cv2.waitKey(1)
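            # getWindowProperty() returns a negative value once the user has closed the window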
prop_val = cv2.getWindowProperty(window_name, cv2.WND_PROP_ASPECT_RATIO)
if key == 27 or key == 113 or (prop_val < 0.0): # 'esc' or 'q'
breakflg = True
break
if (isstream):
break
if ((breakflg == False) and isstream):
            # Read the next frame
ret, frame = cap.read()
            if not ret:
                break
loopflg = cap.isOpened()
else:
loopflg = False
    # Cleanup
if (isstream):
cap.release()
    # Record the processed result: step 3
if (outpath != 'non'):
if (isstream):
outvideo.release()
cv2.destroyAllWindows()
print('\nFPS average: {:>10.2f}'.format(fpsWithTick.get_average()))
print('\n Finished.')
# Entry point of the main function (start execution)
if __name__ == "__main__":
sys.exit(main())