Thinkphp 使用workerman消息实现消息推送完整示例

1.版本说明:ThinkPHP6.0+ 、PHP8.0+
2.实现原理

       应用场景,监听用户登陆,修改用户登陆状态,通过心跳时间计算用户是否下线,其他应用场景自行修改逻辑即可。

  • 第一步:安装workerman
composer require topthink/think-worker
  • 第二步:修改配置文件
//  执行以上代码后,config目录下会自动生成三个文件:gateway_worker.php、worker.php、worker_server.php//  因本人使用的gateway方式,因此需要修改的配置文件是gateway_worker.php,具体根据自己应用场景修改,本人修改的为业务处理逻辑的控制器<?phpreturn [// 其他配置没有动,所以没复制上,只改的下面的eventHandler// BusinsessWorker配置'businessWorker' => ['name' => 'BusinessWorker','count' => 1,'eventHandler' => '\app\common\event\WorkerHandler',],];
  • 第三步:启动服务
//    windows无法启动的请看本人博客,里面有相关说明,Mac和Linux没问题 https://blog.csdn.net/qq_38997379/article/details/143957149?spm=1001.2014.3001.5501php think worker:gateway
  • 第四步:连接测试,vue或html都可以,下面有具体封装的方法
  • 第五步:封装业务逻辑处理,下方有封装的方法
  • 第六步:线上部署,修改nginx伪静态

location /wss {proxy_pass http://127.0.0.1:2358;    //    端口号改为自己的端口proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";proxy_set_header X-Real-IP $remote_addr;
}

