php:使用Ratchet类实现分布式websocket服务

一、前言

        最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。

二、问题

        但是目前我的项目是分布式环境,统一通过Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?这就涉及到了分布式websocket服务,但是我不希望太复杂,所以采用了消息队列的方式实现效果。

三、安装Ratchet类

直接使用composer安装,我用版本是"cboden/ratchet": "^0.2.8",比较老了。

composer require cboden/ratchet

四、创建websocket服务

因为php是同步阻塞型语言,通常每次请求都会从头到尾执行完成,在PHP中,所有的代码都按照顺序执行,直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列,就需要用到ratchet中的事件循环机制,实现异步非阻塞通信效果。

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;set_time_limit(0);
ini_set('default_socket_timeout', -1);
/*** Websocket_Server*/
class ControllerWebsocket_Server
{public function indexAction(){try {$port = 8083;// 创建事件循环(使用该机制实现异步非阻塞通信)$loop = LoopFactory::create();// 创建 React Socket 服务器$socket = new ReactSocket($loop);$socket->listen($port, '0.0.0.0'); // 指定监听的端口和地址// 启动 WebSocket 服务器$server = new IoServer(new WsServer(new \ModelWebsocket_Handler($loop)),$socket,$loop);// 启动事件循环$loop->run();} catch (\Exception $e) {echo $e->getMessage();}}
}

其中ModelWebsocket_Handler是封装好的websocket操作类

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;/*** websocket服务端-相关操作*/
class ModelWebsocket_Handler implements MessageComponentInterface {//数据缓存const REDIS_KEY_RESOURCE_DATA_MAP = 'h:websocket:resource:data:map';//客户端public $clients;public function __construct($loop) {$this->clients = new \SplObjectStorage();$this->subscribeMessage($loop);}/*** 连接建立时的逻辑*/public function onOpen(ConnectionInterface $conn) {$this->clients->attach($conn);echo "New connection! ({$conn->resourceId})\n";//获取连接请求的参数$params = [];$queryString = $conn->WebSocket->request->getQuery();parse_str($queryString, $params);//存储资源id相关数据$this->setResourceDataMap($conn->resourceId, $params);}/*** 收到消息时的逻辑*/public function onMessage(ConnectionInterface $from, $msg) {echo "Received message: {$msg}\n";foreach ($this->clients as $client) {if ($client === $from) {continue;}//发送消息$client->send($msg);}}/*** 连接关闭时的逻辑*/public function onClose(ConnectionInterface $conn) {$this->delResourceDataMap($conn->resourceId);$this->clients->detach($conn);echo "Connection {$conn->resourceId} has disconnected\n";}/*** 错误处理逻辑*/public function onError(ConnectionInterface $conn, \Exception $e) {echo "An error occurred: {$e->getMessage()}\n";$this->delResourceDataMap($conn->resourceId);$conn->close();}/*** 存储资源id相关数据* * @param  string  $resourceId* @param  array   $data* @return bool*/public function setResourceDataMap($resourceId, $data) {$redis = Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);$rs = $redis->hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));return $rs;}/*** 获取资源id相关数据* * @param  string  $resourceId* @return array*/public function getResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return json_decode($rs, true) ?: [];}/*** 删除资源id相关数据* * @param  string  $resourceId* @return bool*/public function delResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return $rs;}/*** 订阅消息*/public function subscribeMessage($loop){$loop->addPeriodicTimer(1, function () {//在这里可以使用redis订阅消息、也可以使用kafka消费消息,然后再比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用foreach ($this->clients as $client) {$client->send("测试");} });}
}

其中:subscribeMessage方法监听消息队列,收到消息之后比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用。

当然如果你能直接找到用户所连接的服务器,并且可以直接推给相应的服务器,那更好,可以节省流量开销和一些额外的逻辑处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/477761.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙开发-音视频

Media Kit 特点 一般场合的音视频处理&#xff0c;可以直接使用系统集成的Video组件&#xff0c;不过外观和功能自定义程度低Media kit&#xff1a;轻量媒体引擎&#xff0c;系统资源占用低支持音视频播放/录制&#xff0c;pipeline灵活拼装&#xff0c;插件化扩展source/demu…

基于SSM的婚庆管理系统+LW示例参考

1.项目介绍 系统角色&#xff1a;管理员、商家&#xff08;婚庆公司&#xff09;、用户功能模块&#xff1a;管理员&#xff08;用户管理、商家管理、摄影风格管理、礼服款式管理、案例管理、婚车品牌管理、婚纱拍摄管理、策划服务管理、婚宴酒店管理、婚车套餐管理、在线咨询…

manin动画编程(安装+入门)

文章目录 1.基本介绍2.效果展示3.安装步骤3.1安装manba软件3.2配置环境变量3.3查看是否成功3.4什么是mamba3.5创建虚拟环境3.6尝试进入虚拟环境 4.vscode操作4.1默认配置文件 5.安装ffmpeg6.安装manim软件6.vscode制作7.我的学习收获 1.基本介绍 这个manim就是一款软件&#x…

CH595 驱动数码管

先上原理图 我手里的是型号SR410361K的 4段数码管是共阳的&#xff08;低电平驱动&#xff09;&#xff0c;先发送数据&#xff0c;然后发送片选 共阴 共阳的图如下&#xff1a; 如何测量呢&#xff1f; 首先将数字万用表档位调节到蜂鸣器/二极管档&#xff0c;红表笔和黑表笔…

Vue生命周期详解

目录 1.beforeCreate2.created3.beforeMount4.mounted5.beforeUpdate6.updated7.beforeUnmount&#xff08;beforeDestroy&#xff09;8.unmounted&#xff08;destroyed&#xff09; 1.beforeCreate 分析 beforeCreate执行时Vue实例还没有被创建&#xff0c;data和methods也…

MySQL底层概述—1.InnoDB内存结构

大纲 1.InnoDB引擎架构 2.Buffer Pool 3.Page管理机制之Page页分类 4.Page管理机制之Page页管理 5.Change Buffer 6.Log Buffer 1.InnoDB引擎架构 (1)InnoDB引擎架构图 (2)InnoDB内存结构 (1)InnoDB引擎架构图 下面是InnoDB引擎架构图&#xff0c;主要分为内存结构和磁…

【力扣算法题】双指针-战场上的矛与盾的组合(移动零)(快乐数)

前言 &#x1f31f;&#x1f31f;本期讲解关于力扣算法两道双指针题目解析~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么…

第三十九篇 ShuffleNet V1、V2模型解析

摘要 ShuffleNet V1 ShuffleNet V1是由旷视科技&#xff08;Megvii&#xff0c;又称Face&#xff09;在2017年底提出的一种轻量级卷积神经网络架构。该网络专为移动设备和边缘计算环境设计&#xff0c;旨在以较低的计算资源实现高效的图像分类和其他计算机视觉任务。 特点与…

Springboot系列之:创建Springboot项目,Springboot整合MyBatis-plus

Springboot系列之&#xff1a;创建Springboot项目&#xff0c;Springboot整合MyBatis-plus 一、快速创建Spring boot项目二、项目完整目录三、pom.xml四、application.yaml五、实体类六、mapper七、IService接口八、Service实现类九、配置类十、枚举十一、增删改查测试类十二、…

C++:用红黑树封装map与set-1

文章目录 前言一、STL源码分析二、红黑树的构建三、map与set整体框架的搭建与解析四、如何取出进行比较&#xff1f;1. met与set的数据是不同的2. 取出数据进行比较1&#xff09;问题发现2&#xff09;仿函数解决 五、封装插入六、迭代器的实现1. operator* 与operator->2. …

Perforce《2024游戏技术现状报告》Part3:生成式AI、版本控制、CI/CD等游戏技术的未来趋势与应用

游戏开发者一直处于创新前沿。他们的实践、工具和技术受到各行各业的广泛关注&#xff0c;正在改变着组织进行数字创作的方式。 近期&#xff0c;Perforce发布了《2024游戏技术现状报告》&#xff0c;通过收集来自游戏、媒体与娱乐、汽车和制造业等高增长行业的从业者、管理人…

4-SpringCloud整合服务间的调用即负载均衡

springcloud目录&#xff1a; 1.Spring Cloud简介 2.SpringCloud整合eureka注册中心 3.SpringCloud整合服务注册 4.SpringCloud整合服务间的调用即负载均衡 5.SpringCloud整合Feign调用 6.SpringCloud整合config配置中心 7.SpringCloud整合zuul路由网关 我们复制一个yqx-user服…

Elasticsearch客户端在和集群连接时,如何选择特定的节点执行请求的?

大家好&#xff0c;我是锋哥。今天分享关于【Elasticsearch客户端在和集群连接时&#xff0c;如何选择特定的节点执行请求的&#xff1f;】面试题。希望对大家有帮助&#xff1b; Elasticsearch客户端在和集群连接时&#xff0c;如何选择特定的节点执行请求的&#xff1f; 100…

深入浅出,快速安装并了解汇编语言

1.什么是汇编语言 了解汇编语言需要先从了解机器语言开始&#xff0c;在计算机发展的初期阶段&#xff0c;机器语言是计算机直接理解和执行的二进制代码语言&#xff0c;其核心特点包括直接执行性、资源高效性、学习难度大以及平台依赖性。它主要由指令码构成&#xff0c;这些…

2.2_3 纠错编码—海明码

目录 1、海明码的纠错过程 2、海明距离 3、确认检验码位数 4、确定校验码和数据的位置 5、求出校验码的值 6、检错并纠错 方法一 方法二 1、海明码的纠错过程 2、海明距离 两个合法编码(码字)的对应比特取值不同的比特数称为这两个码字的海明距离(码距)&#xff0c;一…

1992-2021年 各省市县经过矫正的夜间灯光数据(GNLD、VIIRS)区域汇总:省份、城市、区县面板数据

1992-2021年 各省市县经过矫正的夜间灯光数据&#xff08;GNLD、VIIRS&#xff09;区域汇总&#xff1a;省份、城市、区县面板数据 .r.rar https://download.csdn.net/download/2401_84585615/90001905 从1992年至2021年&#xff0c;中国各省份、城市及区县的夜间灯光数据经过…

微信小程序上传微信官方审核流程(1)

1&#xff0c;打开微信开发者工具 2&#xff0c;微信开发者工具右上角有一个上传按钮&#xff0c;点击上传按钮 3&#xff0c;点击完上传按钮会弹出一个上传成功的提示&#xff0c;点击提示框中的确定按钮 4&#xff0c;点击完确定按钮后会显示填写版本好和项目备注 5&#x…

快速获取镜像包的方法

1、当我们需要在无网络的环境中&#xff0c;在Docker环境中安装某个镜像时&#xff0c;需要先下载这个镜像包后&#xff0c;再上传 2、下面以在minio为例 在有网络的电脑中使用使用命令下载 docker pull minio/minio将下载好的tar包保存到指定的目录下 save -o /home/cl/app…

11 —— 打包模式的应用

需求&#xff1a;在开发模式下想让webpack使用style-loader进行css样式的处理&#xff1b;让它把css代码内嵌在js中&#xff1b;在生产模式下提取css代码 —— 判断当前运行命令时所在的环境 方案&#xff1a;借助cross-env全局软件包&#xff0c;设置参数区分打包运行环境 …

docker容器化部署springboot项目

前言 docker安装 下载官网 选择自己的系统 然后安装文档内给的命令按顺序执行即可。设置仓库&#xff0c;安装docker. 一、更换镜像源 一般情况下,docker原本自带的镜像网站不一定连的上,就很容易导致下载镜像失败,因此需要换源. 创建/etc/docker/daemon.json并填入数据…