brpc之io事件分发器

结构

EventDispatcher
+int Start(const bthread_attr_t* consumer_thread_attr)
+int AddConsumer(SocketId socket_id, int fd)
+int RegisterEvent(SocketId socket_id, int fd, bool pollin)
+int UnregisterEvent(SocketId socket_id, int fd, bool pollin)
-static void* RunThis(void* arg)
-void Run()
-int RemoveConsumer(int fd)

初始化

brpc的io事件分发器,使用多线程Reactor模式
通过InitializeGlobalDispatchers来初始化全局io事件分发器
分为task_group_ntags组,每组有event_dispatcher_num

void InitializeGlobalDispatchers() {g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];for (int i = 0; i < FLAGS_task_group_ntags; ++i) {for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {bthread_attr_t attr =FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));}}// This atexit is will be run before g_task_control.stop() because above// Start() initializes g_task_control by creating bthread (to run epoll/kqueue).CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}

分发策略

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {pthread_once(&g_edisp_once, InitializeGlobalDispatchers);if (FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1) {return g_edisp[0];}int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}

读事件

对于acceptor,读事件处理函数为OnNewConnections

options.on_edge_triggered_events = OnNewConnections;

连接后新socket的读事件处理函数为OnNewDataFromTcp或者OnNewMessages

#if BRPC_WITH_RDMAif (am->_use_rdma) {options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;} else {
#else{
#endifoptions.on_edge_triggered_events = InputMessenger::OnNewMessages;}

写事件

对于非阻塞connect时,会调用事件分发器的RegisterEvent注册EPOLLOUT

int Socket::Connect(const timespec* abstime,int (*on_connect)(int, int, void*), void* data) {if (_ssl_ctx) {_ssl_state = SSL_CONNECTING;} else {_ssl_state = SSL_OFF;}struct sockaddr_storage serv_addr;socklen_t addr_size = 0;if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {PLOG(ERROR) << "Fail to get sockaddr";return -1;}butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));if (sockfd < 0) {PLOG(ERROR) << "Fail to create socket";return -1;}CHECK_EQ(0, butil::make_close_on_exec(sockfd));// We need to do async connect (to manage the timeout by ourselves).CHECK_EQ(0, butil::make_non_blocking(sockfd));const int rc = ::connect(sockfd, (struct sockaddr*)&serv_addr, addr_size);if (rc != 0 && errno != EINPROGRESS) {PLOG(WARNING) << "Fail to connect to " << remote_side();return -1;}if (on_connect) {EpollOutRequest* req = new(std::nothrow) EpollOutRequest;if (req == NULL) {LOG(FATAL) << "Fail to new EpollOutRequest";return -1;}req->fd = sockfd;req->timer_id = 0;req->on_epollout_event = on_connect;req->data = data;// A temporary Socket to hold `EpollOutRequest', which will// be added into epoll device soonSocketId connect_id;SocketOptions options;options.bthread_tag = _bthread_tag;options.user = req;if (Socket::Create(options, &connect_id) != 0) {LOG(FATAL) << "Fail to create Socket";delete req;return -1;}// From now on, ownership of `req' has been transferred to// `connect_id'. We hold an additional reference here to// ensure `req' to be valid in this scopeSocketUniquePtr s;CHECK_EQ(0, Socket::Address(connect_id, &s));// Add `sockfd' into epoll so that `HandleEpollOutRequest' will// be called with `req' when epoll event reachesif (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent(connect_id, sockfd, false) !=0) {const int saved_errno = errno;PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",(int)sockfd, berror(saved_errno));return -1;}// Register a timer for EpollOutRequest. Note that the timeout// callback has no race with the one above as both of them try// to `SetFailed' `connect_id' while only one of them can succeed// It also work when `HandleEpollOutRequest' has already been// called before adding the timer since it will be removed// inside destructor of `EpollOutRequest' after leaving this scopeif (abstime) {int rc = bthread_timer_add(&req->timer_id, *abstime,HandleEpollOutTimeout,(void*)connect_id);if (rc) {LOG(ERROR) << "Fail to add timer: " << berror(rc);s->SetFailed(rc, "Fail to add timer: %s", berror(rc));return -1;}}} else {if (WaitEpollOut(sockfd, false, abstime) != 0) {PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;return -1;}if (CheckConnected(sockfd) != 0) {return -1;}}return sockfd.release();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/428474.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习算法与实践_03概率论与贝叶斯算法笔记

