06 | Swoole 源码分析之 Coroutine 协程模块

首发原文链接:Swoole 源码分析之 Coroutine 协程模块
大家好,我是码农先森。

引言

协程又称轻量级线程,但与线程不同的是;协程是用户级线程,不需要操作系统参与。由用户显式控制,可以在需要的时候挂起、或恢复执行。

通过协程程序可以在执行的过程中保存当前的状态,并在恢复后从该状态处继续执行,整体上来说创建、销毁、切换的成本低。

但在 Swoole 中的协程是无法利用多核 CPU 的,如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。

协程的出现为 Swoole 程序提升并发效率、及系统的处理能力,注入了强劲的动力;可以说是 Swoole 作为高性能通信框架的的核心模块。

源码拆解

这次我们以下面这段代码,来作为本次拆解源码的切入点。

// 协程容器
Swoole\Coroutine\run(function () {// Socket 协程客户端$socket = new Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, 0);// 建立连接,在建立连接的过程中会发生协程切换$retval = $socket->connect('127.0.0.1', 9601);if ($retval) {// 发送数据,在发送数据的过程中会发生协程切换$n = $socket->send('hello');var_dump($n);// 解释数据,在接收数据的过程中会发生协程切换$data = $socket->recv();var_dump($data);// 关闭连接$socket->close();}
});

这段代码主要是使用 Socket 的协程客户端与本地的 9601 端口建立连接,并且发送、接收数据。在分析源码之前,我对这次的源码做了一个图解梳理,把整个调用链路上的函数串联了起来。我们可以先对整体有个大致的了解,便于后面分析源代码。

Socket 协程客户端

Socket 协程客户端是专门用于 Swoole 在协程环境中使用的,可以实现在 IO 调用时切换协程,让出 CPU 的使用权。例如:在连接建立、发送数据、接收数据 等阶段会进行协程的切换。

这个函数主要是发起 Socket 连接的建立,并且在 wait_event 这个函数内部实现了协程的切换。

