1.版本说明:ThinkPHP6.0+ 、PHP8.0+
2.实现原理
应用场景,监听用户登陆,修改用户登陆状态,通过心跳时间计算用户是否下线,其他应用场景自行修改逻辑即可。
- 第一步:安装workerman
composer require topthink/think-worker
- 第二步:修改配置文件
// 执行以上代码后,config目录下会自动生成三个文件:gateway_worker.php、worker.php、worker_server.php// 因本人使用的gateway方式,因此需要修改的配置文件是gateway_worker.php,具体根据自己应用场景修改,本人修改的为业务处理逻辑的控制器<?phpreturn [// 其他配置没有动,所以没复制上,只改的下面的eventHandler// BusinsessWorker配置'businessWorker' => ['name' => 'BusinessWorker','count' => 1,'eventHandler' => '\app\common\event\WorkerHandler',],];
- 第三步:启动服务
// windows无法启动的请看本人博客,里面有相关说明,Mac和Linux没问题 https://blog.csdn.net/qq_38997379/article/details/143957149?spm=1001.2014.3001.5501php think worker:gateway
- 第四步:连接测试,vue或html都可以,下面有具体封装的方法
- 第五步:封装业务逻辑处理,下方有封装的方法
- 第六步:线上部署,修改nginx伪静态
location /wss {proxy_pass http://127.0.0.1:2358; // 端口号改为自己的端口proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";proxy_set_header X-Real-IP $remote_addr;
}
3.业务处理代码-WorkerHandler.php
<?phpnamespace app\common\event;use app\common\model\JudgesPublic;
use GatewayWorker\Lib\Gateway;
use think\facade\Log;use function htmlspecialchars;/*** WebSocket 事件处理器*/
class WorkerHandler
{/*** 当客户端连接时触发* @param mixed $client_id 客户端 ID*/public static function onConnect(mixed $client_id): void{try {$response = ['type' => 'connect','message' => 'Welcome to WebSocket server!',];$jsonResponse = json_encode($response);if (json_last_error() !== JSON_ERROR_NONE) {throw new \Exception('Failed to encode JSON response');}Gateway::sendToClient($client_id, $jsonResponse);} catch (\Exception $e) {// 记录日志error_log("Error in onConnect: " . $e->getMessage());}}/*** 当客户端发送消息时触发* @param mixed $client_id 客户端 ID* @param string $message 接收到的消息*/public static function onMessage(mixed $client_id, string $message): void{try {$data = json_decode($message, true);if (json_last_error() !== JSON_ERROR_NONE || !$data || !isset($data['type'])) {Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Invalid message format',]));return;}switch ($data['type']) {case 'ping': // 心跳包// 保存用户最近一次心跳时间,本人使用redis$uid = Gateway::getUidByClientId($client_id);redis_set('user_judges_public_last_heartbeat_' . $uid, $data['timestamp'], 600);Gateway::sendToClient($client_id, json_encode(['type' => 'pong','message' => 'Heartbeat received',]));break;case 'login': // 登陆// 绑定用户信息Gateway::bindUid($client_id, $data['user_id']);redis_set('user_client_judges_public_' . $data['user_id'], $client_id, 18 * 3600);// 修改用户状态self::changeJudgeStatus($data['user_id']);Gateway::sendToClient($client_id, json_encode(['type' => 'login','message' => '登陆成功,user_id' . $data['user_id'],]));break;case 'broadcast': // 广播消息$safeMessage = htmlspecialchars($data['message'] ?? 'No message', ENT_QUOTES, 'UTF-8');Gateway::sendToAll(json_encode(['type' => 'broadcast','message' => $safeMessage,]));break;case 'private': // 私聊消息if (isset($data['to_client_id']) && is_numeric($data['to_client_id'])) {$toClientId = intval($data['to_client_id']);if (Gateway::isUidOnline($toClientId)) {$safeMessage = htmlspecialchars($data['message'] ?? 'No message', ENT_QUOTES, 'UTF-8');Gateway::sendToClient($toClientId, json_encode(['type' => 'private','from_client_id' => $client_id,'message' => $safeMessage,]));} else {Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Recipient client ID is not online',]));}} else {Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Invalid recipient client ID',]));}break;default:Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Unknown message type',]));break;}} catch (\Exception $e) {// 记录日志error_log("Error in onMessage: " . $e->getMessage());}}/*** 当客户端断开连接时触发* @param mixed $client_id 客户端 ID*/public static function onClose(mixed $client_id): void{try {$response = ['type' => 'disconnect','client_id' => $client_id,'message' => 'A client has disconnected',];$jsonResponse = json_encode($response);if (json_last_error() !== JSON_ERROR_NONE) {throw new \Exception('Failed to encode JSON response');}Gateway::sendToAll($jsonResponse);} catch (\Exception $e) {// 记录日志error_log("Error in onClose: " . $e->getMessage());}}/*** @note 错误监听* @param mixed $client_id* @param mixed $code*/public static function onError(mixed $client_id, mixed $code): void{try {Log::info($code . "_错误码" . $client_id);$response = ['type' => 'error','client_id' => $client_id,'message' => 'An error occurred',];$jsonResponse = json_encode($response);if (json_last_error() !== JSON_ERROR_NONE) {throw new \Exception('Failed to encode JSON response');}
// Gateway::sendToAll($jsonResponse);} catch (\Exception $e) {error_log("Error in onError: " . $e->getMessage());}}/*** @note 修改登陆状态* @param mixed $user_id* @param int $status 1=登陆 0=未登陆*/protected static function changeJudgeStatus(mixed $user_id, int $status = 1): void{try {$model = new model();$info = $model->findOrEmpty($user_id);if (!$info->isEmpty()) {$info->login_status = $status;$info->save();} else {Log::error($user_id . "_登陆状态修改失败");}} catch (\Exception $e) {Log::error($user_id . "_登陆状态修改失败:" . $e->getMessage());}}}
4.前端代码封装
- 封装scoket.js,带心跳
// src/utils/websocket.js
export default class WebSocketService {constructor(url) {this.url = url; // WebSocket 服务器地址this.socket = null; // WebSocket 实例this.isConnected = false; // 连接状态this.messageCallback = null; // 消息回调this.errorCallback = null; // 错误回调}// 初始化 WebSocketconnect() {if (!this.socket) {this.socket = new WebSocket(this.url);// 监听连接成功this.socket.onopen = () => {this.isConnected = true;console.log("WebSocket connected success");this.startHeartbeat(); // 开始心跳};// 监听消息this.socket.onmessage = (event) => {if (this.messageCallback) {this.messageCallback(JSON.parse(event.data));}};// 监听错误this.socket.onerror = (error) => {console.error("WebSocket error:", error);if (this.errorCallback) {this.errorCallback(error);}};// 监听连接关闭this.socket.onclose = () => {this.isConnected = false;console.log("WebSocket disconnected");this.stopHeartbeat(); // 停止心跳this.reconnect(); // 尝试重新连接};}}// 重新连接逻辑reconnect() {console.log("Reconnecting WebSocket...");setTimeout(() => {this.connect();}, 5000); // 5 秒后重连}// 发送消息sendMessage(message) {if (this.isConnected && this.socket) {this.socket.send(JSON.stringify(message));} else {console.error("WebSocket is not connected");}}// 设置消息回调onMessage(callback) {this.messageCallback = callback;}// 设置错误回调onError(callback) {this.errorCallback = callback;}// 手动关闭 WebSocketclose() {if (this.socket) {this.socket.close();this.socket = null;this.isConnected = false;}}// 添加心跳机制startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.isConnected) {this.sendMessage({type: 'ping', message: 'Heartbeat', timestamp: Date.now()});// console.log('Heartbeat sent');}}, 10000); // 每 10 秒发送一次心跳}stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);}}
}
- main.js引入
// 自己地址
import WebSocketService from './utils/socket/websocket';// 创建一个全局的 WebSocket 实例
// const wsService = new WebSocketService('ws://127.0.0.1:2358');
const wsService = new WebSocketService('wss://xs.qgmykj.com/wss');
wsService.connect();// 将 WebSocket 实例通过 provide 注入
app.provide('websocket', wsService);app.mount("#app");
- 页面引入
const websocket = inject('websocket'); // 获取 WebSocket 实例
const message = ref(null); // 保存收到的消息onMounted(() => {websocket.onMessage((data) => {message.value = data;console.log('收到消息:', data);});})// 发送消息
const sendMessage = (userId) => {const data = {type: 'login', user_id: userId, message: 'Hello, WebSocket!'};websocket.sendMessage(data);
};