1、概率论基础知识介绍 人工智能项目本质上是一个统计学项目&#xff0c;是通过对 样本 的分析&#xff0c;来评估/估计 总体 的情况&#xff0c;与数学知识相关联 高等数学 ——> 模型优化 概率论与数理统计 ——> 建模思想 线性代数 ——> 高性能计算 在机器学…

2024年最新版Vue3学习笔记

本篇文章是记录来自尚硅谷禹神2023年课程的学习笔记&#xff0c;不得不说禹神讲的是真的超级棒&#xff01; 文章目录 创建Vue3工程main.ts文件解析初始化项目写一个简单的效果 Vue3核心语法setup函数setup和选项式的区别setup语法糖指定组件名称 响应式数据ref函数定义基本类…

OpenSSH从7.4升级到9.8的过程 亲测--图文详解

一、下载软件 下载openssh 下载地址&#xff1a; Downloads | Library 下载openssl Index of /pub/OpenBSD/OpenSSH/ zlib Home Site 安装的 openssl-3.3.1.tar.gz ,安装3.3.2有问题 安装有问题&#xff0c; 二、安装依赖 yum install -y perl-CPAN perl-ExtUtils-CB…

信息安全工程师(8)网络新安全目标与功能

前言 网络新安全目标与功能在当前的互联网环境中显得尤为重要&#xff0c;它们不仅反映了网络安全领域的最新发展趋势&#xff0c;也体现了对网络信息系统保护的不断加强。 一、网络新安全目标 全面防护与动态应对&#xff1a; 目标&#xff1a;建立多层次、全方位的网络安全防…

搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(二)-索引

场景 首先介绍测试的场景&#xff0c;本文schema定义 pdm文档索引&#xff0c;包括nested&#xff0c;扩展字段&#xff0c;文档属性扩展&#xff0c;其中_content字段是组件保留字段&#xff0c;支持文本内容 索引 索引服务索引的操作&#xff0c;包括构建&#xff0c;put …

人工智能——猴子摘香蕉问题

一、实验目的 求解猴子摘香蕉问题&#xff0c;根据猴子不同的位置&#xff0c;求解猴子的移动范围&#xff0c;求解对应的过程&#xff0c;针对不同的目标状态进行求解。 二、实验内容 根据场景有猴子、箱子、香蕉&#xff0c;香蕉挂天花板上。定义多种谓词描述位置、状态等…

【Python语言初识(二)】

一、分支结构 1.1、if语句 在Python中&#xff0c;要构造分支结构可以使用if、elif和else关键字。所谓关键字就是有特殊含义的单词&#xff0c;像if和else就是专门用于构造分支结构的关键字&#xff0c;很显然你不能够使用它作为变量名&#xff08;事实上&#xff0c;用作其他…

Python编码系列—Python适配器模式:无缝集成的桥梁

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

UGit:腾讯自研的Git客户端新宠

UGit 是一款专门针对腾讯内部研发环境特点量身定制的 Git 客户端&#xff0c;其目标在于大幅提升开发效率以及确保团队协作的高度流畅性。UGit 能够良好地支持 macOS 10.11 及以上版本、Apple Silicon 以及 Win64 位系统。 可以下载体验一把。 https://ugit.qq.com/zh/index.…

稀土抗菌剂:厨房用品中的安全卫士