3.业务处理代码-WorkerHandler.php
<?phpnamespace app\common\event;use app\common\model\JudgesPublic;
use GatewayWorker\Lib\Gateway;
use think\facade\Log;use function htmlspecialchars;/*** WebSocket 事件处理器*/
class WorkerHandler
{/*** 当客户端连接时触发* @param mixed $client_id 客户端 ID*/public static function onConnect(mixed $client_id): void{try {$response = ['type' => 'connect','message' => 'Welcome to WebSocket server!',];$jsonResponse = json_encode($response);if (json_last_error() !== JSON_ERROR_NONE) {throw new \Exception('Failed to encode JSON response');}Gateway::sendToClient($client_id, $jsonResponse);} catch (\Exception $e) {// 记录日志error_log("Error in onConnect: " . $e->getMessage());}}/*** 当客户端发送消息时触发* @param mixed $client_id 客户端 ID* @param string $message 接收到的消息*/public static function onMessage(mixed $client_id, string $message): void{try {$data = json_decode($message, true);if (json_last_error() !== JSON_ERROR_NONE || !$data || !isset($data['type'])) {Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Invalid message format',]));return;}switch ($data['type']) {case 'ping': // 心跳包//  保存用户最近一次心跳时间,本人使用redis$uid = Gateway::getUidByClientId($client_id);redis_set('user_judges_public_last_heartbeat_' . $uid, $data['timestamp'], 600);Gateway::sendToClient($client_id, json_encode(['type' => 'pong','message' => 'Heartbeat received',]));break;case 'login': // 登陆//  绑定用户信息Gateway::bindUid($client_id, $data['user_id']);redis_set('user_client_judges_public_' . $data['user_id'], $client_id, 18 * 3600);//  修改用户状态self::changeJudgeStatus($data['user_id']);Gateway::sendToClient($client_id, json_encode(['type' => 'login','message' => '登陆成功,user_id' . $data['user_id'],]));break;case 'broadcast': // 广播消息$safeMessage = htmlspecialchars($data['message'] ?? 'No message', ENT_QUOTES, 'UTF-8');Gateway::sendToAll(json_encode(['type' => 'broadcast','message' => $safeMessage,]));break;case 'private': // 私聊消息if (isset($data['to_client_id']) && is_numeric($data['to_client_id'])) {$toClientId = intval($data['to_client_id']);if (Gateway::isUidOnline($toClientId)) {$safeMessage = htmlspecialchars($data['message'] ?? 'No message', ENT_QUOTES, 'UTF-8');Gateway::sendToClient($toClientId, json_encode(['type' => 'private','from_client_id' => $client_id,'message' => $safeMessage,]));} else {Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Recipient client ID is not online',]));}} else {Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Invalid recipient client ID',]));}break;default:Gateway::sendToClient($client_id, json_encode(['type' => 'error','message' => 'Unknown message type',]));break;}} catch (\Exception $e) {// 记录日志error_log("Error in onMessage: " . $e->getMessage());}}/*** 当客户端断开连接时触发* @param mixed $client_id 客户端 ID*/public static function onClose(mixed $client_id): void{try {$response = ['type' => 'disconnect','client_id' => $client_id,'message' => 'A client has disconnected',];$jsonResponse = json_encode($response);if (json_last_error() !== JSON_ERROR_NONE) {throw new \Exception('Failed to encode JSON response');}Gateway::sendToAll($jsonResponse);} catch (\Exception $e) {// 记录日志error_log("Error in onClose: " . $e->getMessage());}}/*** @note 错误监听* @param mixed $client_id* @param mixed $code*/public static function onError(mixed $client_id, mixed $code): void{try {Log::info($code . "_错误码" . $client_id);$response = ['type' => 'error','client_id' => $client_id,'message' => 'An error occurred',];$jsonResponse = json_encode($response);if (json_last_error() !== JSON_ERROR_NONE) {throw new \Exception('Failed to encode JSON response');}
//            Gateway::sendToAll($jsonResponse);} catch (\Exception $e) {error_log("Error in onError: " . $e->getMessage());}}/*** @note 修改登陆状态* @param mixed $user_id* @param int $status 1=登陆 0=未登陆*/protected static function changeJudgeStatus(mixed $user_id, int $status = 1): void{try {$model = new model();$info = $model->findOrEmpty($user_id);if (!$info->isEmpty()) {$info->login_status = $status;$info->save();} else {Log::error($user_id . "_登陆状态修改失败");}} catch (\Exception $e) {Log::error($user_id . "_登陆状态修改失败:" . $e->getMessage());}}}
4.前端代码封装
  • 封装scoket.js,带心跳
// src/utils/websocket.js
export default class WebSocketService {constructor(url) {this.url = url; // WebSocket 服务器地址this.socket = null; // WebSocket 实例this.isConnected = false; // 连接状态this.messageCallback = null; // 消息回调this.errorCallback = null; // 错误回调}// 初始化 WebSocketconnect() {if (!this.socket) {this.socket = new WebSocket(this.url);// 监听连接成功this.socket.onopen = () => {this.isConnected = true;console.log("WebSocket connected success");this.startHeartbeat(); // 开始心跳};// 监听消息this.socket.onmessage = (event) => {if (this.messageCallback) {this.messageCallback(JSON.parse(event.data));}};// 监听错误this.socket.onerror = (error) => {console.error("WebSocket error:", error);if (this.errorCallback) {this.errorCallback(error);}};// 监听连接关闭this.socket.onclose = () => {this.isConnected = false;console.log("WebSocket disconnected");this.stopHeartbeat(); // 停止心跳this.reconnect(); // 尝试重新连接};}}// 重新连接逻辑reconnect() {console.log("Reconnecting WebSocket...");setTimeout(() => {this.connect();}, 5000); // 5 秒后重连}// 发送消息sendMessage(message) {if (this.isConnected && this.socket) {this.socket.send(JSON.stringify(message));} else {console.error("WebSocket is not connected");}}// 设置消息回调onMessage(callback) {this.messageCallback = callback;}// 设置错误回调onError(callback) {this.errorCallback = callback;}// 手动关闭 WebSocketclose() {if (this.socket) {this.socket.close();this.socket = null;this.isConnected = false;}}// 添加心跳机制startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.isConnected) {this.sendMessage({type: 'ping', message: 'Heartbeat', timestamp: Date.now()});// console.log('Heartbeat sent');}}, 10000); // 每 10 秒发送一次心跳}stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);}}
}

  • main.js引入
//  自己地址
import WebSocketService from './utils/socket/websocket';// 创建一个全局的 WebSocket 实例
// const wsService = new WebSocketService('ws://127.0.0.1:2358');
const wsService = new WebSocketService('wss://xs.qgmykj.com/wss');
wsService.connect();// 将 WebSocket 实例通过 provide 注入
app.provide('websocket', wsService);app.mount("#app");
  • 页面引入
const websocket = inject('websocket'); // 获取 WebSocket 实例
const message = ref(null); // 保存收到的消息onMounted(() => {websocket.onMessage((data) => {message.value = data;console.log('收到消息:', data);});})// 发送消息
const sendMessage = (userId) => {const data = {type: 'login', user_id: userId, message: 'Hello, WebSocket!'};websocket.sendMessage(data);
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/496441.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot 工程使用proguard混淆

在 Maven 构建的 Spring Boot 项目中使用 ProGuard 进行代码混淆时&#xff0c;需要正确配置 Maven 插件和 ProGuard 的混淆规则。由于 Spring Boot 项目通常会依赖大量的反射机制和动态代理&#xff0c;因此必须特别小心确保这些部分在混淆过程中不会被破坏。 步骤 1&#xf…

我的秋招总结

我的秋招总结 个人背景 双非本&#xff0c;985硕&#xff0c;科班 准备情况 以求职为目的学习Java的时间大概一年。 八股&#xff0c;一开始主要是看B站黑马的八股文课程&#xff0c;背JavaGuide和小林coding还有面试鸭。 算法&#xff0c;250&#xff0c;刷了3遍左右 项目&…

Java Stream流详解——串行版

Stream流——串行版 ​ Stream流是java8引入的特性&#xff0c;极大的方便了我们对于程序内数据的操作&#xff0c;提高了性能。通过函数式编程解决复杂问题。 1.BaseStream<T,S extense BaseStream<T,S>> ​ 他是流处理的基石概念&#xff0c;重点不在于这个接…

fisco-bcos系统架构

系统架构 整体架构 标签&#xff1a;架构 强扩展性 模块设计 整体架构上&#xff0c;FISCO BCOS划分成基础层、核心层、管理层和接口层&#xff1a; 基础层:提供区块链的基础数据结构和算法库 核心层: 实现了区块链的核心逻辑&#xff0c;核心层分为两大部分&#xff1a…

探秘仓颉编程语言:使用体验与功能剖析

目录 一、引言&#xff1a;仓颉登场&#xff0c;编程新纪元开启 二、初体验&#xff1a;搭建环境与 “Hello World” &#xff08;一&#xff09;环境搭建指南 &#xff08;二&#xff09;Hello World 初印象 三、核心特性剖析&#xff1a;智能、高效、安全多维解读 &…

Java 面试合集(2024版)

种自己的花&#xff0c;爱自己的宇宙 目录 第一章-Java基础篇 1、你是怎样理解OOP面向对象??? 难度系数&#xff1a;? 2、重载与重写区别??? 难度系数&#xff1a;? 3、接口与抽象类的区别??? 难度系数&#xff1a;? 4、深拷贝与浅拷贝的理解??? 难度系数&…

指针与数组:深入C语言的内存操作艺术

数组名的理解 在上⼀个章节我们在使⽤指针访问数组的内容时&#xff0c;有这样的代码&#xff1a; int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0]; 这⾥我们使⽤ &arr[0] 的⽅式拿到了数组…

使用RabbitMQ

一、MQ是什么 MQ全称 Message Queue&#xff08;消息队列&#xff09;&#xff0c;是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信&#xff0c;主要功能业务解耦。 二、市面上常见的MQ产品 RabbitMQ、RocketMQ&#xff08;阿里的&#xff09;、Kafka 、…

大模型的实践应用33-关于大模型中的Qwen2与Llama3具体架构的差异全解析

大家好,我是微学AI,今天给大家介绍一下大模型的实践应用33-关于大模型中的Qwen2与Llama3具体架构的差异全解析。Qwen2模型与Llama3模型在架构上存在一些细微的差异,这些差异主要体现在注意力机制、模型尺寸相关参数以及嵌入层处理等方面。以下是对这些差异的详细分析。 文章…

NAT 技术如何解决 IP 地址短缺问题?

NAT 技术如何解决 IP 地址短缺问题&#xff1f; 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都在歌唱 随着互联网的普及和发展&#xff0c;IP 地址的需求量迅速增加。尤其是 IPv4 地址&…

kafka的备份策略:从备份到恢复

文章目录 一、全量备份二、增量备份三、全量恢复四、增量恢复 前言&#xff1a;Kafka的备份的单元是partition&#xff0c;也就是每个partition都都会有leader partiton和follow partiton。其中leader partition是用来进行和producer进行写交互&#xff0c;follow从leader副本进…

使用sam进行零样本、零学习的分割实践

参照&#xff1a;利用SAM实现自动标注_sam标注-CSDN博客&#xff0c;以及SAM&#xff08;分割一切模型&#xff09;的简单调用_sam使用-CSDN博客 sam简介&#xff1a; Segment Anything Model&#xff08;SAM&#xff09;是Meta公司于2023年发布的一种AI模型&#xff0c;它打破…

【Git】—— 使用git操作远程仓库(gitee)

目录 一、远程仓库常用命令 1、从远程仓库克隆项目 2、查看关联的远程仓库 3、添加关联的远程仓库 4、移除关联的远程仓库 5、将本地仓库推送到远程仓库 6、从远程仓库拉取项目 二、分支命令 1、查询分支 2、创建分支 3、切换分支 4、推送到远程分支 5、合并分支 …

攻防世界web新手第五题supersqli

这是题目&#xff0c;题目看起来像是sql注入的题&#xff0c;先试一下最常规的&#xff0c;输入1&#xff0c;回显正常 输入1‘&#xff0c;显示错误 尝试加上注释符号#或者–或者%23&#xff08;注释掉后面语句&#xff0c;使1后面的单引号与前面的单引号成功匹配就不会报错…

【MySQL】SQL 优化经验

1. 表的设计优化 参考依据&#xff1a;参考阿里开发手册嵩山版&#xff0c;其中有很多关于MySQL表设计的内容。类型选择&#xff1a;根据存储内容选择合适类型&#xff0c;如数值存储可选tinyint、bigint等&#xff0c;字符串可选varchar或text&#xff0c;根据内容长短选择合…

使用 .NET 6 或 .NET 8 上传大文件

如果您正在使用 .NET 6&#xff0c;并且它拒绝上传大文件&#xff0c;那么本文适合您。 我分享了一些处理大文件时需要牢记的建议&#xff0c;以及如何根据我们的需求配置我们的服务&#xff0c;并提供无限制的服务。 本文与 https://blog.csdn.net/hefeng_aspnet/arti…

STM32使用UART发送字符串与printf输出重定向

首先我们先看STM32F103C8T6的电路图 由图可知&#xff0c;其PA9和PA10引脚分别为UART的TX和RX(注意&#xff1a;这个电路图是错误的&#xff0c;应该是PA9是X而PA9是RX&#xff0c;我们看下图的官方文件可以看出)&#xff0c;那么接下来我们应该找到该引脚的定义是什么&#xf…

转运机器人推动制造业智能化转型升级

​在当今制造业智能化转型的浪潮中&#xff0c;技术创新成为企业脱颖而出的关键。富唯转运机器人凭借一系列先进技术&#xff0c;成为智能转型的卓越之选。 一体化 AMR 控制系统是富唯的一大亮点。它采用低代码流程搭建和配置模式&#xff0c;极大地缩短了部署时间。企业无需耗…

深度分析java 使用 proguard 如何解析混淆后的堆栈

经过proguard混淆过后&#xff0c;发生异常时堆栈也进行了混淆&#xff0c;那么如果获取的原始的堆栈呢&#xff1f;我们下面来看下 使用proguard 根据mapping文件直接解析 import proguard.obfuscate.MappingReader; import proguard.retrace.FrameInfo; import proguard.re…