一、说明
让我们谈谈在Python中使用网络摄像头。我有一个简单的任务,从相机读取帧,并在每一帧上运行神经网络。对于一个特定的网络摄像头,我在设置目标 fps 时遇到了问题(正如我现在所理解的——因为相机可以用 mjpeg 格式运行 30 fps,但不能运行原始),所以我决定深入研究 FFmpeg 看看它是否有帮助。
二、OpenCV和FFmpeg两个选项
我最终让OpenCV和FFmpeg都工作了,但我发现了一件非常有趣的事情:FFmpeg性能优于OpenCV是我的主要用例。事实上,使用 FFmpeg,我读取帧的速度提高了 15 倍,整个管道的加速提高了 32%。我简直不敢相信结果,并多次重新检查了所有内容,但它们是一致的。
注意:当我只是一帧一帧地读取时,性能完全相同,但是当我在读取帧后运行某些内容时,FFmpeg 速度更快(这需要时间)。我将在下面确切地说明我的意思。
2.1 openCV的代码实现
现在,让我们看一下代码。首先 — 使用 OpenCV 读取网络摄像头帧的类:
class VideoStreamCV:def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):self.src = srcself.fps = fpsself.resolution = resolutionself.cap = self._open_camera()self.wait_for_cam()def _open_camera(self):cap = cv2.VideoCapture(self.src)cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])fourcc = cv2.VideoWriter_fourcc(*"MJPG")cap.set(cv2.CAP_PROP_FOURCC, fourcc)cap.set(cv2.CAP_PROP_FPS, self.fps)return capdef read(self):ret, frame = self.cap.read()if not ret:return Nonereturn framedef release(self):self.cap.release()def wait_for_cam(self):for _ in range(30):frame = self.read()if frame is not None:return Truereturn False
2.2 使用FFmpeg
我使用功能,因为相机通常需要时间“热身”。FFmpeg 类使用相同的预热:wait_for_cam
class VideoStreamFFmpeg:def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):self.src = srcself.fps = fpsself.resolution = resolutionself.pipe = self._open_ffmpeg()self.frame_shape = (self.resolution[1], self.resolution[0], 3)self.frame_size = np.prod(self.frame_shape)self.wait_for_cam()def _open_ffmpeg(self):os_name = platform.system()if os_name == "Darwin": # macOSinput_format = "avfoundation"video_device = f"{self.src}:none"elif os_name == "Linux":input_format = "v4l2"video_device = f"{self.src}"elif os_name == "Windows":input_format = "dshow"video_device = f"video={self.src}"else:raise ValueError("Unsupported OS")command = ['ffmpeg','-f', input_format,'-r', str(self.fps),'-video_size', f'{self.resolution[0]}x{self.resolution[1]}','-i', video_device,'-vcodec', 'mjpeg', # Input codec set to mjpeg'-an', '-vcodec', 'rawvideo', # Decode the MJPEG stream to raw video'-pix_fmt', 'bgr24','-vsync', '2','-f', 'image2pipe', '-']if os_name == "Linux":command.insert(2, "-input_format")command.insert(3, "mjpeg")return subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8)def read(self):raw_image = self.pipe.stdout.read(self.frame_size)if len(raw_image) != self.frame_size:return Noneimage = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)return imagedef release(self):self.pipe.terminate()def wait_for_cam(self):for _ in range(30):frame = self.read()if frame is not None:return Truereturn False
For timing function, I used decorator:run
def timeit(func):def wrapper(*args, **kwargs):t0 = time.perf_counter()result = func(*args, **kwargs)t1 = time.perf_counter()print(f"Main function time: {round(t1-t0, 4)}s")return resultreturn wrapper
作为一个繁重的合成任务,我使用了这个简单的函数来代替神经网络(它也可以只是)。这是一个非常重要的部分,因为没有任何任务,OpenCV和FFmpeg的读取速度是相同的:time.sleep
def computation_task():for _ in range(5000000):9999 * 9999
现在功能与我读取框架的循环,它的时间,运行:computation_task
@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):timer = []for _ in range(100):t0 = time.perf_counter()cam.read()timer.append(time.perf_counter() - t0)if run_task:computation_task()cam.release()return round(np.mean(timer), 4)
最后,我设置了几个参数,使用 OpenCV 和 FFmpeg 初始化 2 个视频流,并在没有和使用它的情况下运行它们。main
computation_task
def main():fsp = 30resolution = (1920, 1080)for run_task in [False, True]:ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)print(f"FFMPEG, task {run_task}:")print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")print(f"CV2, task {run_task}:")print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")
这是我得到的:
FFMPEG, task False:
Main function time: 3.2334s
Mean frame read time: 0.0323sCV2, task False:
Main function time: 3.3934s
Mean frame read time: 0.0332sFFMPEG, task True:
Main function time: 4.461s
Mean frame read time: 0.0014sCV2, task True:
Main function time: 6.6833s
Mean frame read time: 0.023s
因此,如果没有合成任务,我可以获得相同的阅读时间:,。但是对于合成任务:和,所以FFmpeg要快得多。美妙之处在于,我的神经网络应用程序得到了真正的加速,而不仅仅是综合测试,所以我决定分享结果。0.0323
0.0332
0.0014
0.023
下图显示了 1 次迭代所需的时间:读取帧,使用 yolov8s 模型(在 CPU 上)处理它,并使用检测到的对象保存帧:
三 完整脚本
以下是包含综合测试的完整脚本:
import platform
import subprocess
import time
from typing import Tuple
import cv2
import numpy as npclass VideoStreamFFmpeg:def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):self.src = srcself.fps = fpsself.resolution = resolutionself.pipe = self._open_ffmpeg()self.frame_shape = (self.resolution[1], self.resolution[0], 3)self.frame_size = np.prod(self.frame_shape)self.wait_for_cam()def _open_ffmpeg(self):os_name = platform.system()if os_name == "Darwin": # macOSinput_format = "avfoundation"video_device = f"{self.src}:none"elif os_name == "Linux":input_format = "v4l2"video_device = f"{self.src}"elif os_name == "Windows":input_format = "dshow"video_device = f"video={self.src}"else:raise ValueError("Unsupported OS")command = ['ffmpeg','-f', input_format,'-r', str(self.fps),'-video_size', f'{self.resolution[0]}x{self.resolution[1]}','-i', video_device,'-vcodec', 'mjpeg', # Input codec set to mjpeg'-an', '-vcodec', 'rawvideo', # Decode the MJPEG stream to raw video'-pix_fmt', 'bgr24','-vsync', '2','-f', 'image2pipe', '-']if os_name == "Linux":command.insert(2, "-input_format")command.insert(3, "mjpeg")return subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8)def read(self):raw_image = self.pipe.stdout.read(self.frame_size)if len(raw_image) != self.frame_size:return Noneimage = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)return imagedef release(self):self.pipe.terminate()def wait_for_cam(self):for _ in range(30):frame = self.read()if frame is not None:return Truereturn Falseclass VideoStreamCV:def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):self.src = srcself.fps = fpsself.resolution = resolutionself.cap = self._open_camera()self.wait_for_cam()def _open_camera(self):cap = cv2.VideoCapture(self.src)cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])fourcc = cv2.VideoWriter_fourcc(*"MJPG")cap.set(cv2.CAP_PROP_FOURCC, fourcc)cap.set(cv2.CAP_PROP_FPS, self.fps)return capdef read(self):ret, frame = self.cap.read()if not ret:return Nonereturn framedef release(self):self.cap.release()def wait_for_cam(self):for _ in range(30):frame = self.read()if frame is not None:return Truereturn Falsedef timeit(func):def wrapper(*args, **kwargs):t0 = time.perf_counter()result = func(*args, **kwargs)t1 = time.perf_counter()print(f"Main function time: {round(t1-t0, 4)}s")return resultreturn wrapperdef computation_task():for _ in range(5000000):9999 * 9999@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):timer = []for _ in range(100):t0 = time.perf_counter()cam.read()timer.append(time.perf_counter() - t0)if run_task:computation_task()cam.release()return round(np.mean(timer), 4)def main():fsp = 30resolution = (1920, 1080)for run_task in [False, True]:ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)print(f"FFMPEG, task {run_task}:")print(f"Mean frame read time: {run(cam=ff_cam, run_task=run_task)}s\n")print(f"CV2, task {run_task}:")print(f"Mean frame read time: {run(cam=cv_cam, run_task=run_task)}s\n")if __name__ == "__main__":main()
注意:此脚本已在Apple的M1 Pro芯片上进行了测试。希望这是有帮助的!阿尔戈·萨基扬