I'm trying to implement a multiprocessing version of object detection (the video source can be either a camera or a video file) using the Ultralytics YOLO model.
I implemented a Queue to which the processed frames are added, and a process pool with 4 workers: 1 shows the image and the other 3 process the frames.
Now, I have an issue:
When I start the program, the object detection works, but the video is not smooth and appears "delayed": compared with the original video source, playback is slower, as if there were a high latency between consecutive frames, so the output falls behind the input. I would expect the video to be as smooth as the input source.
Any suggestion?
I have already tried varying the number of workers and the Queue's maxsize, but it doesn't seem to help.
from multiprocessing import Pool, Queue, Process, Lock
import cv2
from ultralytics import YOLO
# Loop-control flag: read by show() and by the main frame-reading loop, and
# set to True by show() when 'q' is pressed.
# NOTE(review): show() is started in a separate Process, so an assignment made
# there is presumably not visible to the parent process -- confirm.
stop_flag = False
def init_pool(d_b, selected_classes):
    """Pool initializer: store the per-worker state in module globals.

    Args:
        d_b: Queue on which detect_object puts its (frame_id, frame) results.
        selected_classes: Iterable of class names that should be drawn.
    """
    global detection_buffer, yolo, selected_classes_set

    # Output queue used later by detect_object.
    detection_buffer = d_b

    # One model instance per worker, created once here rather than per frame.
    yolo = YOLO('yolov8n.pt')

    # A set so that detect_object can test membership directly.
    selected_classes_set = set(selected_classes)
def detect_object(frame, frame_id):
    """Run the model on one frame, draw the selected detections, queue the result.

    Uses the module globals assigned by init_pool (yolo, selected_classes_set,
    detection_buffer).

    Args:
        frame: Image to analyse; rectangles and labels are drawn onto it.
        frame_id: Identifier queued together with the annotated frame.
    """
    global yolo, selected_classes_set

    box_colour = (0, 255, 0)

    # NOTE(review): track() is called without persist=True and no track IDs are
    # read below; presumably plain prediction would be enough here -- confirm.
    for detection in yolo.track(frame, stream=False):
        label_by_index = detection.names
        for box in detection.boxes:
            # Skip low-confidence boxes.
            if not (box.conf[0] > 0.4):
                continue

            left, top, right, bottom = map(int, box.xyxy[0])
            label = label_by_index[int(box.cls[0])]
            if label not in selected_classes_set:
                continue

            cv2.rectangle(frame, (left, top), (right, bottom), box_colour, 2)
            cv2.putText(frame, f'{label} {box.conf[0]:.2f}', (left, top),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, box_colour, 2)

    # Exactly one queue entry per input frame, whether or not anything was drawn.
    detection_buffer.put((frame_id, frame))
def show(detection_buffer):
    """Display queued frames in frame-id order until a None sentinel or 'q'.

    Frames may arrive out of order, so each one is parked in a dict until
    every earlier id has been shown.

    Args:
        detection_buffer: Queue yielding (frame_id, frame) tuples, or None to
            signal the end of the stream.
    """
    global stop_flag

    expected_id = 0
    parked = {}

    while not stop_flag:
        item = detection_buffer.get()
        if item is None:
            break

        arrived_id, image = item
        parked[arrived_id] = image

        # Show every frame that is now contiguous with the last one displayed.
        while expected_id in parked:
            cv2.imshow("Video", parked.pop(expected_id))
            expected_id += 1

            pressed = cv2.waitKey(1) & 0xFF
            if pressed == ord('q'):
                # Leaves this loop; the outer loop then stops on stop_flag.
                stop_flag = True
                break

    cv2.destroyAllWindows()
# Required for Windows:
if __name__ == "__main__":
    from collections import deque
    from queue import Full

    video_path = "path_to_video"
    selected_classes = ['car']
    num_workers = 3
    # Upper bound on frames submitted but not yet finished. Without it every
    # frame of the source is handed to the pool as fast as it can be read, so
    # memory and the backlog of frames waiting to be processed grow without limit.
    max_in_flight = 2 * num_workers

    detection_buffer = Queue(maxsize=3)
    detect_pool = Pool(num_workers, initializer=init_pool,
                       initargs=(detection_buffer, selected_classes))

    # A single viewer process; it is joined directly below (a Process object
    # is not iterable).
    show_process = Process(target=show, args=(detection_buffer,))
    show_process.start()

    # An empty path selects the default camera.
    cap = cv2.VideoCapture(video_path) if video_path else cv2.VideoCapture(0)

    frame_id = 0
    in_flight = deque()
    finished = False
    try:
        # The viewer runs in its own process, so the stop_flag it sets is not
        # seen here; its liveness is what tells us the user pressed 'q'.
        while show_process.is_alive():
            if len(in_flight) >= max_in_flight:
                # Window full: wait briefly for the oldest task, then re-check
                # the viewer instead of blocking indefinitely.
                oldest = in_flight[0]
                oldest.wait(0.05)
                if oldest.ready():
                    in_flight.popleft().get()  # re-raises worker exceptions
                continue

            ret, frame = cap.read()
            if not ret:
                break
            in_flight.append(
                detect_pool.apply_async(detect_object, args=(frame, frame_id)))
            frame_id += 1

        # Let the remaining tasks finish, unless the viewer has gone away.
        while in_flight and show_process.is_alive():
            oldest = in_flight[0]
            oldest.wait(0.05)
            if oldest.ready():
                in_flight.popleft().get()

        # Tell the viewer the stream is over. put() uses a timeout so that a
        # full queue cannot block forever if the viewer exits meanwhile.
        while show_process.is_alive():
            try:
                detection_buffer.put(None, timeout=0.1)
                break
            except Full:
                continue
        finished = True
    finally:
        cap.release()
        if not finished and show_process.is_alive():
            # Error path: no sentinel was sent, so the viewer would wait forever.
            show_process.terminate()
        show_process.join()
        # Workers may be blocked in detection_buffer.put() once the viewer has
        # exited, so stop them outright instead of waiting with close().
        detect_pool.terminate()
        detect_pool.join()
        # Do not wait at interpreter exit for queue data nobody will read.
        detection_buffer.cancel_join_thread()
        cv2.destroyAllWindows()