Flask-SocketIO 是基于 Flask 的一个扩展,用于简化在 Flask 应用中集成 WebSocket 功能。WebSocket 是一种在客户端和服务器之间实现实时双向通信的协议,常用于实现实时性要求较高的应用,如聊天应用、实时通知等,使得开发者可以更轻松地构建实时性要求较高的应用。通过定义事件处理函数,可以实现双向实时通信,为应用提供更加丰富和实时的用户体验。
前端参数拼接
Flask 提供了针对WebSocket的支持插件flask_socketio
直接通过pip命令安装即可导入使用,同时前端也需要引入SocketIO.js
库文件。
如下代码通过ECharts图表库和WebSocket技术实现了一个实时监控主机CPU负载的动态折线图。通过WebSocket连接到Flask应用中的Socket.IO命名空间,前端通过实时接收后端传来的CPU负载数据,动态更新折线图,展示1分钟、5分钟和15分钟的CPU负载趋势。同时,通过控制台打印实时数据,实现了方便的调试和监控功能。
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><script type="text/javascript" src="https://www.lyshark.com/javascript/jquery/3.5.1/jquery.min.js"></script><script type="text/javascript" src="https://www.lyshark.com/javascript/socket.io/socket.io.min.js"></script><script type="text/javascript" src="https://www.lyshark.com/javascript/echarts/5.3.0/echarts.min.js"></script>
</head>
<body><div id="Linechart" style="height:500px;width:1200px;border:1px solid #673ab7;padding:10px;"></div><!-- 执行绘图函数--><script type="text/javascript" charset="UTF-8">var display = function(time,x,y,z){var echo = echarts.init(document.getElementById("Linechart"));var option = {title: {left: 'left',text: 'CPU 利用表动态监控',},// 调节大小grid: {left: '3%',right: '4%',bottom: '3%',containLabel: true},// tooltip 鼠标放上去之后会自动出现坐标tooltip: {trigger: 'axis',axisPointer: {type: 'cross',label: {backgroundColor: '#6a7985'}}},legend: {data: ['1分钟负载', '5分钟负载', '15分钟负载']},xAxis: {type: 'category',// data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']data: time},yAxis: {type: 'value'},series:[{name: "1分钟负载",stack: "总量",//data: [10, 25, 99, 87, 54, 66, 2],data: x,type: 'line',areaStyle: {}},{name: "5分钟负载",stack: "总量",//data: [89, 57, 85, 44, 25, 4, 54],data: y,type: 'line',areaStyle: {}},{name: "15分钟负载",stack: "总量",//data: [1, 43, 2, 12, 5, 4, 7],data: z,type: 'line',areaStyle: {}}]};echo.setOption(option,true);}</script><!-- 负责对参数的解析,填充数据 --><script type="text/javascript" charset="UTF-8">var time =["","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","",""];var cpu_load1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0];var cpu_load5 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0];var cpu_load15 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0];var update_function = function(recv){time.push(recv.datetime);cpu_load1.push(parseFloat(recv.load1));cpu_load5.push(parseFloat(recv.load5));cpu_load15.push(parseFloat(recv.load15));if(time.length >=10){time.shift();cpu_load1.shift();cpu_load5.shift();cpu_load15.shift();console.log("时间数组: " + time);console.log("1分钟: " + cpu_load1);console.log("5分钟: " + cpu_load5);console.log("15分钟: " + cpu_load15);// 调用绘图函数display(time,cpu_load1,cpu_load5,cpu_load15);}};</script><!-- 负责接收目标主机的CPU负载情况 --><script type="text/javascript" charset="UTF-8">$(document).ready(function(){namespace = '/Socket';var socket = io.connect("http://" + document.domain + ":" + location.port + namespace);socket.emit("message",{"data":"hello lyshark"}); // 初始化完成后,发送一条消息.socket.on('response', function(recv) {console.log("时间: " + recv.datetime);console.log("1分钟: " + recv.load1);console.log("5分钟: " + recv.load5);console.log("15分钟: " + recv.load15);// 调用函数完成数据填充update_function(recv);});});</script>
</body>
</html>
后台代码使用Flask和Flask-SocketIO搭建了一个实时监控主机CPU负载的WebSocket应用,并将数据通过socketio.emit
函数将数据推送给前端展示。
关键点概括如下:
Flask和SocketIO集成:
- 使用Flask框架创建了一个Web应用,并通过Flask-SocketIO集成了WebSocket功能,实现了实时双向通信。
消息接收与实时推送:
- 定义了
socket
事件处理函数,用于接收前端通过WebSocket发送的消息。在无限循环中,通过socketio.sleep
方法设置每2秒推送一次实时的CPU负载数据给前端。
前端连接和断开事件:
- 定义了
connect
和disconnect
事件处理函数,分别在WebSocket连接建立和断开时触发。在控制台打印相应信息,用于监控连接状态。
实时数据推送:
- 使用
socketio.emit
方法实时将CPU负载数据推送给前端,以更新折线图。推送的数据包括当前时间、1分钟负载、5分钟负载和15分钟负载。
前端页面渲染:
- 通过Flask的
render_template
方法渲染了一个HTML页面,用于展示实时更新的CPU负载折线图。
调试信息输出:
- 在每个事件处理函数中使用
print
语句输出调试信息,方便监测WebSocket连接和消息的传递过程。
总体来说,该应用实现了一个简单而实用的实时监控系统,通过WebSocket技术实时推送主机CPU负载数据至前端,为用户提供了实时可视化的监控体验。
from flask import Flask,render_template,request
from flask_socketio import SocketIO
import time,psutilasync_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = "lyshark"
socketio = SocketIO(app)@app.route("/")
def index():return render_template("index.html")# 出现消息后,率先执行此处
@socketio.on("message",namespace="/Socket")
def socket(message):print("接收到消息:",message['data'])while True:socketio.sleep(2)data = time.strftime("%M:%S",time.localtime())cpu = psutil.cpu_percent(interval=None,percpu=True)socketio.emit("response",{"datetime": data, "load1": cpu[0], "load5": cpu[1], "load15": cpu[2]},namespace="/Socket")# 当websocket连接成功时,自动触发connect默认方法
@socketio.on("connect",namespace="/Socket")
def connect():print("链接建立成功..")# 当websocket连接失败时,自动触发disconnect默认方法
@socketio.on("disconnect",namespace="/Socket")
def disconnect():print("链接建立失败..")if __name__ == '__main__':socketio.run(app,debug=True)
运行后,即可输出当前系统下CPU的负载情况,如下图所示;
后端参数拼接
如上所示的代码是在前端进行的数据拼接,如果我们想要在后端进行数据的拼接,则需要对代码进行一定的改进。
前端编写以下代码,通过WebSocket建立通信隧道,而后台则每隔2秒向前台推送传递字典数据。
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><script type="text/javascript" src="https://www.lyshark.com/javascript/jquery/3.5.1/jquery.min.js"></script><script type="text/javascript" src="https://www.lyshark.com/javascript/socket.io/socket.io.min.js"></script><script type="text/javascript" src="https://www.lyshark.com/javascript/echarts/5.3.0/echarts.min.js"></script>
</head>
<body><div id="Linechart" style="height:500px;width:1200px;border:1px solid #673ab7;padding:10px;"></div><!-- 执行绘图函数--><script type="text/javascript" charset="UTF-8">var display = function(time,x,y,z){var echo = echarts.init(document.getElementById("Linechart"));var option = {title: {left: 'left',text: 'CPU 利用表动态监控',},// 调节大小grid: {left: '3%',right: '4%',bottom: '3%',containLabel: true},// tooltip 鼠标放上去之后会自动出现坐标tooltip: {trigger: 'axis',axisPointer: {type: 'cross',label: {backgroundColor: '#6a7985'}}},legend: {data: ['1分钟负载', '5分钟负载', '15分钟负载']},xAxis: {type: 'category',// data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']data: time},yAxis: {type: 'value'},series:[{name: "1分钟负载",stack: "总量",//data: [10, 25, 99, 87, 54, 66, 2],data: x,type: 'line',areaStyle: {}},{name: "5分钟负载",stack: "总量",//data: [89, 57, 85, 44, 25, 4, 54],data: y,type: 'line',areaStyle: {}},{name: "15分钟负载",stack: "总量",//data: [1, 43, 2, 12, 5, 4, 7],data: z,type: 'line',areaStyle: {}}]};echo.setOption(option,true);}</script><!-- 负责接收目标主机的CPU负载情况 --><script type="text/javascript" charset="UTF-8">$(document).ready(function(){namespace = '/Socket';var socket = io.connect("http://" + document.domain + ":" + location.port + namespace);socket.emit("message",{"data":"hello lyshark"}); // 初始化完成后,发送一条消息.socket.on('response', function(recv) {console.log("时间: " + recv.datetime);console.log("1分钟: " + recv.load1);console.log("5分钟: " + recv.load5);console.log("15分钟: " + recv.load15);// 调用绘图函数display(recv.datetime,recv.load1,recv.load5,recv.load15);});});</script>
</body>
</html>
后台代码则是收集数据,并将数据通过socketio.emit
函数,推送给前端。
from flask import Flask,render_template,request
from flask_socketio import SocketIO
import time,psutilasync_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = "lyshark"
socketio = SocketIO(app)# 填充数据表
local_time = ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
cpu_load1 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cpu_load5 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cpu_load15 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]# 左移填充
def shift(Array, Size, Push):if len(Array) <= Size and len(Array) >= 0:Array.pop(0)Array.append(Push)return Truereturn False# 右移填充
def unshift(Array, Size, Push):if len(Array) <= Size and len(Array) >= 0:Array.pop(Size-1)Array.insert(0,Push)@app.route("/")
def index():return render_template("index.html")# 出现消息后,率先执行此处
@socketio.on("message",namespace="/Socket")
def socket(message):print("接收到消息:",message['data'])while True:socketio.sleep(1)data = time.strftime("%M:%S",time.localtime())cpu = psutil.cpu_percent(interval=None,percpu=True)# 实现数组最大35,每次左移覆盖第一个shift(local_time,35,data)shift(cpu_load1,35,cpu[0])shift(cpu_load5, 35, cpu[1])shift(cpu_load15, 35, cpu[2])socketio.emit("response",{"datetime": local_time, "load1": cpu_load1, "load5": cpu_load5, "load15": cpu_load15},namespace="/Socket")# 当websocket连接成功时,自动触发connect默认方法
@socketio.on("connect",namespace="/Socket")
def connect():print("链接建立成功..")# 当websocket连接失败时,自动触发disconnect默认方法
@socketio.on("disconnect",namespace="/Socket")
def disconnect():print("链接建立失败..")if __name__ == '__main__':socketio.run(app,debug=True)
运行动态图如下所示;