目录
- 背景
- pycharm安装配置
- 代码实现
- 创建本地视频配置 和 推流配置
- 视频帧的处理和检测框绘制
- 主要流程
- 遇到的一些问题
背景
首先这个基于完整安装配置了anaconda和yolo11的环境,如果需要配置开始的话,先看下专栏里另一个文章。
这次的目的是实现拉取视频流,做检测并绘制对象检测框。之后,将结果保存本地视频,并且推流到对应的rtmp服务器,便于调试也可以实时显示处理结果视频。
pycharm安装配置
安装就不提了吧,到官网下载个免费的社区版本就ok了,安装也基本不会有啥问题。
安装完成后,打开你的本地ultralytics文件夹作为项目,然后在设置里加一下解释器:
记得选对你的anaconda配置的环境,右边的列表能看到你环境中安装的库。
代码实现
创建本地视频配置 和 推流配置
依据传入的opencv捕获的视频流对象,获取本地保存视频的一些参数,创建video_writer,并记录推流参数。
def create_video_writer(cap_video, path_output):# 获取视频流参数width = int(cap_video.get(cv2.CAP_PROP_FRAME_WIDTH))height = int(cap_video.get(cv2.CAP_PROP_FRAME_HEIGHT))fps = int(cap_video.get(cv2.CAP_PROP_FPS))# 初始化本地视频保存fourcc = cv2.VideoWriter.fourcc(*'mp4v')out_writer = cv2.VideoWriter(path_output, fourcc, fps, (width, height))# 推流的参数配置command = ['ffmpeg','-y', # 覆盖输出文件(如果存在)'-f', 'rawvideo', # 输入格式'-pix_fmt', 'bgr24', # OpenCV 的像素格式'-s', f'{width}x{height}', # 分辨率'-r', str(fps), # 帧率'-i', '-', # 从标准输入读取'-c:v', 'libx264', # 输出视频编码'-preset', 'ultrafast', # 编码速度预设'-f', 'flv', # 输出格式(RTMP 需要 flv)RTMP_SERVER_URL]print("视频参数:fps " + str(fps))return out_writer, command
视频帧的处理和检测框绘制
依据传入的模型对象和视频帧,去绘制检测框和文本,之后返回结果的图像以及结果数据。
def process_frame(model_in, frame_in):results = model_in.predict(frame_in)# 绘制检测框for result in results:for box in result.boxes:x1, y1, x2, y2 = map(int, box.xyxy[0])conf = box.conf[0].item()cls_id = int(box.cls[0])label = f"{model_in.names[cls_id]} {conf:.2f}"print(label)# 绘制矩形和标签draw_rounded_rect(frame_in, (x1, y1), (x2, y2), (0, 255, 0), 2,cv2.LINE_AA, 10) # 红色圆角矩形cv2.putText(frame_in, label, (x1, y1 - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)return frame_in, results
draw_rounded_rect 单纯为了绘制个圆角框,可以简单的用cv2.rectangle(frame_in, (x1, y1), (x2, y2), (0, 255, 0), 2, cv2.LINE_AA)画。
def draw_rounded_rect(img, pt1, pt2, color, thickness, line_type, corner_radius):x1, y1 = pt1x2, y2 = pt2# 绘制四个角的圆弧cv2.ellipse(img, (x1 + corner_radius, y1 + corner_radius), (corner_radius, corner_radius), 180, 0, 90, color, thickness, line_type)cv2.ellipse(img, (x2 - corner_radius, y1 + corner_radius), (corner_radius, corner_radius), 270, 0, 90, color, thickness, line_type)cv2.ellipse(img, (x1 + corner_radius, y2 - corner_radius), (corner_radius, corner_radius), 90, 0, 90, color, thickness, line_type)cv2.ellipse(img, (x2 - corner_radius, y2 - corner_radius), (corner_radius, corner_radius), 0, 0, 90, color, thickness, line_type)# 绘制四条边cv2.line(img, (x1 + corner_radius, y1), (x2 - corner_radius, y1), color, thickness, line_type)cv2.line(img, (x1, y1 + corner_radius), (x1, y2 - corner_radius), color, thickness, line_type)cv2.line(img, (x1 + corner_radius, y2), (x2 - corner_radius, y2), color, thickness, line_type)cv2.line(img, (x2, y1 + corner_radius), (x2, y2 - corner_radius), color, thickness, line_type)
主要流程
流程主要就是加载模型,捕获对应的rtmp视频流,跑循环一帧帧解析数据,之后把帧的绘制结果写入本地的视频文件,同时帧结果也通过ffmpeg库推到对应的RTMP server去播放。最后终端清除资源。
# 加载模型
model = YOLO(MODEL_PATH)
# 初始化视频流
cap = cv2.VideoCapture(RTSP_URL)
if not cap.isOpened():raise ValueError("无法打开视频流!")
print("视频流已连接...")
out_writer, ffmpeg_command = create_video_writer(cap, OUTPUT_VIDEO_PATH)
if ENABLE_FEATURE_STREAM:# 启动 FFmpeg 进程ffmpeg_proc = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
if ENABLE_FEATURE_DISPLAY:cv2.namedWindow("DISPLAY", cv2.WINDOW_NORMAL)
print("本地视频写入配置完成...")
try:while True:ret, frame = cap.read()if not ret:print("视频流中断,尝试重连...")cap.release()cap = cv2.VideoCapture(RTSP_URL)time.sleep(1)continue# 处理单帧画面out_frame, _ = process_frame(model, frame)if ENABLE_FEATURE_DISPLAY:frame_small = cv2.resize(out_frame, (1080, 900))cv2.waitKey(1)cv2.imshow("DISPLAY", frame_small)if ENABLE_FEATURE_STREAM:# 推流到服务器if ffmpeg_proc.stdin:ffmpeg_proc.stdin.write(out_frame.tobytes())# 保存到本地out_writer.write(out_frame)
except KeyboardInterrupt:print("用户中断操作")
finally:# 清理资源cap.release()out_writer.release()if ENABLE_FEATURE_STREAM:if ffmpeg_proc.stdin:ffmpeg_proc.stdin.close()ffmpeg_proc.terminate()print("等待进程退出")ffmpeg_proc.wait()print("资源已释放")
然后前面的import,就是缺什么装什么,直接在conda环境里pip装就好了。
from ultralytics import YOLO
import cv2
import time
import subprocess
import numpy as np# 参数
ENABLE_FEATURE_STREAM = True
ENABLE_FEATURE_DISPLAY = True
RTSP_URL = "rtmp://xxxxx" # RTSP 或 HTTP 流地址
OUTPUT_VIDEO_PATH = "YOLO11OutPut.mp4" # 本地保存路径
RTMP_SERVER_URL = "rtmp://xxxxxx" # 推流服务器地址
MODEL_PATH = "yolo11n.pt"
效果就是跑起来前端会有实时视频,并且本地会有mp4保存,远端服务器也能实时看到视频。
至于什么效果啥的就是后续自己调整的事情了。
遇到的一些问题
- 第一个是图里后续加了中文标签,默认opencv字体不支持绘制中文会是?问号。
有两个方法一个是去改yolo的绘制代码,比较麻烦,我最后用的是转PIL绘制,当然需要下字体文件SimHei.ttf丢在目录下。标签的可以在项目内的ultralytics/ultralytics/blob/main/ultralytics/cfg/datasets/coco.yaml找到识别的标签集合,自己简单补个对应中文标签组。至于代码不难,就参考着自己调整吧:
from PIL import Image, ImageDraw, ImageFont
cn_names = {0: "人", 1: "自行车", 2: "汽车", 3: '摩托车', 4: '飞机', 5: '大巴', 6: '火车', 7: '卡车', 8: '船', 9: '信号灯'
}
font_path = "SimHei.ttf"
font_size = 20
font = ImageFont.truetype(font_path, font_size)
def put_text_cn(img, text, pos, color, font):# 转换 OpenCV 图像为 PIL 格式img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))draw = ImageDraw.Draw(img_pil)# 计算文本尺寸并绘制背景框bbox = draw.textbbox((0, 0), text, font=font)text_width = bbox[2] - bbox[0] # 宽度 = 右边界 - 左边界text_height = bbox[3] - bbox[1] # 高度 = 下边界 - 上边界x, y = posbg_pos = (x, y - text_height-5, x + text_width, y)# draw.rectangle(bg_pos, fill=color)# 绘制文本draw.text((bg_pos[0], bg_pos[1]), text, font=font, fill=(255, 255, 255)) # 白色文字# 转换回 OpenCV 格式return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
- 第二个是一开始跑yolo的时候遇到过报“NotImplementedError: Could not run 'torchvision::nms“错误。查了下是已安装库版本的问题,参考https://stackoverflow.com/questions/75103127/getting-notimplementederror-could-not-run-torchvisionnms-with-arguments-fr
uninstall对应的torch库然后重新安装官网链接安装一次试试~ - 还有就是如果下载模型异常,也是跑不起来的,网络实在不行可以考虑直接去yolo的文档里找预训练的模型链接,把模型下下来丢项目根目录先: