链式算法处理视频流
视频源是本地摄像头
# coding=gbk
# 本地摄像头直接推流到 RTMP 服务器
import cv2
import mediapipe as mp
import subprocess as sp# 初始化 Mediapipe
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_holistic = mp.solutions.holisticholistic = mp_holistic.Holistic(min_detection_confidence=0.7,min_tracking_confidence=0.7
)# AI 算法处理帧
def frame_handler(image):image.flags.writeable = Falseimage_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)results = holistic.process(image_rgb)if results.pose_world_landmarks is not None:image.flags.writeable = Truemp_drawing.draw_landmarks(image,results.pose_landmarks,mp_holistic.POSE_CONNECTIONS,landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())return image# 设置摄像头
camera_index = 0
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():raise IOError("无法打开本地摄像头")# 设置分辨率和帧率
width, height = 640, 360 # 分辨率
fps = 15 # 帧率
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
cap.set(cv2.CAP_PROP_FPS, fps)# FFmpeg 推流地址
dst = "rtmp://localhost:1935/live/dest-local"# FFmpeg 推流命令
command = ['ffmpeg','-y', # 覆盖输出文件'-f', 'rawvideo', # 输入原始视频流格式'-vcodec', 'rawvideo','-pix_fmt', 'bgr24', # 像素格式'-s', f"{width}x{height}", # 分辨率'-r', str(fps), # 帧率'-i', '-', # 从标准输入读取视频流'-c:v', 'libx264', # 视频编码格式'-preset', 'ultrafast', # 超快编码模式'-tune', 'zerolatency', # 优化零延迟'-bufsize', '64k', # 缓冲区设置较小'-maxrate', '1M', # 最大码率控制'-g', '15', # GOP(关键帧间隔,降低到 15 帧)'-f', 'flv', # 输出格式dst
]# 启动 FFmpeg 子进程
pipe = sp.Popen(command, stdin=sp.PIPE)# 视频处理和推流
try:while True:ret, frame = cap.read()if not ret:print("无法读取摄像头数据,程序退出")break# 使用 Mediapipe 算法处理帧processed_frame = frame_handler(frame)# 将帧写入 FFmpeg 输入管道pipe.stdin.write(processed_frame.tobytes())# 显示处理结果cv2.imshow('Video', processed_frame)# 按 'q' 键退出if cv2.waitKey(1) & 0xFF == ord('q'):break
finally:# 释放资源cap.release()cv2.destroyAllWindows()pipe.stdin.close()pipe.wait()print("程序结束")
视频流是网络流 :
# coding=gbk
# 网络摄像头直接推流到 RTMP 服务器
import subprocess as spimport cv2
import mediapipe as mp# 初始化 Mediapipe
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_holistic = mp.solutions.holisticholistic = mp_holistic.Holistic(min_detection_confidence=0.7,min_tracking_confidence=0.7
)# AI 算法处理帧
def frame_handler(image):image.flags.writeable = Falseimage_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)results = holistic.process(image_rgb)if results.pose_world_landmarks is not None:image.flags.writeable = Truemp_drawing.draw_landmarks(image,results.pose_landmarks,mp_holistic.POSE_CONNECTIONS,landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())return image# 设置网络摄像头地址
camera_index = "rtsp://admin:@xxzx@192.168.1.64:554/Streaming/Channels/101" # 替换为你的网络摄像头地址
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():raise IOError(f"无法打开网络摄像头流:{camera_index}")# 设置分辨率和帧率
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # 自动获取分辨率宽度
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 自动获取分辨率高度
fps = int(cap.get(cv2.CAP_PROP_FPS)) # 自动获取帧率# 如果获取失败,设置默认值
if fps == 0:fps = 15
if width == 0 or height == 0:width, height = 640, 360 # 设置默认分辨率# RTMP 推流地址
dst = "rtmp://localhost:1935/live/dest-net"# FFmpeg 推流命令
command = ['ffmpeg','-y', # 覆盖输出文件'-f', 'rawvideo', # 输入原始视频流格式'-vcodec', 'rawvideo','-pix_fmt', 'bgr24', # 像素格式'-s', f"{width}x{height}", # 分辨率'-r', str(fps), # 帧率'-i', '-', # 从标准输入读取视频流'-c:v', 'libx264', # 视频编码格式'-preset', 'ultrafast', # 超快编码模式'-tune', 'zerolatency', # 优化零延迟'-bufsize', '64k', # 缓冲区设置较小'-maxrate', '1M', # 最大码率控制'-g', '15', # GOP(关键帧间隔,降低到 15 帧)'-f', 'flv', # 输出格式dst
]# 启动 FFmpeg 子进程
pipe = sp.Popen(command, stdin=sp.PIPE)# 视频处理和推流
try:while True:ret, frame = cap.read()if not ret:print("无法读取网络摄像头流,程序退出")break# 使用 Mediapipe 算法处理帧processed_frame = frame_handler(frame)# 将帧写入 FFmpeg 输入管道pipe.stdin.write(processed_frame.tobytes())# 显示处理结果cv2.imshow('Video', processed_frame)# 按 'q' 键退出if cv2.waitKey(1) & 0xFF == ord('q'):break
finally:# 释放资源cap.release()cv2.destroyAllWindows()pipe.stdin.close()pipe.wait()print("程序结束")