稀土抗菌剂的抗菌机制是基于稀土的光催化半导体特性&#xff0c;通过光生氧自由基ROS机理杀灭细菌&#xff1b;稀土化合物与细菌表面静电结合&#xff0c;造成直接的杀灭&#xff1b;稀土化合物破坏细胞膜通透性&#xff0c;造成破损导致细胞质流出杀灭细菌;稀土离子跨膜后与细…

【Text2SQL】PET-SQL:在Spider基准测试中取得了SOTA

解读&#xff1a;PET-SQL: A Prompt-enhanced Two-stage Text-to-SQL Framework with Cross-consistency 这篇论文介绍了一个名为 PET-SQL 的文本到 SQL&#xff08;Text-to-SQL&#xff09;框架&#xff0c;旨在通过增强提示&#xff08;prompt&#xff09;和利用不同大型语言…

数据结构--双链表

目录 一、引言 二 、链表的分类 1.单向或双向 2.带头或不带头 3.循环或不循环 三、双链表的概念与基本结构 1.概念 2.基本结构 三、双链表的常见操作 1.创建节点 2.初始化 3.头插 4.尾插 5.头删 6.尾删 7.打印 8.查找 9.插入节点 10.删除节点 11.销毁链…

OpenAi assistant run always fails when called from PHP

题意&#xff1a;从 PHP 调用时&#xff0c;OpenAI 助理运行总是失败。 问题背景&#xff1a; The runs I create with the openai-php library fail direct in 100% of cases. What am I doing wrong? I do not have much experience with php but this is the test script.…

Codeforces Round 973 (Div. 2) - D题

传送门&#xff1a;Problem - D - Codeforces 题目大意&#xff1a; 思路&#xff1a; 尽量要 最大值变小&#xff0c;最小值变大 即求 最大值的最小 和 最小值的最大 -> 二分答案 AC代码&#xff1a; 代码有注释 #include<bits/stdc.h> using namespace std; #…

neo4j(spring) 使用示例

文章目录 前言一、neo4j是什么二、开始编码1. yml 配置2. crud 测试3. node relation 与java中对象的关系4. 编码测试 总结 前言 图数据库先驱者 neo4j&#xff1a;neo4j官网地址 可以选择桌面版安装等多种方式,我这里采用的是docker安装 直接执行docker安装命令: docker run…

Git之如何删除Untracked文件(六十八)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

基于Springboot的助学金管理系统设计与实现

文未可获取一份本项目的java源码和数据库参考。 一、研究背景 利用计算机来实现助学金管理系统&#xff0c;已经成为一种趋势&#xff0c;相比传统的手工管理方式&#xff0c;利用软件进行助学金管理系统&#xff0c;有着执行快&#xff0c;可行性高、容量存储大&#xff0c;…

【C#】内存的使用和释放

在 C# 中&#xff0c;内存管理主要是由 .NET 的垃圾回收器&#xff08;Garbage Collector, GC&#xff09;自动处理的。然而&#xff0c;了解如何正确地使用和释放内存对于编写高效且可靠的代码非常重要。以下是一些关键点和最佳实践&#xff1a; 1. 内存分配 托管资源&#x…

CSS——弹性盒子布局(display: flex)

CSS——弹性盒子布局&#xff08;display: flex&#xff09; 我们经常听说一种布局&#xff1a;Flexbox或者是弹性布局&#xff0c;它的全称叫做弹性盒子布局&#xff08;Flexible Box Layout&#xff09;&#xff0c;那么它到底该如何实现呢&#xff1f;从我们熟悉的 display…

大模型训练实战经验总结

在当今AI技术飞速发展的背景下&#xff0c;定制化大模型的自主训练已成为满足特定行业需求、保障数据安全、提升模型应用效能的关键途径。本文将深度剖析这一过程的核心价值与实践智慧&#xff0c;从数据隐私保护、模型透明度增强&#xff0c;到数据预处理的精细操作&#xff0…