redis中的io多线程(线程池)

文章目录

    • redis多线程模型
      • redis为什么引入I/O多线程
      • I/O多线程模型
    • 源码解析
      • 测试设置
      • 连接建立
      • 数据传输
      • 线程调度
        • 开启io线程`startThreadedIO`
        • 关闭io线程`stopThreadedIO`

redis多线程模型

redis为什么引入I/O多线程

Redis 的性能瓶颈在网络 IO 的处理上。Redis 是网络 IO 密集型,需要同时处理多条并发请求,读写 IO 的问题(请求大量数据,写日志业务等)。多线程处理网络 IO,单线程执行命令。

Redis 线程池作用读写 IO 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程执行命令,是因为 Redis 采用高效的数据结构,其业务逻辑处理较快。

在这里插入图片描述

I/O多线程模型

主线程拥有两个全局队列clients_pending_readclients_pending_write,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。

首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。

接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。

在这里插入图片描述

redis的单线程是指,命令执行(logic)都是在单线程中运行的
接受数据read和发送数据write都是可以在io多线程(线程池)中去运行

在Redis中,生产者也可以作为消费者,反之亦然,没有明确界限。

源码解析

测试设置

redis 线程池默认作用在 encode, send 阶段,这是因为客户端从 redis 获取大量数据需要并发处理。若想作用在 read, decode 阶段,需要手动开启。在 redis.conf 文件中,可以设置:

 # 开启io线程的数量io-threads 4# 优化:read deconde 过程。默认优化,encode send从 redis 获取大量数据io-threads-do-reads yes

开启 io 多线程的前提是有多个并发连接。如何在单个连接的情况下,开启 io 多线程调试,需要修改 redis 源码:

 // networking.cint stopThreadedIOIfNeeded(void) {// 单个连接的情况下,开启多线程调试,永远不关闭 io 多线程return 0;   ...}

连接建立

主线程处理连接建立,listenfd

  • 连接到达,触发读事件回调:acceptTcpHandler
  • 接收连接:acceptTcpHandler
  • 初始化新连接:createClient
 // server.cvoid initServer(void) {...// 1、连接到来,触发读事件回调if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR)  ...}// networking.cvoid acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {...while(max--) {// 2、接收连接:内部封装 acceptcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);...// 为 cfd 初始化新连接,内部调用 createClientacceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}}static void acceptCommonHandler(connection *conn, int flags, char *ip) {.../* Create connection and client */// 3、创建新的连接if ((c = createClient(conn)) == NULL) {...}...}client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));/* passing NULL as conn it is possible to create a non connected client.* This is useful since all the commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */if (conn) {connNonBlock(conn);connEnableTcpNoDelay(conn);if (server.tcpkeepalive)connKeepAlive(conn,server.tcpkeepalive);// 4.接收数据的读事件触发,回调readQueryFromClient函数connSetReadHandler(conn, readQueryFromClient);connSetPrivateData(conn, c);}
}

数据传输

clientfd

  • 读事件回调:readQueryFromClient

  • 分割并处理数据包 processInputBuffer

    • 分割数据包:processInlineBuffer 和 processMultibulkBuffer
    • 处理数据包:processCommandAndResetClient
  • 数据写到 buffer:addReply

  • 数据写到 socket:writeToClient

  • 写事件回调:sendReplyToClient

