前面写了两个简单的屏幕共享工具,不过那只是为了验证通过截屏的方式是否可行,因为通常手动截屏的频率很低,而对于视频来说它的帧率要求就很高了,至少要一秒30帧率左右。所以,经过实际的截屏工具验证,我了解了几个Python截屏库的特点和限制。例如,多数截屏库都不支持很高的截屏速度,并且截屏是典型的 CPU 密集任务(我尝试使用多线程截屏,发现速度更慢了,之后有时间我也会把这一点整理成文章发出来)。
所以,我的初始的想法其实是基于 WebSocket 来实现的。现在,就让我们对先前的代码进行重构,采用 WebSocket 来传输图片数据。不过我这里没有使用到它的双向传输的特性,只是将原来 HTTP 传输的图片换成通过 WebSocket 来传输了。不过这里后续还有很多东西可以开发,如果有时间的话,也可以基于这个做一些有趣的东西。
演示
我这个笔记本的性能可能不太行,我只要打开视频帧率就降低了很多,哈哈。
截取播放B站视频
截取摄像头画面
这样甚至可以远程共享画面了,如果两个人都布置一个,就可以各自看到对话了(不过没有声音,且效率低下,可能也就只能在局域网使用),不过这样它是相当于从服务器的地方获取的数据,而我们平时使用的视频通话工具都是从客户端获取的数据。
而且页面越多,帧率越低,这里可能要优化一下或者它就是这么累赘,只能个人使用。
说明
注意,这里的测试环境是 Windows,因为有些库依赖于 Windows 提供的特性,所以需要在 Windows 上运行它。在程序启动时,它会开启一个截屏的线程,不断去获取屏幕的截屏,如果有用户访问,它就会通过 WebSocket 连接,把获取的屏幕截图数据传送给前端,前端的逻辑就是将它绘制在 canvas 上,并添加帧率显示。
注意:这部分前端的代码是 AI 生成,对于验证小功能来说,AI 真是太完美了。而且,其实这整个部分都可以让 AI 来做,但是具体是哪些的组合还是自己选择的,毕竟每个人的偏好和技术栈不同。
代码
所有的代码都在这里了,大概60行代码,我把前端压缩成一行代码了。如果要运行代码先要安装 flask
, flask_sock
, pillow
, dxcam
库。
import time
from flask import Flask
from flask_sock import Sock
from io import BytesIO
from PIL import Image
import dxcam# 创建应用
app = Flask(__name__)
sockets = Sock(app)
# 整个应用只创建一个即可
camera = dxcam.create(device_idx=0, output_idx=1) # output_idx 0 是第一块屏幕,1 是第二块屏幕
camera.start(target_fps=60, video_mode=True)
JPEG_QUALITY = 80 # 默认的jpeg图片质量,越高需要的计算量越大,同时越清晰# 直接前后端写一起了,这个只是一个演示的demo
INDEX_HTML = """
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Screen Sharing</title> <style> body { display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f0f0f0; } canvas { border: 1px solid black; } </style> </head> <body> <canvas id="screenCanvas" width="960" height="540"></canvas> <script> const canvas = document.getElementById('screenCanvas'); const ctx = canvas.getContext('2d'); const ws = new WebSocket('ws://'+window.location.host+'/remote_desktop'); ws.onopen = () => { console.log("WebSocket connected"); }; ws.onerror = (event) => { console.error("WebSocket error:", event); }; let lastFrameTime = performance.now(); let frameCount = 0; let fps = 0; ws.onmessage = (event) => { const image = new Image(); image.src = URL.createObjectURL(event.data); image.onload = () => { ctx.drawImage(image, 0, 0, canvas.width, canvas.height); frameCount++; const now = performance.now(); const elapsed = now - lastFrameTime; if (elapsed >= 1000) { fps = frameCount / (elapsed / 1000); frameCount = 0; lastFrameTime = now; } displayFPS(); }; }; function displayFPS() { ctx.font = '30px Arial'; ctx.fillStyle = 'red'; ctx.fillText(`FPS: ${Math.round(fps)}`, 50, 50); } </script> </body> </html>
"""@app.route('/', methods=['GET'])
def index():"""简单的前端"""return INDEX_HTML@sockets.route('/remote_desktop')
def get_desktop(ws):"""获取一帧图片,并发送给前端"""buffer = BytesIO()fps = 0 # 计算后端生成的帧率frame_count = 0last_frame_time = time.perf_counter()while True:reset_buffer(buffer) # 每次重置buffer,方便复用img_data = get_frame(buffer)frame_count += 1elapsed = time.perf_counter() - last_frame_timeif elapsed >= 1:fps = frame_count // elapsedlast_frame_time = time.perf_counter()frame_count = 0print("backend real frame fps: ", fps)ws.send(img_data)def get_frame(buffer):"""获取一帧,然后处理成jpeg并返回二进制数据"""image_array = camera.get_latest_frame()Image.fromarray(image_array).save(buffer, format="JPEG", quality=JPEG_QUALITY)return buffer.getvalue()def reset_buffer(buffer):# 重置buffer,方便复用buffer.truncate(0)buffer.seek(0)if __name__ == '__main__': print("remote desktop server starting...")app.run("0.0.0.0", 9000)