Hyperf 如何做到用两个端口 9501/9502 都能连接 Websocket 服务以及多 Worker 协作实现聊天室功能

为何 Hyperf 能够在两个端口上监听 WebSocket 连接?

源码角度来看,在配置了多个 Servers 时,实际上,只启动了一个 Server

注:我之前接触的代码都是启动一个服务绑定一个端口,之前也看过 swoole 扩展的文档,但是没留意服务和监听端口也是分离的,这启发了我一种思维,代码凡是能继续拆分的,就继续拆分,这样代码就会有更多的灵活,每个功能都能进行扩展,将服务和端口进行拆分之后,就可以在一个 Server 绑定多个 Port,每个 Port 又能有独立的事件。

/*** @param Port[] $servers* @return Port[]*/
protected function sortServers(array $servers): array
{$sortServers = [];foreach ($servers as $server) {switch ($server->getType()) {case ServerInterface::SERVER_HTTP:$this->enableHttpServer = true;if (! $this->enableWebsocketServer) {array_unshift($sortServers, $server);} else {$sortServers[] = $server;}break;case ServerInterface::SERVER_WEBSOCKET:$this->enableWebsocketServer = true;array_unshift($sortServers, $server);break;default:$sortServers[] = $server;break;}}return $sortServers;
}

从源码看,排在第一个的服务配置,会被创建服务,之后都是增加监听

