# -*- coding: utf-8 -*-
##------------------------------------------
## OpenVINO™ toolkit
## Track people with Person Re-Identification
##
## model: person-detection-retail-0013
## person-reidentification-retail-0287
##
## 2021.03.10 Masahiro Izutsu
##------------------------------------------
## person-tracking.py
import sys
import argparse
import numpy as np
import time
import random
import cv2
from openvino.inference_engine import get_version
from openvino.inference_engine import IECore
from model import Model
# Color Escape Code
GREEN = '\033[1;32m'
RED = '\033[1;31m'
NOCOLOR = '\033[0m'
YELLOW = '\033[1;33m'
# Constant definitions
DEVICE = "MYRIAD"
MODULE_DETECTOR = '../FP16/person-detection-retail-0013'
MODULE_REIDENTIFICATION = '../FP16/person-reidentification-retail-0287'
MOVIE = "../../Videos/video003.mp4"
THRESHOLD = 0.8
TRACKING_MAX = 50
SCALE = 1.0
# Title and version information
title = 'Person Tracking'
print(GREEN)
print('--- {} ---'.format(title))
print("OpenCV:", cv2.__version__)
print("OpenVINO inference_engine:", get_version())
print(NOCOLOR)
# Parses arguments for the application
def parse_args():
    parser = argparse.ArgumentParser(description = 'Person tracker using \
Intel® Neural Compute Stick 2.' )
parser.add_argument( '-i', '--image', metavar = 'IMAGE_FILE',
type=str, default = MOVIE,
help = 'Absolute path to movie file or cam for camera stream.')
parser.add_argument( '--threshold', metavar = 'FLOAT',
type=float, default = THRESHOLD,
help = 'Threshold for detection.')
return parser
class PersonDetector(Model):
def __init__(self, model_path, device, ie_core, threshold, num_requests):
super().__init__(model_path, device, ie_core, num_requests, None)
_, _, h, w = self.input_size
self.__input_height = h
self.__input_width = w
self.__threshold = threshold
def __prepare_frame(self, frame):
initial_h, initial_w = frame.shape[:2]
scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
in_frame = in_frame.transpose((2, 0, 1))
in_frame = in_frame.reshape(self.input_size)
return in_frame, scale_h, scale_w
def infer(self, frame):
in_frame, _, _ = self.__prepare_frame(frame)
result = super().infer(in_frame)
detections = []
height, width = frame.shape[:2]
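        # result has shape [1, 1, N, 7]; each row is
        # [image_id, label, conf, x_min, y_min, x_max, y_max] with normalized coordinates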
for r in result[0][0]:
conf = r[2]
if(conf > self.__threshold):
x1 = int(r[3] * width)
y1 = int(r[4] * height)
x2 = int(r[5] * width)
y2 = int(r[6] * height)
detections.append([x1, y1, x2, y2, conf])
return detections
class PersonReidentification(Model):
def __init__(self, model_path, device, ie_core, threshold, num_requests):
super().__init__(model_path, device, ie_core, num_requests, None)
_, _, h, w = self.input_size
self.__input_height = h
self.__input_width = w
self.__threshold = threshold
def __prepare_frame(self, frame):
initial_h, initial_w = frame.shape[:2]
scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
in_frame = in_frame.transpose((2, 0, 1))
in_frame = in_frame.reshape(self.input_size)
return in_frame, scale_h, scale_w
def infer(self, frame):
in_frame, _, _ = self.__prepare_frame(frame)
result = super().infer(in_frame)
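        # The model outputs a [1, 256] descriptor; np.delete() without an axis
        # flattens it and drops index 1, yielding the 255-element vector that
        # Tracker (and the identifys buffer in main) expects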
return np.delete(result, 1)
class Tracker:
def __init__(self):
        # DB of identification feature vectors
        self.identifysDb = None
        # DB of bounding-box centers
        self.center = []
    def __getCenter(self, person):
        # Center of the bounding box [x1, y1, x2, y2]
        x = (person[0] + person[2]) // 2
        y = (person[1] + person[3]) // 2
        return (x, y)
def __getDistance(self, person, index):
(x1, y1) = self.center[index]
(x2, y2) = self.__getCenter(person)
a = np.array([x1, y1])
b = np.array([x2, y2])
u = b - a
return np.linalg.norm(u)
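    # Axis-aligned overlap test: two boxes intersect when the larger left edge
    # is not to the right of the smaller right edge, and likewise vertically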
def __isOverlap(self, persons, index):
[x1, y1, x2, y2] = persons[index]
for i, person in enumerate(persons):
if(index == i):
continue
if(max(person[0], x1) <= min(person[2], x2) and max(person[1], y1) <= min(person[3], y2)):
return True
return False
def getIds(self, identifys, persons):
if(identifys.size==0):
return []
if self.identifysDb is None:
self.identifysDb = identifys
for person in persons:
self.center.append(self.__getCenter(person))
print("input: {} DB:{}".format(len(identifys), len(self.identifysDb)))
similaritys = self.__cos_similarity(identifys, self.identifysDb)
similaritys[np.isnan(similaritys)] = 0
ids = np.nanargmax(similaritys, axis=1)
        for i, similarity in enumerate(similaritys):
            personId = ids[i]
            d = self.__getDistance(persons[i], personId)
            print("personId:{} {} distance:{}".format(personId, similarity[personId], d))
            # Above 0.95 and not overlapping another person: refresh the stored vector
            if(similarity[personId] > 0.95):
                if(self.__isOverlap(persons, i) == False):
                    self.identifysDb[personId] = identifys[i]
            # Below 0.5 and far from the matched entry: register as a new person
            elif(similarity[personId] < 0.5):
                if(d > 500):
                    print("distance:{} similarity:{}".format(d, similarity[personId]))
                    self.identifysDb = np.vstack((self.identifysDb, identifys[i]))
                    self.center.append(self.__getCenter(persons[i]))
                    ids[i] = len(self.identifysDb) - 1
                    print("> append DB size:{}".format(len(self.identifysDb)))
        print(ids)
        # If two detections share an id, invalidate the one with the lower similarity
        for i, a in enumerate(ids):
            for e, b in enumerate(ids):
                if(e == i):
                    continue
                if(a == b):
                    if(similaritys[i][a] < similaritys[e][b]):
                        ids[i] = -1
                    else:
                        ids[e] = -1
        print(ids)
        return ids
    # Cosine similarity
    # Adapted from: https://github.com/kodamap/person_reidentification
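    # X: (num detections, 255) query vectors; Y: (DB size, 255) stored vectors.
    # Returns a (num detections, DB size) matrix whose [i][j] entry is
    # dot(x_i, y_j) / (|x_i| * |y_j|): 1.0 for identical directions, 0.0 for orthogonal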
def __cos_similarity(self, X, Y):
m = X.shape[0]
Y = Y.T
return np.dot(X, Y) / (
np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0)
)
# Display basic model information
def display_info(image, threshold):
print(YELLOW + title + ': Starting application...' + NOCOLOR)
print(' - ' + YELLOW + 'Image File: ' + NOCOLOR, image)
print(' - ' + YELLOW + 'Threshold: ' + NOCOLOR, threshold)
# Determine the input file type
#   Returns: 'jpeg', 'png', ...   image file
#            'None'               not an image file (movie file)
#            'NotFound'           file does not exist
import imghdr
def is_pict(filename):
try:
imgtype = imghdr.what(filename)
except FileNotFoundError as e:
imgtype = 'NotFound'
return str(imgtype)
# ** main function **
def main():
# Argument parsing and parameter setting
ARGS = parse_args().parse_args()
input_stream = ARGS.image
if ARGS.image.lower() == "cam" or ARGS.image.lower() == "camera":
input_stream = 0
else:
filetype = is_pict(input_stream)
        if (filetype == 'NotFound' or filetype != 'None'):
            print(RED + "\ninput file not found or not a movie file." + NOCOLOR)
            quit()
detection_threshold = ARGS.threshold
device = DEVICE
cpu_extension = None
ie_core = IECore()
if device == "CPU" and cpu_extension:
ie_core.add_extension(cpu_extension, "CPU")
    # Display settings
display_info(input_stream, detection_threshold)
    person_detector = PersonDetector(MODULE_DETECTOR, device, ie_core, detection_threshold, num_requests=2)
    personReidentification = PersonReidentification(MODULE_REIDENTIFICATION, device, ie_core, detection_threshold, num_requests=2)
    tracker = Tracker()
    cap = cv2.VideoCapture(input_stream)
colors = []
for i in range(TRACKING_MAX):
b = random.randint(0, 255)
g = random.randint(0, 255)
r = random.randint(0, 255)
colors.append((b,g,r))
while True:
grabbed, frame = cap.read()
        if not grabbed:  # loop playback
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
continue
if(frame is None):
continue
        # Detect persons
persons = []
detections = person_detector.infer(frame)
if(len(detections) > 0):
print("-------------------")
for detection in detections:
x1 = int(detection[0])
y1 = int(detection[1])
x2 = int(detection[2])
y2 = int(detection[3])
conf = detection[4]
print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2))
persons.append([x1,y1,x2,y2])
print("====================")
        # Get an identification vector from each person's image
identifys = np.zeros((len(persons), 255))
for i, person in enumerate(persons):
            # Crop the person's image
img = frame[person[1] : person[3], person[0]: person[2]]
h, w = img.shape[:2]
if(h==0 or w==0):
continue
            # Get the identification vector
identifys[i] = personReidentification.infer(img)
        # Get ids
        ids = tracker.getIds(identifys, persons)
        # Draw bounding boxes and ids on the frame
for i, person in enumerate(persons):
if(ids[i]!=-1):
color = colors[int(ids[i])]
                frame = cv2.rectangle(frame, (person[0], person[1]), (person[2], person[3]), color, 2)
                frame = cv2.putText(frame, str(ids[i]), (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, 2, color, 2, cv2.LINE_AA)
        # Scale the frame
        h, w = frame.shape[:2]
        frame = cv2.resize(frame, (int(w * SCALE), int(h * SCALE)))
        # Show the frame
        window_name = title + " (hit 'q' key to exit)"
        cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
# Entry point
if __name__ == "__main__":
sys.exit(main())
# -*- coding: utf-8 -*-
##------------------------------------------
## OpenVINO™ toolkit
## Track people with Person Re-Identification
##
## model: person-detection-retail-0013
## person-reidentification-retail-0287
##
## 2021.03.10 Masahiro Izutsu
##------------------------------------------
## 2021.03.25 model/device parameter
## 2021.06.23 fps display
import sys
import argparse
import numpy as np
import time
import random
import cv2
from openvino.inference_engine import get_version
from openvino.inference_engine import IECore
from model import Model
import mylib
# Color Escape Code
GREEN = '\033[1;32m'
RED = '\033[1;31m'
NOCOLOR = '\033[0m'
YELLOW = '\033[1;33m'
# Constant definitions
MOVIE = "../../Videos/video003.mp4"
THRESHOLD = 0.8
TRACKING_MAX = 50
SCALE = 1.0
from os.path import expanduser
MODEL_DEF_DETECT = expanduser('~/model/intel/FP32/person-detection-retail-0013.xml')
MODEL_DEF_REIDE = expanduser('~/model/intel/FP32/person-reidentification-retail-0287.xml')
# Title and version information
title = 'Person Tracking 2'
print(GREEN)
print('--- {} ---'.format(title))
print("OpenCV:", cv2.__version__)
print("OpenVINO inference_engine:", get_version())
print(NOCOLOR)
# Parses arguments for the application
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--image', metavar = 'IMAGE_FILE', type=str, default = MOVIE,
help = 'Absolute path to movie file or cam for camera stream.')
parser.add_argument('-m_dt', '--m_detector', type = str,
default = MODEL_DEF_DETECT,
                        help = 'Path to the detector .xml file with a trained model. '
                               'Default value is ' + MODEL_DEF_DETECT)
parser.add_argument('-m_re', '--m_reidentification', type = str,
default = MODEL_DEF_REIDE,
                        help = 'Path to the re-identification .xml file with a trained model. '
                               'Default value is ' + MODEL_DEF_REIDE)
parser.add_argument('-d', '--device', default = 'CPU', type = str,
help = 'Optional. Specify a target device to infer on. CPU, GPU, FPGA, HDDL or MYRIAD is '
'acceptable. The demo will look for a suitable plugin for the device specified. '
'Default value is CPU')
parser.add_argument('--threshold', metavar = 'FLOAT', type = float, default = THRESHOLD,
help = 'Threshold for detection.')
parser.add_argument('-s', '--speed', metavar = 'SPEED',
default = 'y',
                        help = 'Speed display flag. (y/n) Default value is \'y\'')
parser.add_argument('-o', '--out', metavar = 'IMAGE_OUT',
default = 'non',
help = 'Processed image file path. Default value is \'non\'')
return parser
class PersonDetector(Model):
def __init__(self, model_path, device, ie_core, threshold, num_requests):
super().__init__(model_path, device, ie_core, num_requests, None)
_, _, h, w = self.input_size
self.__input_height = h
self.__input_width = w
self.__threshold = threshold
def __prepare_frame(self, frame):
initial_h, initial_w = frame.shape[:2]
scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
in_frame = in_frame.transpose((2, 0, 1))
in_frame = in_frame.reshape(self.input_size)
return in_frame, scale_h, scale_w
def infer(self, frame):
in_frame, _, _ = self.__prepare_frame(frame)
result = super().infer(in_frame)
detections = []
height, width = frame.shape[:2]
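        # result has shape [1, 1, N, 7]; each row is
        # [image_id, label, conf, x_min, y_min, x_max, y_max] with normalized coordinates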
for r in result[0][0]:
conf = r[2]
if(conf > self.__threshold):
x1 = int(r[3] * width)
y1 = int(r[4] * height)
x2 = int(r[5] * width)
y2 = int(r[6] * height)
detections.append([x1, y1, x2, y2, conf])
return detections
class PersonReidentification(Model):
def __init__(self, model_path, device, ie_core, threshold, num_requests):
super().__init__(model_path, device, ie_core, num_requests, None)
_, _, h, w = self.input_size
self.__input_height = h
self.__input_width = w
self.__threshold = threshold
def __prepare_frame(self, frame):
initial_h, initial_w = frame.shape[:2]
scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
in_frame = in_frame.transpose((2, 0, 1))
in_frame = in_frame.reshape(self.input_size)
return in_frame, scale_h, scale_w
def infer(self, frame):
in_frame, _, _ = self.__prepare_frame(frame)
result = super().infer(in_frame)
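        # The model outputs a [1, 256] descriptor; np.delete() without an axis
        # flattens it and drops index 1, yielding the 255-element vector that
        # Tracker (and the identifys buffer in main) expects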
return np.delete(result, 1)
class Tracker:
def __init__(self):
        # DB of identification feature vectors
        self.identifysDb = None
        # DB of bounding-box centers
        self.center = []
    def __getCenter(self, person):
        # Center of the bounding box [x1, y1, x2, y2]
        x = (person[0] + person[2]) // 2
        y = (person[1] + person[3]) // 2
        return (x, y)
def __getDistance(self, person, index):
(x1, y1) = self.center[index]
(x2, y2) = self.__getCenter(person)
a = np.array([x1, y1])
b = np.array([x2, y2])
u = b - a
return np.linalg.norm(u)
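    # Axis-aligned overlap test: two boxes intersect when the larger left edge
    # is not to the right of the smaller right edge, and likewise vertically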
def __isOverlap(self, persons, index):
[x1, y1, x2, y2] = persons[index]
for i, person in enumerate(persons):
if(index == i):
continue
if(max(person[0], x1) <= min(person[2], x2) and max(person[1], y1) <= min(person[3], y2)):
return True
return False
def getIds(self, identifys, persons):
if(identifys.size==0):
return []
if self.identifysDb is None:
self.identifysDb = identifys
for person in persons:
self.center.append(self.__getCenter(person))
print("input: {} DB:{}".format(len(identifys), len(self.identifysDb)))
similaritys = self.__cos_similarity(identifys, self.identifysDb)
similaritys[np.isnan(similaritys)] = 0
ids = np.nanargmax(similaritys, axis=1)
        for i, similarity in enumerate(similaritys):
            personId = ids[i]
            d = self.__getDistance(persons[i], personId)
            print("personId:{} {} distance:{}".format(personId, similarity[personId], d))
            # Above 0.95 and not overlapping another person: refresh the stored vector
            if(similarity[personId] > 0.95):
                if(self.__isOverlap(persons, i) == False):
                    self.identifysDb[personId] = identifys[i]
            # Below 0.5 and far from the matched entry: register as a new person
            elif(similarity[personId] < 0.5):
                if(d > 500):
                    print("distance:{} similarity:{}".format(d, similarity[personId]))
                    self.identifysDb = np.vstack((self.identifysDb, identifys[i]))
                    self.center.append(self.__getCenter(persons[i]))
                    ids[i] = len(self.identifysDb) - 1
                    print("> append DB size:{}".format(len(self.identifysDb)))
        print(ids)
        # If two detections share an id, invalidate the one with the lower similarity
        for i, a in enumerate(ids):
            for e, b in enumerate(ids):
                if(e == i):
                    continue
                if(a == b):
                    if(similaritys[i][a] < similaritys[e][b]):
                        ids[i] = -1
                    else:
                        ids[e] = -1
        print(ids)
        return ids
    # Cosine similarity
    # Adapted from: https://github.com/kodamap/person_reidentification
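    # X: (num detections, 255) query vectors; Y: (DB size, 255) stored vectors.
    # Returns a (num detections, DB size) matrix whose [i][j] entry is
    # dot(x_i, y_j) / (|x_i| * |y_j|): 1.0 for identical directions, 0.0 for orthogonal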
def __cos_similarity(self, X, Y):
m = X.shape[0]
Y = Y.T
return np.dot(X, Y) / (
np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0)
)
# Display basic model information
def display_info(image, detector, reidentification, device, threshold, speedflg, outpath):
print(YELLOW + title + ': Starting application...' + NOCOLOR)
print(' - ' + YELLOW + 'Image File : ' + NOCOLOR, image)
print(' - ' + YELLOW + 'm_detect : ' + NOCOLOR, detector)
print(' - ' + YELLOW + 'm_redient. : ' + NOCOLOR, reidentification)
print(' - ' + YELLOW + 'Device : ' + NOCOLOR, device)
print(' - ' + YELLOW + 'Threshold : ' + NOCOLOR, threshold)
print(' - ' + YELLOW + 'Speed flag : ' + NOCOLOR, speedflg)
print(' - ' + YELLOW + 'Processed out: ' + NOCOLOR, outpath)
# Determine the input file type
#   Returns: 'jpeg', 'png', ...   image file
#            'None'               not an image file (movie file)
#            'NotFound'           file does not exist
import imghdr
def is_pict(filename):
try:
imgtype = imghdr.what(filename)
except FileNotFoundError as e:
imgtype = 'NotFound'
return str(imgtype)
# ** main function **
def main():
# Argument parsing and parameter setting
ARGS = parse_args().parse_args()
input_stream = ARGS.image
if ARGS.image.lower() == "cam" or ARGS.image.lower() == "camera":
input_stream = 0
else:
filetype = is_pict(input_stream)
        if (filetype == 'NotFound' or filetype != 'None'):
            print(RED + "\ninput file not found or not a movie file." + NOCOLOR)
            quit()
isstream = True
detection_threshold = ARGS.threshold
speedflg = ARGS.speed
model_detector=ARGS.m_detector
model_reidentification=ARGS.m_reidentification
outpath = ARGS.out
device = ARGS.device
cpu_extension = None
ie_core = IECore()
if device == "CPU" and cpu_extension:
ie_core.add_extension(cpu_extension, "CPU")
    # Display settings
display_info(input_stream, model_detector, model_reidentification, device, detection_threshold, speedflg, outpath)
person_detector = PersonDetector(model_detector, device, ie_core, detection_threshold, num_requests=2)
personReidentification = PersonReidentification(model_reidentification, device, ie_core, detection_threshold, num_requests=2)
tracker = Tracker()
    # Prepare the input
    cap = cv2.VideoCapture(input_stream)
    # Record processed results, step 1
if (outpath != 'non'):
if (isstream):
fps = int(cap.get(cv2.CAP_PROP_FPS))
out_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
out_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
outvideo = cv2.VideoWriter(outpath, fourcc, fps, (out_w, out_h))
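            # Note: cap.get(cv2.CAP_PROP_FPS) can return 0 for some cameras,
            # in which case the written file will not play back correctly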
colors = []
for i in range(TRACKING_MAX):
b = random.randint(0, 255)
g = random.randint(0, 255)
r = random.randint(0, 255)
colors.append((b,g,r))
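    # mylib.fpsWithTick (imported above; implementation not shown here) is assumed
    # to provide get() for the instantaneous frame rate and get_average() for the
    # run-wide mean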
    # Initialize measurements
fpsWithTick = mylib.fpsWithTick()
frame_count = 0
fps_total = 0
    fpsWithTick.get()    # start fps measurement
    # Main loop
while True:
grabbed, frame = cap.read()
        if not grabbed:  # end of stream
            break
        if(frame is None):
            break
        # Detect persons
persons = []
detections = person_detector.infer(frame)
if(len(detections) > 0):
print("-------------------")
for detection in detections:
x1 = int(detection[0])
y1 = int(detection[1])
x2 = int(detection[2])
y2 = int(detection[3])
conf = detection[4]
print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2))
persons.append([x1,y1,x2,y2])
print("====================")
        # Get an identification vector from each person's image
identifys = np.zeros((len(persons), 255))
for i, person in enumerate(persons):
            # Crop the person's image
img = frame[person[1] : person[3], person[0]: person[2]]
h, w = img.shape[:2]
if(h==0 or w==0):
continue
            # Get the identification vector
identifys[i] = personReidentification.infer(img)
        # Get ids
        ids = tracker.getIds(identifys, persons)
        # Draw bounding boxes and ids on the frame
for i, person in enumerate(persons):
if(ids[i]!=-1):
color = colors[int(ids[i])]
                frame = cv2.rectangle(frame, (person[0], person[1]), (person[2], person[3]), color, 2)
                frame = cv2.putText(frame, str(ids[i]), (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, 2, color, 2, cv2.LINE_AA)
        # Scale the frame
        h, w = frame.shape[:2]
        frame = cv2.resize(frame, (int(w * SCALE), int(h * SCALE)))
        # Compute the FPS
fps = fpsWithTick.get()
st_fps = 'fps: {:>6.2f}'.format(fps)
if (speedflg == 'y'):
cv2.rectangle(frame, (10, 38), (95, 55), (90, 90, 90), -1)
cv2.putText(frame, st_fps, (15, 50), cv2.FONT_HERSHEY_DUPLEX, fontScale=0.4, color=(255, 255, 255), lineType=cv2.LINE_AA)
        # Show the frame
        window_name = title + " (hit 'q' or 'esc' key to exit)"
        cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE)    # 2021.08.20
cv2.imshow(window_name, frame)
        # Record processed results, step 2
if (outpath != 'non'):
if (isstream):
outvideo.write(frame)
else:
cv2.imwrite(outpath, frame)
key = cv2.waitKey(1)
ESC_KEY = 27
if key in {ord('q'), ord('Q'), ESC_KEY}:
break
    # Cleanup
if (isstream):
cap.release()
    # Record processed results, step 3
if (outpath != 'non'):
if (isstream):
outvideo.release()
cv2.destroyAllWindows()
print('\nFPS average: {:>10.2f}'.format(fpsWithTick.get_average()))
print('\n Finished.')
# Entry point
if __name__ == "__main__":
sys.exit(main())
# -*- coding: utf-8 -*-
##------------------------------------------
## OpenVINO™ toolkit
## model access base class
##
## 2021.03.10 Masahiro Izutsu
##------------------------------------------
## https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/python_demos/asl_recognition_demo/asl_recognition_demo/common.py
## model.py
class Model:
def __init__(self, model_path, device, ie_core, num_requests, output_shape=None):
if model_path.endswith((".xml", ".bin")):
model_path = model_path[:-4]
self.net = ie_core.read_network(model_path + ".xml", model_path + ".bin")
self.exec_net = ie_core.load_network(network=self.net, device_name=device, num_requests=num_requests)
self.input_name = next(iter(self.net.input_info))
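        # With multiple outputs, select the single output whose shape matches
        # output_shape; a negative entry in output_shape matches any dimension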
if len(self.net.outputs) > 1:
if output_shape is not None:
candidates = []
for candidate_name in self.net.outputs:
candidate_shape = self.exec_net.requests[0].output_blobs[candidate_name].buffer.shape
if len(candidate_shape) != len(output_shape):
continue
matches = [src == trg or trg < 0
for src, trg in zip(candidate_shape, output_shape)]
if all(matches):
candidates.append(candidate_name)
if len(candidates) != 1:
raise Exception("One output is expected")
self.output_name = candidates[0]
else:
raise Exception("One output is expected")
else:
self.output_name = next(iter(self.net.outputs))
self.input_size = self.net.input_info[self.input_name].input_data.shape
self.output_size = self.exec_net.requests[0].output_blobs[self.output_name].buffer.shape
self.num_requests = num_requests
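    # Synchronous inference: feed `data` to the network's single input and
    # return the buffer of the selected output blob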
def infer(self, data):
input_data = {self.input_name: data}
infer_result = self.exec_net.infer(input_data)
return infer_result[self.output_name]