// swoole-src/src/coroutine/socket.cc:595
bool Socket::connect(const struct sockaddr *addr, socklen_t addrlen) {if (sw_unlikely(!is_available(SW_EVENT_RDWR))) {return false;}int retval;do {// 发起连接建立retval = ::connect(sock_fd, addr, addrlen);} while (retval < 0 && errno == EINTR);if (retval < 0) {if (errno != EINPROGRESS) {set_err(errno);return false;} else {TimerController timer(&write_timer, connect_timeout, this, timer_callback);// wait_event 这个函数内部实现了协程的切换if (!timer.start() || !wait_event(SW_EVENT_WRITE)) {if (is_closed()) {set_err(ECONNABORTED);}return false;} else {if (socket->get_option(SOL_SOCKET, SO_ERROR, &errCode) < 0 || errCode != 0) {set_err(errCode);return false;}}}}connected = true;set_err(0);return true;
}

再看看 wait_event 函数的内部实现,先是获取到当前的协程,然后根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中,最后将当前的协程切换出去,让出其 CPU 的控制权。

// swoole-src/src/coroutine/socket.cc:147
bool Socket::wait_event(const EventType event, const void **__buf, size_t __n) {EventType added_event = event;// 获取到当前的协程Coroutine *co = Coroutine::get_current_safe();if (!co) {return false;}if (sw_unlikely(socket->close_wait)) {set_err(SW_ERROR_CO_SOCKET_CLOSE_WAIT);return false;}// clear the last errCodeset_err(0);
#ifdef SW_USE_OPENSSL// 根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中if (sw_unlikely(socket->ssl && ((event == SW_EVENT_READ && socket->ssl_want_write) ||(event == SW_EVENT_WRITE && socket->ssl_want_read)))) {if (sw_likely(socket->ssl_want_write && add_event(SW_EVENT_WRITE))) {want_event = SW_EVENT_WRITE;} else if (socket->ssl_want_read && add_event(SW_EVENT_READ)) {want_event = SW_EVENT_READ;} else {return false;}added_event = want_event;} else
#endifif (sw_unlikely(!add_event(event))) {return false;}swoole_trace_log(SW_TRACE_SOCKET,"socket#%d blongs to cid#%ld is waiting for %s event",sock_fd,co->get_cid(),get_wait_event_name(this, event));Coroutine::CancelFunc cancel_fn = [this, event](Coroutine *co) { return cancel(event); };// 将当前的协程切换出去,让出其 CPU 的控制权if (sw_likely(event == SW_EVENT_READ)) {read_co = co;read_co->yield(&cancel_fn);read_co = nullptr;} else if (event == SW_EVENT_WRITE) {if (sw_unlikely(!zero_copy && __n > 0 && *__buf != get_write_buffer()->str)) {write_buffer->clear();if (write_buffer->append((const char *) *__buf, __n) != SW_OK) {set_err(ENOMEM);goto _failed;}*__buf = write_buffer->str;}write_co = co;write_co->yield(&cancel_fn);write_co = nullptr;} else {assert(0);return false;}
_failed:
#ifdef SW_USE_OPENSSL// maybe read_co and write_co are all waiting for the same event when we use SSLif (sw_likely(want_event == SW_EVENT_NULL || !has_bound()))
#endif{Reactor *reactor = SwooleTG.reactor;if (sw_likely(added_event == SW_EVENT_READ)) {reactor->remove_read_event(socket);} else {reactor->remove_write_event(socket);}}
#ifdef SW_USE_OPENSSLwant_event = SW_EVENT_NULL;
#endifswoole_trace_log(SW_TRACE_SOCKET,"socket#%d blongs to cid#%ld trigger %s event",sock_fd,co->get_cid(),get_trigger_event_name(this, added_event));return !is_closed() && !errCode;
}

同理 send()recv() 函数,也和 connect() 函数是一样的实现方式。

// swoole-src/src/coroutine/socket.cc:847
ssize_t Socket::send(const void *__buf, size_t __n) {if (sw_unlikely(!is_available(SW_EVENT_WRITE))) {return -1;}ssize_t retval;TimerController timer(&write_timer, write_timeout, this, timer_callback);do {// 发送数据retval = socket->send(__buf, __n, 0);} while (retval < 0 && socket->catch_write_error(errno) == SW_WAIT && timer.start() &&wait_event(SW_EVENT_WRITE, &__buf, __n));check_return_value(retval);return retval;
}// swoole-src/src/coroutine/socket.cc:874
ssize_t Socket::recv(void *__buf, size_t __n) {if (sw_unlikely(!is_available(SW_EVENT_READ))) {return -1;}ssize_t retval;TimerController timer(&read_timer, read_timeout, this, timer_callback);do {// 接收数据retval = socket->recv(__buf, __n, 0);} while (retval < 0 && socket->catch_read_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ));check_return_value(retval);return retval;
}

也是调用 wait_event() 函数来实现当前的协程切换,唯一的区别就是事件的类型不同,一个是读事件,一个是写事件。

Run 协程容器

在 Swoole 中要想使用协程,那么必须要在协程的环境中使用协程的客户端,或者支持 Hook 的原生 PHP 函数。才能享受到 Swoole 中协程带来的高性能,不然和普通的 PHP 执行程序没有什么区别,变成了同步阻塞。

在源码中协程容器主要是实现了事件循环的初始化、协程上下文的创建管理、事件循环的 IO 事件监听,接下来我们会主要分析关于事件管理的部分内容。

// swoole-src/src/coroutine/base.cc:210
namespace coroutine {bool run(const CoroutineFunc &fn, void *arg) {// 事件循环的初始化if (swoole_event_init(SW_EVENTLOOP_WAIT_EXIT) < 0) {return false;}// 协程上下文的创建管理Coroutine::activate();long cid = Coroutine::create(fn, arg);// 事件循环的 IO 事件监听swoole_event_wait();Coroutine::deactivate();return cid > 0;}
}

Event 事件初始化

Event 事件初始化主要是定义一些事件的回调函数,便于在事件被触发时恢复对应的协程进行后续的逻辑处理,例如:读事件回调函数 readable_event_callback、写事件回调函数 writable_event_callback 等。

// swoole-src/src/wrapper/event.cc:37
int swoole_event_init(int flags) {if (!SwooleG.init) {std::unique_lock<std::mutex> lock(init_lock);swoole_init();}// 创建一个 Reactor 实例对象Reactor *reactor = new Reactor(SW_REACTOR_MAXEVENTS);if (!reactor->ready()) {return SW_ERR;}if (flags & SW_EVENTLOOP_WAIT_EXIT) {reactor->wait_exit = 1;}// Socket 事件初始化coroutine::Socket::init_reactor(reactor);coroutine::System::init_reactor(reactor);network::Client::init_reactor(reactor);SwooleTG.reactor = reactor;return SW_OK;
}
// swoole-src/include/swoole_coroutine_sokcet.h:157
static inline void init_reactor(Reactor *reactor) {// 定义对应事件的回调函数reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_READ, readable_event_callback);reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_WRITE, writable_event_callback);reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_ERROR, error_event_callback);
}
// swoole-src/src/coroutine/socket.c:48
int Socket::readable_event_callback(Reactor *reactor, Event *event) {Socket *socket = (Socket *) event->socket->object;socket->set_err(0);
#ifdef SW_USE_OPENSSLif (sw_unlikely(socket->want_event != SW_EVENT_NULL)) {if (socket->want_event == SW_EVENT_READ) {// 恢复对应的协程socket->write_co->resume();}} else
#endif{if (socket->recv_barrier && (*socket->recv_barrier)() && !event->socket->event_hup) {return SW_OK;}// 恢复对应的协程socket->read_co->resume();}return SW_OK;
}

Event 事件监听

Event 事件监听主要是针对被加入到事件循环中的 Socket 进行 IO 事件的监听,如果有读或写 IO 事件的触发,则回调到对应的处理函数上进行执行。

// swoole-src/src/warpper/event.cc:84
int swoole_event_wait() {Reactor *reactor = SwooleTG.reactor;int retval = 0;if (!reactor->wait_exit or !reactor->if_exit()) {// 事件循环等待调用retval = reactor->wait(nullptr);}swoole_event_free();return retval;
}
// swoole-src/src/reactor/epoll.cc:153
int ReactorEpoll::wait(struct timeval *timeo) {Event event;ReactorHandler handler;int i, n, ret;int reactor_id = reactor_->id;int max_event_num = reactor_->max_event_num;if (reactor_->timeout_msec == 0) {if (timeo == nullptr) {reactor_->timeout_msec = -1;} else {reactor_->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;}}reactor_->before_wait();while (reactor_->running) {if (reactor_->onBegin != nullptr) {reactor_->onBegin(reactor_);}// 监听 IO 事件n = epoll_wait(epfd_, events_, max_event_num, reactor_->get_timeout_msec());if (n < 0) {if (!reactor_->catch_error()) {swoole_sys_warning("[Reactor#%d] epoll_wait failed", reactor_id);return SW_ERR;} else {goto _continue;}} else if (n == 0) {reactor_->execute_end_callbacks(true);SW_REACTOR_CONTINUE;}for (i = 0; i < n; i++) {event.reactor_id = reactor_id;event.socket = (Socket *) events_[i].data.ptr;event.type = event.socket->fd_type;event.fd = event.socket->fd;if (events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {event.socket->event_hup = 1;}// read 读事件,这里的 handler 对应 readable_event_callbackif ((events_[i].events & EPOLLIN) && !event.socket->removed) {handler = reactor_->get_handler(SW_EVENT_READ, event.type);ret = handler(reactor_, &event);if (ret < 0) {swoole_sys_warning("EPOLLIN handle failed. fd=%d", event.fd);}}// write 写事件,这里的 handler 对应 writable_event_callbackif ((events_[i].events & EPOLLOUT) && !event.socket->removed) {handler = reactor_->get_handler(SW_EVENT_WRITE, event.type);ret = handler(reactor_, &event);if (ret < 0) {swoole_sys_warning("EPOLLOUT handle failed. fd=%d", event.fd);}}// error 错误处理,这里的 handler 对应 error_event_callbackif ((events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed) {// ignore ERR and HUP, because event is already processed at IN and OUT handler.if ((events_[i].events & EPOLLIN) || (events_[i].events & EPOLLOUT)) {continue;}handler = reactor_->get_error_handler(event.type);ret = handler(reactor_, &event);if (ret < 0) {swoole_sys_warning("EPOLLERR handle failed. fd=%d", event.fd);}}if (!event.socket->removed && (event.socket->events & SW_EVENT_ONCE)) {reactor_->_del(event.socket);}}_continue:reactor_->execute_end_callbacks(false);SW_REACTOR_CONTINUE;}return 0;
}

总结

  • 协程又称轻量级线程,协程是用户级线程;不需要操作系统参与,创建切换成本低。
  • Swoole 中的协程是无法利用多核 CPU 的,如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。
  • Swoole 中协程的是利用的 Event 事件循环进行调度的,将遇到 IO 操作的 Socket 统一加入到事件循环中。
  • 本次的源码分析旨在了解整个协程在 Swoole 中的运行逻辑,打开我们的思路,便于我们更好的体会到协程所带来的高性能价值。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/294991.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分享一种快速移植OpenHarmony Linux内核的方法

移植概述 本文面向希望将 OpenHarmony 移植到三方芯片平台硬件的开发者&#xff0c;介绍一种借助三方芯片平台自带 Linux 内核的现有能力&#xff0c;快速移植 OpenHarmony 到三方芯片平台的方法。 移植到三方芯片平台的整体思路 内核态层和用户态层 为了更好的解释整个内核…

曲线降采样之道格拉斯-普克算法Douglas–Peucker

曲线降采样之道格拉斯-普克算法Douglas–Peucker 该算法的目的是&#xff0c;给定一条由线段构成的曲线&#xff0c;找到一条点数较少的相似曲线&#xff0c;来近似描述原始的曲线&#xff0c;达到降低时间、空间复杂度和平滑曲线的目的。 附赠自动驾驶学习资料和量产经验&…

通过提交容器的方式修改ubuntu镜像的apt源

通过提交容器的方式修改ubuntu镜像的apt源 步骤总结 问题&#xff0c;每次创建容器之后&#xff0c;都要在容器内手动更改镜像源。 不如&#xff0c;干脆修改镜像的apt源&#xff0c;一次到位。 步骤 先创建一个容器&#xff0c;到容器内执行变更命令。 D:/sandbox> dock…

【Vue】vue3简介与环境配置

文章目录 项目编码规范什么是 Vue&#xff1f;安装node环境nvm针对node版本惊醒管理的工具 项目编码规范 组合式API Typescript setup(语法糖) 什么是 Vue&#xff1f; Vue 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;…

【SQL】1633. 各赛事的用户注册率(COUNT函数 表达式用法)

题目描述 leetcode题目&#xff1a;1633. 各赛事的用户注册率 Code select contest_id, round(count(*)/(select count(*) from Users)*100, 2) as percentage from Register group by contest_id order by percentage desc, contest_id ascCOUNT()函数 COUNT函数用法&#…

docker容器之etcd安装

一、etcd介绍 1、etcd是什么 etcd是CoreOS团队于2013年6月发起的开源项目&#xff0c;它的目标是构建一个高可用的分布式键值(key-value)数据库。 2、etcd特点 简单的接口&#xff0c;通过标准的HTTP API进行调用&#xff0c;也可以使用官方提供的 etcdctl 操作存储的数据。…

1999-2022年上市公司员工人数数据

1999-2022年上市公司员工人数数据 1、时间&#xff1a;1999-2022年 2、指标&#xff1a;证券代码、时间、员工人数 3、来源&#xff1a;整理自csmar 4、范围&#xff1a;上市公司 5、指标解释&#xff1a; 上市公司员工人数是衡量公司规模和发展状的重要指标。该数据直接…

编程新手必看,python中的转义字符及注释!(4)

1、常见的转义字符 Python中的转义字符用于在字符串中表示一些特殊的字符&#xff0c;这些字符通常无法直接输入或具有特殊的意义。以下是一些常见的转义字符及其含义&#xff1a; 在Python中&#xff0c;可以使用转义字符来表示一些特殊的字符。以下是使用转义字符的几种常见…

VuePress基于 Vite 和 Vue 构建优秀框架

VitePress 是一个静态站点生成器 (SSG)&#xff0c;专为构建快速、以内容为中心的站点而设计。简而言之&#xff0c;VitePress 获取用 Markdown 编写的内容&#xff0c;对其应用主题&#xff0c;并生成可以轻松部署到任何地方的静态 HTML 页面。 VitePress 附带一个用于技术文档…

Java复习第十六天学习笔记(JSP、Servlet),附有道云笔记链接

【有道云笔记】十六 4.2 JSP、Servlet https://note.youdao.com/s/QccA5g1G 一、软件的结构 C/S (Client - Server 客户端-服务器端) 典型应用&#xff1a;QQ软件 &#xff0c;飞秋&#xff0c;印象笔记。 特点&#xff1a; 必须下载特定的客户端程序。服务器端升级&#…

保护Android应用安全:全面探究代码混淆在加固中的作用

Android APP 加固是优化 APK 安全性的一种方法&#xff0c;常见的加固方式有混淆代码、加壳、数据加密、动态加载等。下面介绍一下 Android APP 加固的具体实现方式。 混淆代码 使用 ipaguard工具可以对代码进行混淆&#xff0c;使得反编译出来的代码很难阅读和理解&#xff…

CSS面试题---基础

1、css选择器及优先级 选择器优先级&#xff1a;内联样式>id选择器>类选择器、属性选择器、伪类选择器>标签选择器、微元素选择器 注意&#xff1a; !important优先级最高&#xff1b; 如果优先级相同&#xff0c;则最后出现的样式生效&#xff1b; 继承得到的样式优先…

腾讯云2024年4月优惠券及最新活动入口

腾讯云是腾讯集团倾力打造的云计算品牌&#xff0c;提供全球领先的云计算、大数据、人工智能等技术产品与服务。为了吸引用户上云&#xff0c;腾讯云经常推出各种优惠活动。本文将为大家分享腾讯云优惠券及最新活动入口&#xff0c;助力大家轻松上云&#xff01; 一、优惠券领取…

大模型prompt技巧——思维链(Chain-of-Thought)

1、Zero-shot、One-shot、Few-shot 与fintune prompt的时候给出例子答案&#xff0c;然后再让模型回答。 2、zero-shot-CoT “Let’s think step by step”有奇迹效果 3、多数投票提高CoT性能——自洽性&#xff08;Self-consistency&#xff09; 多个思维链&#xff0c;然后取…

浪潮分布式存储AS13000G6-M36改扩配后管理界面不能识别和标记硬盘的处理方法

AS13000G6 改配出问题的场景 浪潮分布式存储AS13000G6-M36渠道备货的分布式存储通常是流量机型&#xff0c;实际出货可能会涉及改配 集群部署完以后建议在系统视图下查看一下盘是否能识别 这个是正常的情况&#xff0c;可以正确管理到盘,硬盘侧边有绿色的指示灯。 如图是管理…

如果符合这7点,说明你经历过职场PUA。

今天聊聊在职场中比较普遍&#xff0c;但又容易被忽视的问题——职场PUA。 工作是为了更好的生活&#xff0c;但有时候可能会发现&#xff0c;这份工作怎么越做越不对劲&#xff0c;感觉像是偏航了。 简单来说&#xff0c;职场PUA就是一种精神控制&#xff0c;常常以批评和否…

java的警示之有危险的行为

&#x1f468;‍&#x1f4bb;作者简介&#xff1a;&#x1f468;&#x1f3fb;‍&#x1f393;告别&#xff0c;今天 &#x1f4d4;高质量专栏 &#xff1a;☕java趣味之旅 欢迎&#x1f64f;点赞&#x1f5e3;️评论&#x1f4e5;收藏&#x1f493;关注 &#x1f496;衷心的希…

MATLAB科研绘图与学术图表绘制从入门到精通

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

vue-ueditor-wrap上传图片报错:后端配置项没有正常加载,上传插件不能正常使用

如图所示,今天接收一个项目其中富文本编辑器报错 此项目为vue2项目,富文本编辑器为直接下载好的资源存放在public目录下的 经过排查发现报错的函数在ueditor.all.min.js文件内,但是ueditor.all.min.js文件夹是经过压缩的 所以直接,将index.html中的引用路径修改为ueditor…

C++算法——滑动窗口

一、长度最小的子数组 1.链接 209. 长度最小的子数组 - 力扣&#xff08;LeetCode&#xff09; 2.描述 3.思路 本题从暴力求解的方式去切入&#xff0c;逐步优化成“滑动窗口”&#xff0c;首先&#xff0c;暴力枚举出各种组合的话&#xff0c;我们先让一个指针指向第一个&…