protected function initServers(ServerConfig $config)
{$servers = $this->sortServers($config->getServers());foreach ($servers as $server) {$name = $server->getName();$type = $server->getType();$host = $server->getHost();$port = $server->getPort();$sockType = $server->getSockType();$callbacks = $server->getCallbacks();if (! $this->server instanceof SwooleServer) {$this->server = $this->makeServer($type, $host, $port, $config->getMode(), $sockType);$callbacks = array_replace($this->defaultCallbacks(), $config->getCallbacks(), $callbacks);$this->registerSwooleEvents($this->server, $callbacks, $name);$this->server->set(array_replace($config->getSettings(), $server->getSettings()));ServerManager::add($name, [$type, current($this->server->ports)]);if (class_exists(BeforeMainServerStart::class)) {// Trigger BeforeMainServerStart event, this event only trigger once before main server start.$this->eventDispatcher->dispatch(new BeforeMainServerStart($this->server, $config->toArray()));}} else {/** @var bool|\Swoole\Server\Port $slaveServer */$slaveServer = $this->server->addlistener($host, $port, $sockType);if (! $slaveServer) {throw new \RuntimeException("Failed to listen server port [{$host}:{$port}]");}$server->getSettings() && $slaveServer->set(array_replace($config->getSettings(), $server->getSettings()));$this->registerSwooleEvents($slaveServer, $callbacks, $name);ServerManager::add($name, [$type, $slaveServer]);}// Trigger beforeStart event.if (isset($callbacks[Event::ON_BEFORE_START])) {[$class, $method] = $callbacks[Event::ON_BEFORE_START];if ($this->container->has($class)) {$this->container->get($class)->{$method}();}}if (class_exists(BeforeServerStart::class)) {// Trigger BeforeServerStart event.$this->eventDispatcher->dispatch(new BeforeServerStart($name));}}
}

从makeServer函数来看,如果服务中有SERVER_WEBSOCKET,则这个会被作为主服务启动,new SwooleWebSocketServer

protected function makeServer(int $type, string $host, int $port, int $mode, int $sockType): SwooleServer
{switch ($type) {case ServerInterface::SERVER_HTTP:return new SwooleHttpServer($host, $port, $mode, $sockType);case ServerInterface::SERVER_WEBSOCKET:return new SwooleWebSocketServer($host, $port, $mode, $sockType);case ServerInterface::SERVER_BASE:return new SwooleServer($host, $port, $mode, $sockType);}throw new RuntimeException('Server type is invalid.');
}

$this->registerSwooleEvents($this->server, $callbacks, $name); 这句代码会将 Websocket 的各种事件都注册进去,于是主服务器拥有 websocket 的各种事件,而后 http 服务器挂载 9501 端口上,绑定了 onrequest 事件,但是如果有 websocket 连接9501 端口上时,默认该服务器是自动开启 websocket 自动升级的,又因为监听 9501 端口绑定的主服务器是 WebSocketServer,因此,WebSocketServer 默认的 onmessage,onopen事件就会被拿来用。

推测,如果开启 9503 WebSocket服务器,那么理论上用 WebSocket连接 9501 端口,应该就是连接的 9503 的回调事件。如果不想让 http 监听端口自动开启 websocket 协议,则将open_websocket_protocol=false

<?php
return [
// 这里省略了该文件的其它配置
'servers' => [['name' => 'http','type' => Server::SERVER_HTTP,'host' => '0.0.0.0','port' => 9501,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [Event::ON_REQUEST => [Hyperf\HttpServer\Server::class,'onRequest'],],'settings' => ['open_websocket_protocol' => false,]],]];

关于很多SWOOLE中出现的常量来看,这些东西在执行脚本时,会被自动设置好,通过实际代码运行发现

define('SWOOLE_HTTP2_ERROR_COMPRESSION_ERROR', 9);
define('SWOOLE_HTTP2_ERROR_CONNECT_ERROR', 10);
define('SWOOLE_HTTP2_ERROR_ENHANCE_YOUR_CALM', 11);
define('SWOOLE_HTTP2_ERROR_INADEQUATE_SECURITY', 12);
define('SWOOLE_BASE', 1);
define('SWOOLE_PROCESS', 2);
define('SWOOLE_IPC_UNSOCK', 1);
define('SWOOLE_IPC_MSGQUEUE', 2);
define('SWOOLE_IPC_PREEMPTIVE', 3);

以下,随便设置的一个test.php中输出SWOOLE_BASE,都能输出1,我之前都以为这些常量是运行时设置的呢,看来这种理解是错误的。

<?php
echo SWOOLE_BASE."\n";
echo "hello\n";// 输出
1
hello

Hyperf-skeleton 给的默认配置就是进程模式,这个竟然没有发现,这样就比较明确了,使用的都是PROCESS模式,那么在websocket连接时,所有的连接都是由Manager来控制

SWOOLE_PRECESS 和 SWOOLE_BASE 两种模式

Server 的两种运行模式介绍

在 Swoole\Server 构造函数的第三个参数,可以填 2 个常量值 -- SWOOLE_BASE或 SWOOLE_PROCESS,下面将分别介绍这两个模式的区别以及优缺点

SWOOLE_PROCESS

SWOOLE_PROCESS 模式的 Server 所有客户端的 TCP 连接都是和主进程建立的,内部实现比较复杂,用了大量的进程间通信、进程管理机制。适合业务逻辑非常复杂的场景。Swoole 提供了完善的进程管理、内存保护机制。 在业务逻辑非常复杂的情况下,也可以长期稳定运行。

Swoole 在 Reactor线程中提供了 Buffer 的功能,可以应对大量慢速连接和逐字节的恶意客户端。

进程模式的优点:

  • 连接与数据请求发送是分离的,不会因为某些连接数据量大某些连接数据量小导致 Worker 进程不均衡

  • Worker 进程发生致命错误时,连接并不会被切断

  • 可实现单连接并发,仅保持少量 TCP 连接,请求可以并发地在多个 Worker 进程中处理

进程模式的缺点:

  • 存在 2 次 IPC 的开销,master 进程与 worker 进程需要使用 unixSocket进行通信

  • SWOOLE_PROCESS 不支持 PHP ZTS,在这种情况下只能使用 SWOOLE_BASE 或者设置 single_thread为 true

SWOOLE_BASE

SWOOLE_BASE 这种模式就是传统的异步非阻塞 Server。与 Nginx 和 Node.js 等程序是完全一致的。

worker_num参数对于 BASE 模式仍然有效,会启动多个 Worker 进程。

当有 TCP 连接请求进来的时候,所有的 Worker 进程去争抢这一个连接,并最终会有一个 worker 进程成功直接和客户端建立 TCP 连接,之后这个连接的所有数据收发直接和这个 worker 通讯,不经过主进程的 Reactor 线程转发。

  • BASE 模式下没有 Master 进程的角色,只有 Manager进程的角色。

  • 每个 Worker 进程同时承担了 SWOOLE_PROCESS模式下 Reactor线程和 Worker 进程两部分职责。

  • BASE 模式下 Manager 进程是可选的,当设置了 worker_num=1,并且没有使用 Task 和 MaxRequest 特性时,底层将直接创建一个单独的 Worker 进程,不创建 Manager 进程

BASE 模式的优点:

  • BASE 模式没有 IPC 开销,性能更好

  • BASE 模式代码更简单,不容易出错

BASE 模式的缺点:

  • TCP 连接是在 Worker 进程中维持的,所以当某个 Worker 进程挂掉时,此 Worker 内的所有连接都将被关闭

  • 少量 TCP 长连接无法利用到所有 Worker 进程

  • TCP 连接与 Worker 是绑定的,长连接应用中某些连接的数据量大,这些连接所在的 Worker 进程负载会非常高。但某些连接数据量小,所以在 Worker 进程的负载会非常低,不同的 Worker 进程无法实现均衡。

  • 如果回调函数中有阻塞操作会导致 Server 退化为同步模式,此时容易导致 TCP 的 backlog队列塞满问题。

BASE 模式的适用场景:

如果客户端连接之间不需要交互,可以使用 BASE 模式。如 Memcache、HTTP 服务器等。

BASE 模式的限制:

在 BASE 模式下,Server 方法除了 send和 close以外,其他的方法都不支持跨进程执行。

Reactor 线程和 Worker 进程

Reactor 线程

  • Reactor 线程是在 Master 进程中创建的线程

  • 负责维护客户端 TCP 连接、处理网络 IO、处理协议、收发数据

  • 不执行任何 PHP 代码

  • 将 TCP 客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包

Worker 进程

  • 接受由 Reactor 线程投递的请求数据包,并执行 PHP 回调函数处理数据

  • 生成响应数据并发给 Reactor 线程,由 Reactor 线程发送给 TCP 客户端

  • 可以是异步非阻塞模式,也可以是同步阻塞模式

  • Worker 以多进程的方式运行

他们之间的关系可以理解为 Reactor 就是 nginx,Worker 就是 PHP-FPM。Reactor 线程异步并行地处理网络请求,然后再转发给 Worker 进程中去处理。Reactor 和 Worker 间通过 unixSocket进行通信。

在 PHP-FPM 的应用中,经常会将一个任务异步投递到 Redis 等队列中,并在后台启动一些 PHP 进程异步地处理这些任务。Swoole 提供的 TaskWorker 是一套更完整的方案,将任务的投递、队列、PHP 任务处理进程管理合为一体。通过底层提供的 API 可以非常简单地实现异步任务的处理。另外 TaskWorker 还可以在任务执行完成后,再返回一个结果反馈到 Worker。

Swoole 的 Reactor、Worker、TaskWorker 之间可以紧密的结合起来,提供更高级的使用方式。

一个更通俗的比喻,假设 Server 就是一个工厂,那 Reactor 就是销售,接受客户订单。而 Worker 就是工人,当销售接到订单后,Worker 去工作生产出客户要的东西。而 TaskWorker 可以理解为行政人员,可以帮助 Worker 干些杂事,让 Worker 专心工作。

结论

  1. SWOOLE_PROCESS 模式下,Websocket 的连接对象都是由 Server 来控制,创建一个 Reactor 后,将该连接交付给 Reactor 来管理,Reactor 会将请求分配给 Worker 处理,Worker 处理完后,再把消息发给Server,Server 将消息压入队列等具体的 Reactor 发送出去

  2. hyperf 中有一处进程间通信,目前还不清楚,这里为何要向其他进程求助,以及其他进程监听到消息后会不会多发

    1. 开发者说,如果是 SWOOLE_PROCESS模式下不会触发让其他 Worker 发送的机制,只有 SWOOLE_BASE 模式下,每个链接交给每个 Worker 单独处理时,才需要在多个 Worker 协作处理,因为每个 Worker 争抢到的连接都是隔离的,所以不会出现发送多个的情况

  3. 上面 2 给了一种分布式 websocket 构建方式,采用这种多进程的方式就能让多台服务器协作提供长连接服务,保证千级万级用户的接入量

 

 下面这段代码是来自onPipeMessage 监听器的,意味着其他 worker 收到后,判断是在自己进程上的连接就执行

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/108332.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【el-tree】树形组件图标的自定义

饿了么树形组件的图标自定义 默认样式: 可以看到el-tree组件左侧自带展开与收起图标,咱们可以把它隐藏:: .groupList {::v-deep .el-tree-node { .el-icon-caret-right {display: none;} } } 我的全部代码 <div class"groupList"><el…

[NLP]深入理解 Megatron-LM

一. 导读 NVIDIA Megatron-LM 是一个基于 PyTorch 的分布式训练框架&#xff0c;用来训练基于Transformer的大型语言模型。Megatron-LM 综合应用了数据并行&#xff08;Data Parallelism&#xff09;&#xff0c;张量并行&#xff08;Tensor Parallelism&#xff09;和流水线并…

DML语句的用法(MySQL)

文章目录 前言一、DML介绍二、DML语句操作1、给指定字段添加数据2、给全部字段添加数据3、批量添加数据4、修改数据5、删除数据 总结 前言 本文主要介绍SQL语句中DML语句的用法。 在实验开始之前我们先创建一下所要使用表&#xff0c;如下图所示&#xff1a; 一、DML介绍 DM…

matlab使用教程(22)—非线性优化函数的设置

1.设置优化选项 可以使用由 optimset 函数创建的 options 结构体来指定优化参数。然后&#xff0c;可以将 options 作为输入传递给优化函数&#xff0c;例如&#xff0c;通过使用以下语法调用 fminbnd x fminbnd(fun,x1,x2,options) 或使用以下语法调用 fminsearch x f…

单片机IO模拟串口协议

一、前言 嵌入式硬件平台调试中常用的debug方法是看串口打印定位问题&#xff0c;但有时候会遇到单片机没有串口外设或者串口引脚被占用的情况&#xff0c;这时候也可以在代码里操作空闲的IO输出不同个数的脉冲来达到调试的效果&#xff0c;但是要用逻辑分析仪抓线逐个看波形比…

js深拷贝三种方法

使用递归函数实现深拷贝 const obj {name: zzz,age: 18,hobby: [篮球, 足球],family: {baby: baby}} // 深拷贝 数组 对象 一定要先筛数组再筛对象,因为万物皆对象function deepcopy(newObj, oldObj) {for (const k in oldObj) {// 判断值是否属于array类if (oldObj[k] i…

01-jupyter notebook的使用方法

一、Tab补全 在shell中输入表达式&#xff0c;按下Tab&#xff0c;会搜索已输入变量&#xff08;对象、函数等等&#xff09;的命名空间&#xff1a; 除了补全命名、对象和模块属性&#xff0c;Tab还可以补全其它的。当输入看似文件路径时 &#xff08;即使是Python字符串&…

云计算服务体系-架构真题(十四)

云计算服务体系结构SaaS、PaaS、IaaS相对应分别&#xff08;&#xff09;。 答案。应用层、平台层、基础设施层 (2022)给定关系模式R(U,F)&#xff0c;其中U为属性集&#xff0c;F是U的一组函数依赖&#xff0c;那么函数依赖的公理系统(Armstrong)中分解规则是指&#xff08;&…

JavaSE学习——异常

目录 一、异常概述 二、异常的体系结果 二、异常的处理&#xff1a;抓抛模型 三、try-catch-finally的使用 四、throws 异常类型 的使用 五、开发中如何选择使用try-catch-finally还是使用throws&#xff1f; 六、自定义异常 自定义异常步骤&#xff1a; 七、总结&a…

VR/AR/眼镜投屏充电方案(LDR6020)

VR眼镜即VR头显&#xff0c;也称虚拟现实头戴式显示设备&#xff0c;随着元宇宙概念的传播&#xff0c;VR眼镜的热度一直只增不减&#xff0c;但是头戴设备的续航一直被人诟病&#xff0c;如果增大电池就会让头显变得笨重影响体验&#xff0c;所以目前最佳的解决方案还是使用VR…

实时同步ES技术选型:Mysql+Canal+Adapter+ES+Kibana

基于之前的文章&#xff0c;精简操作而来 让ELK在同一个docker网络下通过名字直接访问Ubuntu服务器ELK部署与实践使用 Docker 部署 canal 服务实现MySQL和ES实时同步Docker部署ES服务&#xff0c;canal全量同步的时候内存爆炸&#xff0c;ES/Canal Adapter自动关闭&#xff0c…

设计模式--工厂模式(Factory Pattern)

一、 什么是工厂模式 工厂模式&#xff08;Factory Pattern&#xff09;是一种创建型设计模式&#xff0c;它提供了一种创建对象的接口&#xff0c;但是将对象的实例化过程推迟到子类中。工厂模式允许通过调用一个共同的接口方法来创建不同类型的对象&#xff0c;而无需暴露对…

wireshark 流量抓包例题重现

[TOC](这里写目录标题 wireshark抓包方法wireshark组成 wireshark例题 wireshark抓包方法 wireshark组成 wireshark的抓包组成为&#xff1a;分组列表、分组详情以及分组字节流。 上面这一栏想要显示&#xff0c;使用&#xff1a;CtrlF 我们先看一下最上侧的搜索栏可以使用的…

LAMP架构详解+构建LAMP平台之Discuz论坛

LAMP架构详解构建LAMP平台之Discuz论坛 1、LAPM架构简介1.1动态资源与语言1.2LAPM架构得组成1.3LAPM架构说明1.4CGI和astcgi1.4.1CGI1.4.2fastcgi1.4.3CGI和fastcgi比较 2、搭建LAMP平台2.1编译安装apache httpd2.2编译安装mysql2.3编译安装php2.4安装论坛 1、LAPM架构简介 1.…

Mysql--技术文档--基本概念--《世界上最流行的关系型数据库之一》

官方网址 MySQL 阿丹&#xff1a; 作为关系型数据库管理的老大哥&#xff0c;一个合格的程序员多多少少一定要了解mysql库。 官方解释 MySQL是一个关系型数据库管理系统&#xff0c;由瑞典MySQL AB 公司开发&#xff0c;属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管…

SpringCache

SpringCache是Spring提供的一个缓存框架&#xff0c;在Spring3.1版本开始支持将缓存添加到现有的spring应用程序中&#xff0c;在4.1开始&#xff0c;缓存已支持JSR-107注释和更多自定义的选项。 Spring Cache利用了AOP&#xff0c;实现了基于注解的缓存功能&#xff0c;并且进…

智慧能源助力绿色发展

居民生活是碳排放的重要贡献源&#xff0c;作为居民生活的主要场所&#xff0c;社区是低碳城市建设的重要空间载体。推动低碳社区建设&#xff0c;逐渐打造低碳生活方式&#xff0c;是低碳社会建设的重要内容之一。智慧新能源公共设施助力碳中和&#xff0c;用于各社区改造&…

CSDN编程题-每日一练(2023-08-27)

CSDN编程题-每日一练&#xff08;2023-08-27&#xff09; 一、题目名称&#xff1a;异或和二、题目名称&#xff1a;生命进化书三、题目名称&#xff1a;熊孩子拜访 一、题目名称&#xff1a;异或和 时间限制&#xff1a;1000ms内存限制&#xff1a;256M 题目描述&#xff1a; …

【广州华锐互动】VR沉浸式体验红军长征路:追寻红色记忆,传承红色精神

在历史的长河中&#xff0c;长征无疑是一段充满艰辛和英勇的伟大征程。为了让更多的人了解这段历史&#xff0c;我们利用虚拟现实&#xff08;VR&#xff09;技术&#xff0c;为您带来一场沉浸式的体验&#xff0c;重温红军万里长征的壮丽篇章。 一、踏上长征之路 戴上VR眼镜&a…

【沐风老师】如何在3dMax中将3D物体转化为样条线构成的对象?

在3dMax中如何把三维物体转化为由样条线构成的对象&#xff1f;通常这样的场景会出现在科研绘图或一些艺术创作当中&#xff0c;下面给大家详细讲解一种3dmax三维物体转样条线的方法。 第一部分&#xff1a;用粒子填充3D对象&#xff1a; 1.创建一个三维对象&#xff08;本例…