文章目录
- redis多线程模型
- redis为什么引入I/O多线程
- I/O多线程模型
- 源码解析
- 测试设置
- 连接建立
- 数据传输
- 线程调度
- 开启io线程`startThreadedIO`
- 关闭io线程`stopThreadedIO`
redis多线程模型
redis为什么引入I/O多线程
Redis 的性能瓶颈在网络 IO 的处理上。Redis 是网络 IO 密集型,需要同时处理多条并发请求,读写 IO 的问题(请求大量数据,写日志业务等)。多线程处理网络 IO,单线程执行命令。
Redis 线程池作用读写 IO 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程执行命令,是因为 Redis 采用高效的数据结构,其业务逻辑处理较快。
I/O多线程模型
主线程拥有两个全局队列clients_pending_read
和clients_pending_write
,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]
。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。
首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read
中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。
接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。
redis的单线程是指,命令执行(logic)都是在单线程中运行的
接受数据read和发送数据write都是可以在io多线程(线程池)中去运行
在Redis中,生产者也可以作为消费者,反之亦然,没有明确界限。
源码解析
测试设置
redis 线程池默认作用在 encode, send 阶段,这是因为客户端从 redis 获取大量数据需要并发处理。若想作用在 read, decode 阶段,需要手动开启。在 redis.conf 文件中,可以设置:
# 开启io线程的数量io-threads 4# 优化:read deconde 过程。默认优化,encode send从 redis 获取大量数据io-threads-do-reads yes
开启 io 多线程的前提是有多个并发连接。如何在单个连接的情况下,开启 io 多线程调试,需要修改 redis 源码:
// networking.cint stopThreadedIOIfNeeded(void) {// 单个连接的情况下,开启多线程调试,永远不关闭 io 多线程return 0; ...}
连接建立
主线程处理连接建立,listenfd
- 连接到达,触发读事件回调:acceptTcpHandler
- 接收连接:acceptTcpHandler
- 初始化新连接:createClient
// server.cvoid initServer(void) {...// 1、连接到来,触发读事件回调if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR) ...}// networking.cvoid acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {...while(max--) {// 2、接收连接:内部封装 acceptcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);...// 为 cfd 初始化新连接,内部调用 createClientacceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}}static void acceptCommonHandler(connection *conn, int flags, char *ip) {.../* Create connection and client */// 3、创建新的连接if ((c = createClient(conn)) == NULL) {...}...}client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));/* passing NULL as conn it is possible to create a non connected client.* This is useful since all the commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */if (conn) {connNonBlock(conn);connEnableTcpNoDelay(conn);if (server.tcpkeepalive)connKeepAlive(conn,server.tcpkeepalive);// 4.接收数据的读事件触发,回调readQueryFromClient函数connSetReadHandler(conn, readQueryFromClient);connSetPrivateData(conn, c);}
}
数据传输
clientfd
-
读事件回调:readQueryFromClient
-
分割并处理数据包 processInputBuffer
-
- 分割数据包:processInlineBuffer 和 processMultibulkBuffer
- 处理数据包:processCommandAndResetClient
-
数据写到 buffer:addReply
-
数据写到 socket:writeToClient
-
写事件回调:sendReplyToClient
当读事件触发时,执行读事件回调函数。主线程收集读事件就绪的连接放入全局任务队列``clients_pending_read,并设置连接状态为
CLIENT_PENDING_READ`。子线程从该全局队列中获取任务后,也调用该读事件回调函数,进行 read 和 decode 的业务逻辑处理。
// networking.cvoid readQueryFromClient(connection *conn) {.../* Check if we want to read from the client later when exiting from* the event loop. This is the case if threaded I/O is enabled. */// 开启 io 线程后,延迟处理客户端的读,将任务丢到全局队列,再分配给 io 线程// 主线程返回 1,不执行业务逻辑处理;// 子线程返回 0,继续往下,执行业务逻辑处理if (postponeClientRead(c)) return; // 1、read 阶段,(io 线程)将任务读到缓冲区 nread = connRead(c->conn, c->querybuf+qblen, readlen);// 2、decode 阶段,(io 线程)解析数据包processInputBuffer(c);}int postponeClientRead(client *c) {if (server.io_threads_active &&server.io_threads_do_reads &&!clientsArePaused() &&!ProcessingEventsWhileBlocked &&!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))){// 主线程,返回 1// 将连接状态设置为 CLIENT_PENDING_READc->flags |= CLIENT_PENDING_READ;// 收集任务,把客户端连接放到全局队列中,后续会分配到 io 线程listAddNodeHead(server.clients_pending_read,c);return 1;} else {// 子线程,即 io 线程,返回 0return 0;}}
子线程(IO 线程)从专属任务队列 io_threads_pending
获取任务,执行 read decode 和 encode write 业务逻辑处理。
// networking.c// 线程池入口函数:子线程void *IOThreadMain(void *myid) {...while(1) {/* Wait for start */// 等待获取专属任务队列中的任务for (int j = 0; j < 1000000; j++) {if (io_threads_pending[id] != 0) break;} .../* Process: note that the main thread will never touch our list* before we drop the pending count to 0. */listIter li;listNode *ln;// 从专属任务队列中取出任务listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {// encode 和 writewriteToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {// read 和 decode,读事件回调函数readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);io_threads_pending[id] = 0;...}}
子线程 decode 结束后,设置连接状态 CLIENT_PENDING_COMMAND
,交给主线程来 compute,退出读事件回调函数。主线程负责 compute ,解析 redis 命令。
// networking.c// readQueryFromClient 函数中 decode 阶段调用void processInputBuffer(client *c) {/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {...if (c->reqtype == PROTO_REQ_INLINE) {// 分割数据包。并判断是否完整if (processInlineBuffer(c) != C_OK) break;...} else if (c->reqtype == PROTO_REQ_MULTIBULK) {// 分割 pipline 的数据包,并判断是否完整if (processMultibulkBuffer(c) != C_OK) break;}...else {/* If we are in the context of an I/O thread, we can't really* execute the command here. All we can do is to flag the client* as one that needs to process the command. */// io 线程设置任务状态,交给主线程compute,退出读事件回调函数if (c->flags & CLIENT_PENDING_READ) {c->flags |= CLIENT_PENDING_COMMAND;break;}/* We are finally ready to execute the command. */// 3、compute,主线程解析命令if (processCommandAndResetClient(c) == C_ERR) {/* If the client is no longer valid, we avoid exiting this* loop and trimming the client buffer later. So we return* ASAP in that case. */return;}}}...}
主线程 compute 结束后,调用 addReply 函数,将处理完的连接放到全局任务队列clients_pending_write
,并将待发送的数据写到缓冲区。
// networking.cint processCommandAndResetClient(client *c) {...// 处理命令if (processCommand(c) == C_OK) {commandProcessed(c);}...}// server.cint processCommand(client *c) {.../* Exec the command */// 开启 io 多线程,且不是事务命令if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){// 把数据写到缓冲区addReply(c,shared.queued);} else {// 执行 redis 命令call(c,CMD_CALL_FULL);...}...}// networking.c// 数据写到发送缓冲区void addReply(client *c, robj *obj) {if (prepareClientToWrite(c) != C_OK) return;...}int prepareClientToWrite(client *c) {...if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))clientInstallWriteHandler(c); // 任务写到全局队列中...}
接下来,子线程和主线程都可以从自己的专属任务队列中获得该任务,执行 encode 和 send 的业务逻辑处理 writeToClient。若数据未发送完,则注册写事件回调,等待再次发送。
// 子线程:线程池入口函数void *IOThreadMain(void *myid) {... if (io_threads_op == IO_THREADS_OP_WRITE) {// encode 和 writewriteToClient(c,0); // 数据写到 socket} else if (io_threads_op == IO_THREADS_OP_READ) {// read 和 decodereadQueryFromClient(c->conn); // 读事件回调函数...}// 主线程int handleClientsWithPendingWritesUsingThreads(void) {int processed = listLength(server.clients_pending_write);if (processed == 0) return 0; /* Return ASAP if there are no clients. *//* If I/O threads are disabled or we have few clients to serve, don't* use I/O threads, but the boring synchronous code. */if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {//判断是否有必要开启IO多线程return handleClientsWithPendingWrites();}/* Start threads if needed. */if (!server.io_threads_active) startThreadedIO();//开启io多线程/* Distribute the clients across N different lists. */listIter li;listNode *ln;listRewind(server.clients_pending_write,&li);//创建一个迭代器li,用于遍历任务队列clients_pending_writeint item_id = 0;//默认是0,先分配给主线程去做(生产者也可能是消费者),如果设置成1,则先让io线程1去做//io_threads_list[0] 主线程//io_threads_list[1] io线程//io_threads_list[2] io线程 //io_threads_list[3] io线程 //io_threads_list[4] io线程while((ln = listNext(&li))) {client *c = listNodeValue(ln);//取出一个任务c->flags &= ~CLIENT_PENDING_WRITE;/* Remove clients from the list of pending writes since* they are going to be closed ASAP. */if (c->flags & CLIENT_CLOSE_ASAP) {//表示该客户端的输出缓冲区超过了服务器允许范围,将在下一次循环进行一个关闭,也不返回任何信息给客户端,删除待读客户端listDelNode(server.clients_pending_write, ln);continue;}/* Since all replicas and replication backlog use global replication* buffer, to guarantee data accessing thread safe, we must put all* replicas client into io_threads_list[0] i.e. main thread handles* sending the output buffer of all replicas. */if (getClientType(c) == CLIENT_TYPE_SLAVE) {listAddNodeTail(io_threads_list[0],c);continue;}//负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了//这样做的好处是,避免加锁。当前是在主线程中,进行分配任务//通过取余操作,将任务均分给不同io线程int target_id = item_id % server.io_threads_num;listAddNodeTail(io_threads_list[target_id],c);item_id++;}/* Give the start condition to the waiting threads, by setting the* start condition atomic var. */io_threads_op = IO_THREADS_OP_WRITE;for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]);setIOPendingCount(j, count);//设置io线程启动条件,启动io线程}/* Also use the main thread to process a slice of clients. */// 让主线程去处理一部分任务listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);writeToClient(c,0); }listEmpty(io_threads_list[0]);/* Wait for all the other threads to end their work. */while(1) {//剩下的任务io_threads_list[1],io_threads_list[2].....给io线程去做,等待io线程完成任务unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)pending += getIOPendingCount(j);//等待io线程结束,并返回处理的数量if (pending == 0) break;}io_threads_op = IO_THREADS_OP_IDLE;/* Run the list of clients again to install the write handler where* needed. */listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);/* Install the write handler if there are pending writes in some* of the clients. */// 数据没写完,注册写事件回调if (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){freeClientAsync(c);}}listEmpty(server.clients_pending_write);...}
负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了。这样做的好处是,避免加锁。当前是在主线程中,进行分配任务通过取余操作,将任务均分给不同的io线程。
线程调度
开启io线程startThreadedIO
每个io线程都有一把锁,如果主线程把锁还回去了,那么io线程就会启动,不再阻塞
并设置io线程标识为活跃状态io_threads_active=1
void startThreadedIO(void) {serverAssert(server.io_threads_active == 0);for (int j = 1; j < server.io_threads_num; j++)pthread_mutex_unlock(&io_threads_mutex[j]);server.io_threads_active = 1;
}
关闭io线程stopThreadedIO
每个io线程都有一把锁,如果主线程拿了,那么io线程就会阻塞等待,也就是停止了IO线程
并设置io线程标识为非活跃状态io_threads_active=0
void stopThreadedIO(void) {/* We may have still clients with pending reads when this function* is called: handle them before stopping the threads. */handleClientsWithPendingReadsUsingThreads();serverAssert(server.io_threads_active == 1);for (int j = 1; j < server.io_threads_num; j++)pthread_mutex_lock(&io_threads_mutex[j]);//server.io_threads_active = 0;
}