当读事件触发时,执行读事件回调函数。主线程收集读事件就绪的连接放入全局任务队列``clients_pending_read,并设置连接状态为CLIENT_PENDING_READ`。子线程从该全局队列中获取任务后,也调用该读事件回调函数,进行 read 和 decode 的业务逻辑处理。

// networking.cvoid readQueryFromClient(connection *conn) {.../* Check if we want to read from the client later when exiting from* the event loop. This is the case if threaded I/O is enabled. */// 开启 io 线程后,延迟处理客户端的读,将任务丢到全局队列,再分配给 io 线程// 主线程返回 1,不执行业务逻辑处理;// 子线程返回 0,继续往下,执行业务逻辑处理if (postponeClientRead(c)) return;  // 1、read 阶段,(io 线程)将任务读到缓冲区 nread = connRead(c->conn, c->querybuf+qblen, readlen);// 2、decode 阶段,(io 线程)解析数据包processInputBuffer(c);}int postponeClientRead(client *c) {if (server.io_threads_active &&server.io_threads_do_reads &&!clientsArePaused() &&!ProcessingEventsWhileBlocked &&!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))){// 主线程,返回 1// 将连接状态设置为 CLIENT_PENDING_READc->flags |= CLIENT_PENDING_READ;// 收集任务,把客户端连接放到全局队列中,后续会分配到 io 线程listAddNodeHead(server.clients_pending_read,c);return 1;} else {// 子线程,即 io 线程,返回 0return 0;}}

子线程(IO 线程)从专属任务队列 io_threads_pending获取任务,执行 read decode 和 encode write 业务逻辑处理。

// networking.c// 线程池入口函数:子线程void *IOThreadMain(void *myid) {...while(1) {/* Wait for start */// 等待获取专属任务队列中的任务for (int j = 0; j < 1000000; j++) {if (io_threads_pending[id] != 0) break;}      .../* Process: note that the main thread will never touch our list* before we drop the pending count to 0. */listIter li;listNode *ln;// 从专属任务队列中取出任务listRewind(io_threads_list[id],&li);    while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {// encode 和 writewriteToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {// read 和 decode,读事件回调函数readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);io_threads_pending[id] = 0;...}}

子线程 decode 结束后,设置连接状态 CLIENT_PENDING_COMMAND,交给主线程来 compute,退出读事件回调函数。主线程负责 compute ,解析 redis 命令。

// networking.c// readQueryFromClient 函数中 decode 阶段调用void processInputBuffer(client *c) {/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {...if (c->reqtype == PROTO_REQ_INLINE) {// 分割数据包。并判断是否完整if (processInlineBuffer(c) != C_OK) break;...} else if (c->reqtype == PROTO_REQ_MULTIBULK) {// 分割 pipline 的数据包,并判断是否完整if (processMultibulkBuffer(c) != C_OK) break;}...else {/* If we are in the context of an I/O thread, we can't really* execute the command here. All we can do is to flag the client* as one that needs to process the command. */// io 线程设置任务状态,交给主线程compute,退出读事件回调函数if (c->flags & CLIENT_PENDING_READ) {c->flags |= CLIENT_PENDING_COMMAND;break;}/* We are finally ready to execute the command. */// 3、compute,主线程解析命令if (processCommandAndResetClient(c) == C_ERR) {/* If the client is no longer valid, we avoid exiting this* loop and trimming the client buffer later. So we return* ASAP in that case. */return;}}}...}

主线程 compute 结束后,调用 addReply 函数,将处理完的连接放到全局任务队列clients_pending_write,并将待发送的数据写到缓冲区。

// networking.cint processCommandAndResetClient(client *c) {...// 处理命令if (processCommand(c) == C_OK) {commandProcessed(c);}...}// server.cint processCommand(client *c) {.../* Exec the command */// 开启 io 多线程,且不是事务命令if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){// 把数据写到缓冲区addReply(c,shared.queued);} else {// 执行 redis 命令call(c,CMD_CALL_FULL);...}...}// networking.c// 数据写到发送缓冲区void addReply(client *c, robj *obj) {if (prepareClientToWrite(c) != C_OK) return;...}int prepareClientToWrite(client *c) {...if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))clientInstallWriteHandler(c);   // 任务写到全局队列中...}

接下来,子线程和主线程都可以从自己的专属任务队列中获得该任务,执行 encode 和 send 的业务逻辑处理 writeToClient。若数据未发送完,则注册写事件回调,等待再次发送。

// 子线程:线程池入口函数void *IOThreadMain(void *myid) {... if (io_threads_op == IO_THREADS_OP_WRITE) {// encode 和 writewriteToClient(c,0); // 数据写到 socket} else if (io_threads_op == IO_THREADS_OP_READ) {// read 和 decodereadQueryFromClient(c->conn); // 读事件回调函数...}// 主线程int handleClientsWithPendingWritesUsingThreads(void) {int processed = listLength(server.clients_pending_write);if (processed == 0) return 0; /* Return ASAP if there are no clients. *//* If I/O threads are disabled or we have few clients to serve, don't* use I/O threads, but the boring synchronous code. */if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {//判断是否有必要开启IO多线程return handleClientsWithPendingWrites();}/* Start threads if needed. */if (!server.io_threads_active) startThreadedIO();//开启io多线程/* Distribute the clients across N different lists. */listIter li;listNode *ln;listRewind(server.clients_pending_write,&li);//创建一个迭代器li,用于遍历任务队列clients_pending_writeint item_id = 0;//默认是0,先分配给主线程去做(生产者也可能是消费者),如果设置成1,则先让io线程1去做//io_threads_list[0] 主线程//io_threads_list[1] io线程//io_threads_list[2] io线程   //io_threads_list[3] io线程   //io_threads_list[4] io线程while((ln = listNext(&li))) {client *c = listNodeValue(ln);//取出一个任务c->flags &= ~CLIENT_PENDING_WRITE;/* Remove clients from the list of pending writes since* they are going to be closed ASAP. */if (c->flags & CLIENT_CLOSE_ASAP) {//表示该客户端的输出缓冲区超过了服务器允许范围,将在下一次循环进行一个关闭,也不返回任何信息给客户端,删除待读客户端listDelNode(server.clients_pending_write, ln);continue;}/* Since all replicas and replication backlog use global replication* buffer, to guarantee data accessing thread safe, we must put all* replicas client into io_threads_list[0] i.e. main thread handles* sending the output buffer of all replicas. */if (getClientType(c) == CLIENT_TYPE_SLAVE) {listAddNodeTail(io_threads_list[0],c);continue;}//负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了//这样做的好处是,避免加锁。当前是在主线程中,进行分配任务//通过取余操作,将任务均分给不同io线程int target_id = item_id % server.io_threads_num;listAddNodeTail(io_threads_list[target_id],c);item_id++;}/* Give the start condition to the waiting threads, by setting the* start condition atomic var. */io_threads_op = IO_THREADS_OP_WRITE;for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]);setIOPendingCount(j, count);//设置io线程启动条件,启动io线程}/* Also use the main thread to process a slice of clients. */// 让主线程去处理一部分任务listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);writeToClient(c,0); }listEmpty(io_threads_list[0]);/* Wait for all the other threads to end their work. */while(1) {//剩下的任务io_threads_list[1],io_threads_list[2].....给io线程去做,等待io线程完成任务unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)pending += getIOPendingCount(j);//等待io线程结束,并返回处理的数量if (pending == 0) break;}io_threads_op = IO_THREADS_OP_IDLE;/* Run the list of clients again to install the write handler where* needed. */listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);/* Install the write handler if there are pending writes in some* of the clients. */// 数据没写完,注册写事件回调if (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){freeClientAsync(c);}}listEmpty(server.clients_pending_write);...}

负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了。这样做的好处是,避免加锁。当前是在主线程中,进行分配任务通过取余操作,将任务均分给不同的io线程。

线程调度

开启io线程startThreadedIO

每个io线程都有一把锁,如果主线程把锁还回去了,那么io线程就会启动,不再阻塞
并设置io线程标识为活跃状态io_threads_active=1

void startThreadedIO(void) {serverAssert(server.io_threads_active == 0);for (int j = 1; j < server.io_threads_num; j++)pthread_mutex_unlock(&io_threads_mutex[j]);server.io_threads_active = 1;
}
关闭io线程stopThreadedIO

每个io线程都有一把锁,如果主线程拿了,那么io线程就会阻塞等待,也就是停止了IO线程
并设置io线程标识为非活跃状态io_threads_active=0

void stopThreadedIO(void) {/* We may have still clients with pending reads when this function* is called: handle them before stopping the threads. */handleClientsWithPendingReadsUsingThreads();serverAssert(server.io_threads_active == 1);for (int j = 1; j < server.io_threads_num; j++)pthread_mutex_lock(&io_threads_mutex[j]);//server.io_threads_active = 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/173823.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

链表加法与节点交换:数据结构的基础技能

目录 两两交换链表中的节点单链表加一链表加法使用栈实现使用链表反转实现 两两交换链表中的节点 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点…

ModbusTCP 转 Profinet 主站网关控制汇川伺服驱动器配置案例

ModbusTCP Client 通过 ModbusTCP 控制 Profinet 接口设备&#xff0c;Profinet 接口设备接入 DCS/工控机等 兴达易控ModbusTCP转Profinet主站网关&#xff08;XD-ETHPNM20&#xff09;采用数据映射方式进行工作。 使用设备&#xff1a;兴达易控ModbusTCP 转 Profinet 主站网关…

竞赛 深度学习实现行人重识别 - python opencv yolo Reid

文章目录 0 前言1 课题背景2 效果展示3 行人检测4 行人重识别5 其他工具6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习的行人重识别算法研究与实现 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c…

几个常用的nosql数据库的操作方式

dynamoDB 键 partition key&#xff1a;分区键 定义&#xff1a;分区键是用于分布数据存储的主键&#xff0c;每个项&#xff08;Item&#xff09;在表中都必须有一个唯一的分区键值。 特点&#xff1a; 唯一性&#xff1a;每个分区键值在表中必须是唯一的&#xff0c;这是因为…

闭包通俗解释,Demo(Go Java Python)

闭包的概念 想象一下&#xff0c;你有一个包裹着变量的函数&#xff0c;就像是一个封闭的包裹。这个包裹里有一个变量&#xff0c;而这个函数&#xff08;或包裹&#xff09;本身就是一个完整的单元。当你把这个函数传递给其他地方&#xff0c;就像是把这个包裹传递出去。 这…

Webpack简介及打包演示

Webpack 是一个静态模块打包工具&#xff0c;从入口构建依赖图&#xff0c;打包有关的模块&#xff0c;最后用于展示你的内容 静态模块&#xff1a;编写代码过程中的&#xff0c;html&#xff0c;css&#xff0c; js&#xff0c;图片等固定内容的文件 打包过程&#xff0c;注…

【黑马程序员】mysql进阶再进阶篇笔记

64. 进阶-锁-介绍(Av765670802,P121) 为了应对不同场景 全局锁-所有表 表计锁 一张表 行级锁 一行数据 65. 进阶-锁-全局锁-介绍(Av765670802,P122) 66. 进阶-锁-全局锁-一致性数据备份(Av765670802,P123) 67. 进阶-锁-表级锁-表锁(Av765670802,P124) 读锁、写锁 68. 进阶…

Ansible脚本进阶---playbook

目录 一、playbooks的组成 二、案例 2.1 在webservers主机组中执行一系列任务&#xff0c;包括禁用SELinux、停止防火墙服务、安装httpd软件包、复制配置文件和启动httpd服务。 2.2 在名为dbservers的主机组中创建一个用户组&#xff08;mysql&#xff09;和一个用户&#xf…

Jtti:Apache服务的反向代理及负载均衡怎么配置

配置Apache服务的反向代理和负载均衡可以帮助您分散负载并提高应用程序的可用性和性能。下面是一些通用的步骤&#xff0c;以配置Apache反向代理和负载均衡。 1. 安装和配置Apache&#xff1a; 确保您已经安装了Apache HTTP服务器。通常&#xff0c;Apache的配置文件位于/etc…

Python文件——使用Python读取txt文件

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 本文专栏&#xff1a;Python专栏 专栏介绍&#xff1a;本专栏为免费专栏&#xff0c;并且会持续更新python基础知识&#xff0c;欢迎各位订阅关注. 目录 一、文件的编码 1. 什么是编码 2. 常见的编码 二、P…

深入浅出排序算法之堆排序

目录 1. 算法介绍 2. 执行流程⭐⭐⭐⭐⭐✔ 3. 代码实现 4. 性能分析 1. 算法介绍 堆是一种数据结构&#xff0c;可以把堆看成一棵完全二叉树&#xff0c;这棵完全二叉树满足&#xff1a;任何一个非叶结点的值都不大于(或不小于)其左右孩子结点的值。若父亲大孩子小&#x…

《动手学深度学习 Pytorch版》 10.7 Transformer

自注意力同时具有并行计算和最短的最大路径长度这两个优势。Transformer 模型完全基于注意力机制&#xff0c;没有任何卷积层或循环神经网络层。尽管 Transformer 最初是应用于在文本数据上的序列到序列学习&#xff0c;但现在已经推广到各种现代的深度学习中&#xff0c;例如语…

提高抖音小店用户黏性和商品销量的有效策略

抖音小店是抖音平台上的电商模式&#xff0c;用户可以在抖音上购买各类商品。要提高用户黏性和商品销量&#xff0c;四川不若与众帮你整理了需要注意以下几个方面。 首先&#xff0c;提供优质的商品和服务。在抖音小店中&#xff0c;用户会通过观看商品展示视频和用户评价来选…

Linux 网络驱动实验(PHY芯片LAN8720)

目录 嵌入式网络简介嵌入式下的网络硬件接口 网络驱动是linux 里面驱动三巨头之一&#xff0c;linux 下的网络功能非常强大&#xff0c;嵌入式linux 中也常 常用到网络功能。前面我们已经讲过了字符设备驱动和块设备驱动&#xff0c;本章我们就来学习一下 linux 里面的网络设备…

GAMP源码阅读(中)伪距单点定位 SPP

原始 Markdown文档、Visio流程图、XMind思维导图见&#xff1a;https://github.com/LiZhengXiao99/Navigation-Learning 文章目录 一、SPP 解算1、spp()&#xff1a;单点定位主入口函数2、estpos()3、estpose_()4、valsol()&#xff1a;GDOP和卡方检验结果有效性 二、卫星位置钟…

N-130基于springboot,vue校园社团管理系统

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;maven 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vueelementUI 服务端技术&#xff1a;springbootmybatis-plus 本系…

Redis -- 基础知识3 数据类型及指令

FLUSHALL:清空所有键值对操作(最好别搞,删库要被绳之以法的) 1.string类型 1.介绍 1.redis的字符串,直接按照二进制进行存储,所以可以存储任何数据,取出时不需要转码 2.redis的string类型,限制大小最大为512M,因为为单线程模型为了操作短平快 2.操作 1.set与get set key value …

STM32G030F6P6 芯片实验 (一)

STM32G030F6P6 芯片实验 (一) 淘宝搞了几片, 没试过 G系列, 试试感觉. 先搞片小系统版: 套 STM32F103C8T6小系统板格式. 原理图: (1) Ref 有点跳, 从 STM32F103C8T6 系统板改的, 没重编号. (2) Type-C 纯给电, 砍了 16pin的, 直接换 6pin的。 (3) 测试LED放 B2。 (4) 测试底…

uni-app中tab选项卡的实现效果 @click=“clickTab(‘sell‘)“事件可传参数

一、效果图 二、代码 <template><view><view class"choose-tab"><view class"choose-tab-item" :class"chooseTab 0 ? active : " data-choose"0" click"clickTab">选项1</view><view …

webpack 解决:TypeError: merge is not a function 的问题

1、问题描述&#xff1a; 其一、存在的问题为&#xff1a; TypeError: merge is not a function 中文为&#xff1a; 类型错误&#xff1a;merge 不是函数 其二、问题描述为&#xff1a; 想执行 npm run dev 命令&#xff0c;运行起项目时&#xff0c;控制台报错 TypeErro…