因为主从复制的过程很复杂, 同时核心逻辑主要集中在 replication.c 这个文件中, 避免篇幅过大, 所以将主从复制中涉及这个文件的代码集中到了另一篇文章。
在当前文章主要分析主从复制的大体代码逻辑, 如果需要了解整体的过程, 可以配合 Redis 主从复制 - relication 源码分析 这篇文章。
1 主从节点建立连接准备
Redis 主从节点建立连接的 3 种方式, 本质都是从节点执行 slaveof 命令, 和父节点建立初步的关联关系。
这个命令执行的方法为 replicaofCommand (高版本的 Redis 可以通过 replicaof 达到 slaveof 的效果)。
void replicaofCommand(client *c) {// 开启了集群功能, 直接返回, 集群模式不允许执行 slaveof if (server.cluster_enabled) {addReplyError(c,"REPLICAOF not allowed in cluster mode.");return;}// 第一参数为 no, 第二个参数为 one// slaveof no one, 可以让从节点和主节点断开连接, 停止主从复制 if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) {if (server.masterhost) {// 取消复制操作, 同时设置当前节点为主节点// 具体代码逻辑, 可以查看 replication 文章的 replicationUnsetMaster 方法解析// 1. 置空 server.masterhost // 2. 将第一组 replid 和 offset 赋值到第二组, 重试生成一个 replid// 3. 置空 server.cached_server// 4. 如果在传输 RDB 文件中或者处于握手阶段, 进行取消, 同时取消和主节点的连接// 5. 如果有从节点, 释放所有的从节点客户端, 也就是断开从节点的连接// 6. 当前节点的状态变为 REPL_STATE_NONE (普通状态, 无主从复制状态)replicationUnsetMaster();// 获取 client 的每种信息, 并以 sds 形式返回, 并打印到日志中sds client = catClientInfoString(sdsempty(),c);sdsfree(client);}} else {// 本身是一个从节点了, 无法在执行 salveof ip 端口if (c->flags & CLIENT_SLAVE) {addReplyError(c, "Command is not valid when client is a replica.");return;}// 从入参中获取端口if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))return;// 已经有主节点了, 同时主节点的的 host 和 ip 和入参的相同if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));return;}// 保存主节点信息并进入待连接状态// 具体代码逻辑, 可以查看 replication 文章的 replicationSetMaster 方法解析// 1. 保存主节点的 IP 和 端口到 server.masterhost 和 server.masterport// 2. 解除所有阻塞状态的客户端// 3. 释放所有的从节点信息// 4. 取消主从复制的握手操作// 5. 当前节点如果没有主节点, 为 server.cached_server 赋值一个默认生成的 client// 6. 设置当前的节点的状态为 REPL_STATE_CONNECT (待连接上主节点)replicationSetMaster(c->argv[1]->ptr, port);}// 响应客户端 okaddReply(c,shared.ok);
}
主从复制的第一步逻辑
- 将入参的主节点的 ip 和 port 保存在 server
- 创建出一个代表主节点的客户端 client, 赋值给 server.cached_server (如果是一开始是从节点, 重启了, 这一步未必会有)
- 当前从节点的节点状态变更为 REPL_STATE_CONNECT (开启了主从复制, 但是还没连接上主节点)
执行完上的逻辑后, salveof (replicaof) 就结束的, 但是整个的主从复制还没有开始, 可以得出 salveof 是一个异步的命令。
接下来的步骤则是由定时函数 serverCron 定时的调用。
2 主从节点建立 TCP 连接
在第一步中, 只是将主节点的信息保存到从节点中就结束了, 之间还是没有建立起相关的网络连接的, 第二步就是完成这个网络连接的操作。
而这个网络连接建立的触发是通过定时函数执行的
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {// 省略// 定时 1 秒执行一次run_with_period(1000) replicationCron();// 省略
}
replicationCron 里面涉及到了大量的逻辑, 基本整个复制运行阶段的状态判断等都是在里面判断的, 这里只截取了涉及到当前步骤相关的逻辑。
在第一步操作完成后, 可以知道从节点当前的状态为 REPL_STATE_CONNECT。
void replicationCron(void) {// 省略// 当前的状态为 REPL_STATE_CONNECT (开启了主从复制, 但是还没连接上主节点)// 顺利执行了 salveof 后, 从节点的默认状态if (server.repl_state == REPL_STATE_CONNECT) {// 尝试连接主节点, 连接成功后, 从节点的状态会变为 REPL_STATE_CONNECTING (正在连接主节点)// 具体代码逻辑, 可以查看 replication 文章的 connectWithMaster 方法解析// 1. 通过保存的 ip 和 port 和主节点建立一个 TCP 连接// 2. 向事件轮询添加对应的 TCP 通道的读写事件, 执行的函数为 syncWithMaster// 3. 更新当前节点的状态为 REPL_STATE_CONNECTING (正在连接主节点)if (connectWithMaster() == C_OK) {}}// 省略
}
主从复制的第二步逻辑
- 和主节点建立起了 TCP 连接
- 向事件轮询注册一个读写事件, 触发的函数为 connectWithMaster, 同时这个函数会在下次事件轮询时自动执行一次
- 将当前节点的状态更新为 REPL_STATE_CONNECTING (从节点开始连接主节点)
3 发送 PING 命令
3.1 从节点发送 PING 命令给主节点
在第二步的步骤中, 通过保存的主节点 ip 和 port 建立起 TCP 连接后, 会向事件轮询中注册一个 AE_READABLE|AE_WRITABLE 的事件。
读写同时注册时, 会自动触发一次, 也就是在下次事件轮询中会执行到其注册的函数 syncWithMaster 函数, 所以第三步的入口就是这个函数了。
// 入参中的 fd 就是和主节点建立的 Socket 连接的文件描述符
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {int sockerr = 0;socklen_t errlen = sizeof(sockerr);UNUSED(el);UNUSED(privdata);UNUSED(mask);// 状态为 REPL_STATE_NONE, 关闭对应的文件描述符if (server.repl_state == REPL_STATE_NONE) {close(fd);return;}// 检查当前的 Socket 通道的状态if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)// 获取异常信息sockerr = errno;// 有异常信息if (sockerr) {goto error;}// 从节点和父节点建立了 Socket 后的第一个状态为 REPL_STATE_CONNECTINGif (server.repl_state == REPL_STATE_CONNECTING) {// 删除当前这个 Socket 的可写事件, 不关心写事件aeDeleteFileEvent(server.el,fd,AE_WRITABLE);// 状态修改为 REPL_STATE_RECEIVE_PONG (发送 pong, 等待 ping 回答)server.repl_state = REPL_STATE_RECEIVE_PONG;// 发送同步命令, 也就是 ping 到主节点, SYNC_CMD_WRITE = 1// 具体代码逻辑, 可以查看 replication 文章的 sendSynchronousCommand 方法解析err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);if (err) goto write_error;return;}error:aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);if (dfd != -1) close(dfd);close(fd);server.repl_transfer_s = -1;server.repl_state = REPL_STATE_CONNECT;return;write_error: // 从节点向父节点发送 SYNC_CMD_WRITE 失败时的处理逻辑sdsfree(err);goto error;
}
3.2 主节点发送 Pong 响应从节点的 Ping 命令
主节点收到了从节点的 Ping 命令后, 处理正常后, 会响应一个 Pong 的命令。
主节点 执行的 Ping 命令的逻辑如下
void pingCommand(client *c) {// ping 命令的参数只能是 1 个或者 0 个if (c->argc > 2) {addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);return;}// 对应的客户端处于 Pub/Sub 模式if (c->flags & CLIENT_PUBSUB) {addReply(c,shared.mbulkhdr[2]);addReplyBulkCBuffer(c,"pong",4);if (c->argc == 1)addReplyBulkCBuffer(c,"",0);elseaddReplyBulk(c,c->argv[1]);} else {// 其他模式// 参数是 1 个, 响应一个 pongif (c->argc == 1)addReply(c,shared.pong);else// 响应入参的第 2 个参数addReplyBulk(c,c->argv[1]);}
}
3.3 从节点收到主节点的响应的 Pong 命令
从节点 收到了主节点发送过来的 Pone 响应命令, 这时会触发在上面第 2 步对 Socket 连接建立的可读事件。
当事件轮询循环中找到了可读事件, 又执行到 syncWithMaster 函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 当前从节点处于 REPL_STATE_RECEIVE_PONG 状态 (发送 ping, 等待 pong 应答)if (server.repl_state == REPL_STATE_RECEIVE_PONG) {// 读取主节点响应的信息err = sendSynchronousCommand(SYNC_CMD_READ, fd, NULL);// 异常情况if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) {sdsfree(err);goto error;} else {// 响应的是 Pong, 能继续处理}sdsfree(err);// 状态切换到 REPL_STATE_SEND_AUTH, 等待认证结果应答server.repl_state = REPL_STATE_SEND_AUTH;}
}
主从复制的第三步逻辑
- 从节点向主节点发送了一个 Ping 命令, 自身状态由 REPL_STATE_CONNECTING (从节点开始连接主节点) 变为 REPL_STATE_RECEIVE_PONG (等待主节点响应 Pong 命令)
- 主节点收到从节点发送的 Ping 命令, 向其响应了一个 Pong 命令
- 从节点收到主节点响应的 Pong 命令, 会将其自身的状态从 REPL_STATE_RECEIVE_PONG (等待主节点响应 Pong 命令) 变为 REPL_STATE_SEND_AUTH (准备发送认证)
主要是通过发送 Ping 命令和接受主节点响应的 Pong 命令, 初步确定双方网络的正常
4 密码认证
在从节点发送 Ping, 主节点响应 Pong , 从节点收到 Pong 响应后, 进入处理时 (syncWithMaster 函数),
状态修改为 REPL_STATE_SEND_AUTH 后, 方法继续执行下去, 立即进入密码认证的过程。
4.1 从节点发送认证密码
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 当前从节点处于 REPL_STATE_RECEIVE_PONG 状态 (发送 ping, 等待 pong 应答)if (server.repl_state == REPL_STATE_RECEIVE_PONG) {// 省略// 状态切换到 REPL_STATE_SEND_AUTH, 准备发送认证server.repl_state = REPL_STATE_SEND_AUTH;}// 进入认证, 如果需要的话if (server.repl_state == REPL_STATE_SEND_AUTH) {// 配置了主节点的密码if (server.masterauth) {// 发送认证请求和密码到主节点// auth 密码err = sendSynchronousCommand(SYNC_CMD_WRITE, fd, "AUTH", server.masterauth, NULL);if (err) goto write_error;// 状态切换为 REPL_STATE_RECEIVE_AUTH (等待认证结果响应)server.repl_state = REPL_STATE_RECEIVE_AUTH;return;} else {// 不需要认证, 状态之间切换为 REPL_STATE_SEND_PORT 准备发送端口server.repl_state = REPL_STATE_SEND_PORT;}}
}
从节点根据是否配置了主节点认证密码, 走不同的逻辑
- 配置了认证密码, 发送auth 密码给主节点, 同时状态变为 REPL_STATE_RECEIVE_AUTH (等待主节点响应认证结果应答)
- 没有配置认证密码, 直接将状态变为 REPL_STATE_SEND_PORT (准备发送从节点的监听的端口)
4.2 主节点响应 Auth 命令
主节点收到从节点的认证请求 auth, 就会进入到权限认证的过程, 执行的逻辑如下:
void authCommand(client *c) {// 主节点不需要密码认证if (!server.requirepass) {addReplyError(c,"Client sent AUTH, but no password is set");} else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {// 密码认证成功 c->authenticated = 1;addReply(c,shared.ok);} else {// 密码认证失败 c->authenticated = 0;addReplyError(c,"invalid password");}
}
主节点收到从节点的 auth 命令后
- 本身没有设置密码, 直接返回错误
- 收到的密码和自身配置的密码一样, 返回成功
- 收到的密码和自身配置的密码不一样, 返回错误
4.3 从节点收到 Auth 命令的响应结果
同 Ping Pong 的处理逻辑一样, 这时从节点读取到主节点的响应, 事件轮询触发 syncWithMaster 函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 当前从节点处于 REPL_STATE_RECEIVE_PONG 状态 (发送 ping, 等待 pong 应答)if (server.repl_state == REPL_STATE_RECEIVE_PONG) {// 省略// 状态切换为 REPL_STATE_RECEIVE_AUTH (等待认证结果响应)server.repl_state = REPL_STATE_RECEIVE_AUTH;return;}// 接收到请求认证的响应if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {// 读取响应信息err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);// 认证失败if (err[0] == '-') {sdsfree(err);goto error;}// 认证成功sdsfree(err);// 状态变为 REPL_STATE_SEND_PORT (准备发送从节点的监听的端口)server.repl_state = REPL_STATE_SEND_PORT;}
}
主从复制的第四步逻辑
- 如果从节点没有配置 masterauth, 则直接进入下一个阶段, 状态从 REPL_STATE_SEND_AUTH (准备发送认证) 直接变为 REPL_STATE_SEND_PORT (准备发送从节点的监听的端口)
- 如果有配置 masterauth, 则会
1 从节点向主节点发送了 Auth 密码 的命令给主节点, 自身状态由 REPL_STATE_SEND_AUTH (准备发送认证) 变为 REPL_STATE_RECEIVE_AUTH (等待认证结果响应)
2 主节点收到从节点发送的 Auth 密码, 在确定没错后, 向其响应了一个 ok 的字符串
3 从节点收到主节点响应的 ok, 会将其自身的状态从 REPL_STATE_RECEIVE_AUTH (等待认证结果响应) 变为 REPL_STATE_SEND_PORT (准备发送从节点的监听的端口)
第四步, 通过配置的密码和主节点进行认证(如果需要的话), 在认证成功或无需认证后, 进入到 REPL_STATE_SEND_PORT (准备发送从节点的监听的端口)
5 发送端口号
在不需要权限认证或者从节点收到主节点的权限认证成功后, 此时从节点的状态为 REPL_STATE_SEND_PORT, 顺着上一步的处理逻辑中, 继续处理
5.1 发送从节点主从复制的端口
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 收到权限认证的响应if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {// 配置了主节点的密码if (server.masterauth) {// 省略} else {// 不需要认证, 状态之间切换为 REPL_STATE_SEND_PORT 准备发送端口server.repl_state = REPL_STATE_SEND_PORT;}}// 接收到请求认证的响应if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {// 省略// 状态变为 REPL_STATE_SEND_PORT (准备发送从节点的监听的端口)server.repl_state = REPL_STATE_SEND_PORT;}// 进入发送端口阶段if (server.repl_state == REPL_STATE_SEND_PORT) {// 如果有配置一个专门复制的端口的话, 使用配置的端口, 没有使用当前服务器的端口sds port = sdsfromlonglong(server.slave_announce_port ? server.slave_announce_port : server.port);// 发送端口信息给主节点 命令: replconf listening-port 端口err = sendSynchronousCommand(SYNC_CMD_WRITE, fd, "REPLCONF", "listening-port", port, NULL);sdsfree(port);if (err) goto write_error;sdsfree(err);// 切换状态为 REPL_STATE_RECEIVE_PORT (等待主节点响应收到从节点端口)server.repl_state = REPL_STATE_RECEIVE_PORT;return;}
}
向主节点发送自己的主从复制的端口
5.2 主节点响应从节点的发送端口命令
主节点收到从节点的发送端口请求 replconf listening-port 端口, 执行的逻辑如下
void replconfCommand(client *c) {int j;// 参数需要是 2 的倍数if ((c->argc % 2) == 0) {addReply(c,shared.syntaxerr);return;}for (j = 1; j < c->argc; j+=2) {// 每个循环使用 2 个参数// replconf listening-port portif (!strcasecmp(c->argv[j]->ptr,"listening-port")) {long port;// 获取下一个项, 也就是端口号if ((getLongFromObjectOrReply(c,c->argv[j+1], &port,NULL) != C_OK))return;// 保存到对应的客户端的 slave_listening_port c->slave_listening_port = port;} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {// 省略} else {// 响应错误addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr);return;}}// 响应 OK addReply(c,shared.ok);
}
可以看到主节点收到从节点发送过来的端口
- 会保存到从节点客户端 client 的 slave_listening_port 字段
- 响应一个 ok 字符串
5.3 从节点收到主节点对自己发送端口命令的响应
收到主节点的响应后, 从节点同样是事件轮询触发 syncWithMaster 函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 进入发送端口阶段if (server.repl_state == REPL_STATE_SEND_PORT) {// 省略// 切换状态为 REPL_STATE_RECEIVE_PORT (等待主节点响应收到从节点端口)server.repl_state = REPL_STATE_RECEIVE_PORT;return;}// REPL_STATE_RECEIVE_PORT 等待主节点响应发送端口请求的响应if (server.repl_state == REPL_STATE_RECEIVE_PORT) {err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);sdsfree(err);// 状态变为 REPL_STATE_SEND_IP, 准备发送 IP 到主节点server.repl_state = REPL_STATE_SEND_IP;}
}
主从复制的第五步逻辑
- 从节点向主节点发送 replconf listening-port 自身的端口给主节点, 同时进入到 REPL_STATE_RECEIVE_PORT (等待主节点响应收到从节点端口请求)
- 主节点收到 replconf listening-port 从节点的端口 命令后, 将其存到自身维护的客户端对象的 slave_listening_port 字段, 向其响应了一个 ok 的字符串
- 从节点收到主节点响应的 ok, 会将其自身的状态从 REPL_STATE_RECEIVE_PORT 变为 REPL_STATE_SEND_IP (发送主从复制配置的监听的 IP 地址)
6 发送 IP 地址
从节点收到主节点对 replconf listening-port 端口 的响应后, 从节点会将状态修改为 REPL_STATE_SEND_IP, 然后顺着逻辑走下去
6.1 发送从节点主从复制的 Ip
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// REPL_STATE_RECEIVE_PORT 等待主节点响应发送 IP 请求的响应if (server.repl_state == REPL_STATE_RECEIVE_PORT) {// 省略// 状态变为 REPL_STATE_SEND_IP, 准备发送 IP 到主节点server.repl_state = REPL_STATE_SEND_IP;}// slave_announce_ip 为空 (没有配置指定的 IP), 直接跳过发送 IP 的阶段if (server.repl_state == REPL_STATE_SEND_IP && server.slave_announce_ip == NULL) {// 进入下一个节点 准备发送从节点的发送能力server.repl_state = REPL_STATE_SEND_CAPA;}if (server.repl_state == REPL_STATE_SEND_IP) {// 发送 replconf ip-address iperr = sendSynchronousCommand(SYNC_CMD_WRITE, fd, "REPLCONF", "ip-address", server.slave_announce_ip, NULL);if (err) goto write_error;sdsfree(err);// 状态变为 REPL_STATE_RECEIVE_IP (等待主节点响应收到从节点的 IP 地址)server.repl_state = REPL_STATE_RECEIVE_IP;return;}
}
从节点进入发送 IP 地址阶段时, 除了状态需要为 REPL_STATE_SEND_IP (准备发送 IP 地址阶段), 还必须有指定 slave_announce_ip, 从节点的 IP (对应配置文件的 slave-announce-ip),
2 个条件都满足的情况下, 才会真正的进入发送 Ip 地址, 否则直接进入下一阶段 REPL_STATE_SEND_CAPA (准备发送从节点支持的复制能力)。
6.2 主节点响应从节点的发送 Ip 命令
主节点收到从节点的发送的 replconf ip-address Ip 请求, 执行的逻辑如下
void replconfCommand(client *c) {int j;// 参数需要是 2 的倍数if ((c->argc % 2) == 0) {addReply(c,shared.syntaxerr);return;}for (j = 1; j < c->argc; j+=2) {// 每个循环使用 2 个参数if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {// replconf ip-address Ip// 获取对应的从节点发送的 Ip sds ip = c->argv[j+1]->ptr;// Ip 的长度判断if (sdslen(ip) < sizeof(c->slave_ip)) {// 保存到客户端的 client 的 slave_ip 属性memcpy(c->slave_ip,ip,sdslen(ip)+1);} else {// 错误提示addReplyErrorFormat(c,"REPLCONF ip-address provided by replica instance is too long: %zd bytes", sdslen(ip));return;}} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {// 省略} else {// 响应错误addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr);return;}}// 响应 OK addReply(c,shared.ok);
}
可以看到主节点收到从节点发送过来的 Ip
- 会保存到从节点客户端 client 的 slave_ip 字段
- 响应一个 ok 字符串
6.3 从节点收到主节点对自己发送 Ip 命令的响应
从节点收到主节点的响应后, 同样是由事件轮询触发 syncWithMaster 函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 收到主节点对发送 IP 请求的响应if (server.repl_state == REPL_STATE_RECEIVE_IP) {err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);sdsfree(err);// 状态变为等待发送发送能力状态server.repl_state = REPL_STATE_SEND_CAPA;}
}
主从复制的第六步逻辑
- 如果没有配置从节点用于主从复制的专门 Ip, 则直接进入到 REPL_STATE_SEND_CAPA (准备发送从节点支持的同步能力)
- 如果配置了专门的主从复制 Ip
1 从节点向主节点发送 replconf ip-address Ip, 同时进入到 REPL_STATE_RECEIVE_IP (等待主节点响应收到从节点的 IP 地址)
2 主节点收到 replconf ip-address Ip 命令后, 将其存到自身维护的客户端对象的 slave_ip 字段, 向其响应了一个 ok 的字符串
3 从节点收到主节点对自身的 replconf 命令的响应后, 将自身从 REPL_STATE_RECEIVE_PORT 切换到 REPL_STATE_SEND_CAPA (准备发送从节点支持的同步能力)
7 发送支持的同步能力
收到主节点响应的 Ip 请求, 从节点的状态切换为了 REPL_STATE_SEND_CAPA (准备发送从节点支持的复制能力),
如果从节点没有配置 slave-announce-ip, 也就不会有发送 IP 相关的操作, 也会直接过度到 REPL_STATE_SEND_CAPA。
这一步主要是兼容不同版本的 Redis, 主节点需要知道当前从节点支持的复制能力, 才可以决定如何和从节点进行数据复制。
状态切换到 REPL_STATE_SEND_CAPA 后, 会继续下面的逻辑, 同样在 syncWithMaster 函数。
7.1 发送从节点支持的复制能力
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 没有配置宣布的 IP, slave_announce_ip 为空, 直接跳过发送 IP 的阶段if (server.repl_state == REPL_STATE_SEND_IP && server.slave_announce_ip == NULL) {// 进入下一个节点 准备发送从节点的发送能力server.repl_state = REPL_STATE_SEND_CAPA;}// 省略// 收到主节点对发送 IP 请求的响应if (server.repl_state == REPL_STATE_RECEIVE_IP) {// 状态变为等待发送发送能力状态server.repl_state = REPL_STATE_SEND_CAPA;}if (server.repl_state == REPL_STATE_SEND_CAPA) {// 发送从节点支持的复制能力到主节点 // replconf capa eof capa psync2err = sendSynchronousCommand(SYNC_CMD_WRITE, fd, "REPLCONF", "capa", "eof", "capa", "psync2", NULL);if (err) goto write_error;sdsfree(err);// 状态修改为 REPL_STATE_RECEIVE_CAPA, 等待主节点响应发送能力的应答server.repl_state = REPL_STATE_RECEIVE_CAPA;return;}
}
从节点发送过去的支持的 2 种复制能力
- eof: 全量复制, 正常 全量复制时, 主节点将数据持久化为一个文件, 然后将文件发送给从节点, EOF 则表示直接将数据通过 Socket 直接发送给从节点
- psync2: 部分复制, 利用复制积压缓冲区等实现部分复制
7.2 主节点响应从节点的复制能力请求
主节点收到从节点的 replconf capa eof capa psync2 请求后, 执行的逻辑如下
void replconfCommand(client *c) {int j;// 参数需要是 2 的倍数if ((c->argc % 2) == 0) {addReply(c,shared.syntaxerr);return;}for (j = 1; j < c->argc; j+=2) {// 每个循环使用 2 个参数if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {if (!strcasecmp(c->argv[j+1]->ptr,"eof"))// SLAVE_CAPA_EOF = 1c->slave_capa |= SLAVE_CAPA_EOF;else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))// SLAVE_CAPA_PSYNC2 = 2 c->slave_capa |= SLAVE_CAPA_PSYNC2;// 如果不支持的能力, 不做处理} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {// 省略} else {// 响应错误addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr);return;}}// 响应 OK addReply(c,shared.ok);
}
主节点收到从节点发送的复制能力
- 会保存到从节点客户端 client 的 slave_capa 字段
- 响应一个 ok 字符串
7.3 从节点收到主节点对自己发送支持的复制能力 命令的响应
从节点收到主节点的响应后, 从节点同样是事件轮询触发 syncWithMaster 函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 读取主节点发送过来的信息if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {err = sendSynchronousCommand(SYNC_CMD_READ, fd, NULL);sdsfree(err);// 变更状态为 REPL_STATE_SEND_PSYNC (向主节点发送 psync 命令, 请求同步复制)// 向主节点发送 psync 命令, 请求全量复制server.repl_state = REPL_STATE_SEND_PSYNC;}// 省略
}
主从复制的第七步逻辑
- 从节点向主节点发送 replconf capa eof capa psync2, 同时进入到 REPL_STATE_RECEIVE_CAPA (等待主节点响应收到支持的复制能力的应答)
- 主节点收到 replconf capa eof capa psync2 命令后, 将其存到自身维护的客户端对象的 slave_capa 字段
- 从节点收到主节点对自身的 replconf 命令的响应后, 将自身从 REPL_STATE_RECEIVE_CAPA 切换到 REPL_STATE_SEND_PSYNC (向主节点发送 psync 命令, 请求同步复制)
8 发送 PSYNC 命令
上面 7 步只是主从复制前的准备, 而到了 psync 这步, 就是真正的复制开始了。而且这个过程涉及到全量复制 + 部分复制等情况, 有的绕。
8.1 从节点发送 psync 命令, 通知主节点开始复制
从节点收到了主节点对其同步能力的响应后, 接着会发送一个 psync 的命令给主节点, 这个请求就是同步复制的真正开始了
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 读取主节点发送过来的信息if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {// 省略// 向主节点发送 psync 命令, 请求全量复制server.repl_state = REPL_STATE_SEND_PSYNC;}// 发送 psync 命令if (server.repl_state == REPL_STATE_SEND_PSYNC) {// 入参的 0 表示写消息给主节点, 1 表示从主节点读取数据// 入参 0 的逻辑, 根据当前是否缓存了主节点 (cached_master 是否为空), 来发送 psync 命令, 为空, 发送全量复制请求, 不为空, 发送部分复制请求// 部分复制, 最终发送的命令 psync replid repl_offset (主节点还是会判断这 2 个的值, 最终决定是全量还是部分)// 全量复制, 最终发送的命令 psync ? -1// 具体代码逻辑, 可以查看 replication 文章的 slaveTryPartialResynchronization 方法解析if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {err = sdsnew("Write error sending the PSYNC command.");goto write_error;}// 切换状态为 REPL_STATE_RECEIVE_PSYNC (等待 psync 应答)server.repl_state = REPL_STATE_RECEIVE_PSYNC;return;}
}
slaveTryPartialResynchronization 函数可以根据入参进行写消息给主节点和读取主节点消息, 因为上面只涉及到写的操作,下面梳理的是写部分的逻辑
- 声明了 2 个变量 psync_replid 和 psync_offset, 前者用来存储缓存主节点的 replid, 后者存储的是当前从节点同步复制积压缓冲区的位置
- 首先根据自身保存的 server.cached_master 是否为空, 得出 2 个变量的值
- 如果 server.cached_master 不为空, psync_replid 等于 cached_master.replid, psync_offset 等于 cached_master.reploff + 1
- 如果 server.cached_master 为空, psync_replid 等于 ?, psync_offset 等于 -1
- server.cached_master 不为空, 表示从节点上次是有主节点的, 当前可能是重启等情况, 导致从节点重新走了一次复制的流程, 可以尝试进行部分复制, 不进行全量复制
- 向主节点发送 psync <psync_replid> <psync_offset>, 也就是 psync ? -1 或者 psync replid offset, 后者主节点收到后会直接判定为全量复制, 后者主节点会判断是否可进行部分复制
上面就是 slaveTryPartialResynchronization 写操作的逻辑, 也是从节点向主节点发送 psync 的过程
8.2 主节点收到 psync 命令, 为开始复制做准备
主节点收到从节点的发送过来的同步请求命令 psync, 执行的逻辑如下
void syncCommand(client *c) {// 客户端不是从节点, 直接返回if (c->flags & CLIENT_SLAVE) return;// 当前节点是另一个节点的从节点, 同时节点的状态不是 REPL_STATE_CONNECTED (已经连接状态), 直接返回if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));return;}// 判断 client c 的 bufpos != 0 || reply 有数据// 也就是判断当前节点有数据准备发送给从节点, 是的话, 直接返回if (clientHasPendingReplies(c)) {addReplyError(c,"SYNC and PSYNC are invalid with pending output");return;}// 执行的命令为 psyncif (!strcasecmp(c->argv[0]->ptr, "psync")) {// psync repl_id repl_offset// 主节点尝试进行部分同步复制 // 可以部分复制 stat_sync_partial_ok 部分复制次数 + 1, 然后直接结束// 这段代码的核心, 尝试进行部分复制, 不行返回内会返回 C_ERR, 走到下面全量复制的逻辑// 可以结合下面的说明继续分析// 具体代码逻辑, 可以查看 replication 文章的 masterTryPartialResynchronization 方法解析if (masterTryPartialResynchronization(c) == C_OK) {server.stat_sync_partial_ok++;return;}char *master_replid = c->argv[1]->ptr;// 从节点指定了 replid, 但是现在部分复制失败了if (master_replid[0] != '?') // 部分同步复制失败次数 + 1server.stat_sync_partial_err++;} else {// 执行的命令不是 psync, 按照 sync 命令处理c->flags |= CLIENT_PRE_PSYNC;}// 全量复制 --> 可以结合下面的 8.2.1 一起使用 // 全量复制次数 + 1server.stat_sync_full++;// 修改从节点的状态为 SLAVE_STATE_WAIT_BGSAVE_START (等待 bgsave 的开始, 也就是 RDB 文件的创建)c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;// 关闭了 TCP_NODELAY 功能if (server.repl_disable_tcp_nodelay)// 启用 nagle 算法anetDisableTcpNoDelay(NULL, c->fd);c->repldbfd = -1;// 客户端设置从节点标识c->flags |= CLIENT_SLAVE;// 把当前的客户端添加到从节点列表listAddNodeTail(server.slaves,c); // 如果有需要, 创建复制积压缓冲区// 条件: 从节点只有 1 个, 复制积压缓冲区为空if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {// 生成 replid, 存放到 server.replid 中changeReplicationId();// 清除 replid2 和 second_replid_offset// 具体逻辑可以查看 replication 文章的 clearReplicationId2 方法解析clearReplicationId2();// 创建复制积压缓冲区createReplicationBacklog();}if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) {// 正在执行 RDB, 同时类型是写入磁盘client *slave;listNode *ln;listIter li;listRewind(server.slaves,&li);// 遍历所有的从节点, 找到第一个节点的状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 bgsave 的结束, 即等待 RDB 文件的创建结束)while((ln = listNext(&li))) {slave = ln->value;if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;}// 有找到对应的节点, 当前客户端的节点的复制能力和找到的节点的复制能力一样if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {// 把找到的节点的输出缓存复制到当前的客户端// 将 slave 的 buf 拷贝到 c 的 buf// 将 slave 的 reply_bytes 拷贝到 c 的 reply_bytes, 响应缓冲区// 将 slave 的 bufpos 设置等于 c 的 bufposcopyClientOutputBuffer(c,slave);// 更新从节点客户端的偏移量, 状态和发送全量复制消息给从节点// 这个命令会发送 FULLRESYNC replid offset\r\n 的响应给从节点, 可以看做是对 psync 命令的响应// 具体逻辑可以查看 replication 文章的 replicationSetupSlaveForFullResync 方法解析replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);}} else if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {// 正在执行将主节点的数据写入到 Socket, 直接结束} else { // 没有在执行 RDB // 复制类型为无盘同步 同时 当前的客户端支持 EOF 的同步方式if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {// 支持延迟无盘同步, 打印日志后结束// 后续在定时器执行的 replicationCron 函数时, 会创建出子进程进行同步// 延迟一段时间, 可以等待几个从节点, 后面同步处理} else {// 没有子进程正在执行 BGSAVE, 且没有进行写 AOF 文件, 则开始为复制执行 BGSAVE, 并且是将 RDB 文件写到磁盘上if (server.aof_child_pid == -1) {// 内部和 RDB 的操作类似, 分为主子进程, fork 出子进程后, 子进程进行 RDB 的生成// 主进程也会通过 replicationSetupSlaveForFullResync 函数进行 psync 的应答// 具体逻辑可以查看 replication 文章的 startBgsaveForReplication 方法解析startBgsaveForReplication(c->slave_capa);}}}return;
}
这段代码的核心 masterTryPartialResynchronization,
内部会根据 psync 的 2 个参数 repl_id + repl_offset 和自身的 replid 和 replid2 和复制积压缓冲区的情况, 判断是否走部分复制
1 入参的 repl_id 和当前主节点的 replid 和 replid2 (自己保存的主节点的 id) 不一样
2 入参的 repl_id 和当前主节点的 repl_id2 一样, 但是 repl_offset 和当前主节点的 second_replid_offset 大 (以前同步于同一个主节点, 但是现在的从节点比自己同步的快)
3 请求的偏移量 repl_offset 小于 repl_backlog_off (复制积压缓冲区最老的位置) , 说明 backlog 所备份的数据的已经太新了, 需要的已经不在复制积压缓冲区中
4 请求的偏移量 repl_offset 大于 repl_backlog_off + repl_backlog_histlen (复制积压缓冲区的容量大小) , 表示当前 backlog 的数据不够全,从节点超过了复制积压缓冲区的最新数据了, 需要进行全量复制
出现上面 4 种情况的其中一种, 就会进入到全量复制
8.2.1 masterTryPartialResynchronization 判断需要全量复制
- 设置当前的客户端为 SLAVE_STATE_WAIT_BGSAVE_START (等待 bgsave 开始, 也就是 RDB 文件的创建)
- 把当前的客户端加入维护的从节点客户端列表中
- 如果从节点客户端列表只有 1 个节点同时复制积压缓冲区为空, 创建复制积压缓冲区
- 如果当前主节点在执行自身的持久化 RDB, 并且从节点客户端列表中找到第一个状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 RDB 创建文件结束) 同时复制能力和自己一样的
4.1 将这个节点的响应缓冲区的数据拷贝一份到自己的输出缓冲区 (自身不需要在去触发一次 RDB 了, 复用它已有的输出数据)
4.2 修改状态为自身的状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 RDB 创建文件结束)
4.3 同时向从节点发送 +FULLRESYNC replid offset\r\n (避免从节点发一次全量复制就触发一次 RDB, 复用已有的)
4.4 发送 +FULLRESYNC replid offset\r\n 给从节点
- 如果当前主节点在执行主从复制的 RDB, 直接结束方法, 避免大量的子进程写文件, 拖垮主节点
- 当前的主节点支持无盘延迟同步同时当前从节点支持 EOF 也就是网络同步, 那么先结束, 后续由定时任务触发 replicationCron 函数, 创建 RDB 文件, 这样可以延迟同步, 延迟的这段时间可能等到其他需要全量复制和同配置的从节点, 到时可以 bgsave 出一个文件, 共用 (replicationCron 1s 触发一次, 每次会检查所有的从节点和当前主节点的上次交流的时间差, 有一个超过了配置的 repl_diskless_sync_delay 时间, 就立即主从复制)
- 当前主节点在 AOF 持久化中, 提前结束
- 如果当前的从节点支持 EOF 复制能力同时主节点设置了支持无盘同步
8.1 在节点客户端列表中找到所有状态为 SLAVE_STATE_WAIT_BGSAVE_START (等待 bgsave 开始, 也就是开始创建 RDB 文件) 并且支持 psync 命令的客户端, 发送 +FULLRESYNC replid offset\r\n, 并修改状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 RDB 创建文件结束)
8.2 fork 出一个子进程, 将当前主节点的数据以 EOF 要求的格式写入到满足上一步的所有从节点的 Socket 中, 并阻塞住, 全部写完后一次性全部发送过期
- 如果当前的从节点不支持 EOF 复制能力
9.1 fork 出一个子进程, 将当前主节点的数据按照 RDB 的方式保存到文件中
9.1 在节点客户端列表 中找到所有状态为 SLAVE_STATE_WAIT_BGSAVE_START (等待 bgsave 开始, 也就是开始创建 RDB 文件) 并且支持 psync 命令的客户端, 发送 +FULLRESYNC replid offset\r\n, 并修改状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 RDB 创建文件结束)
8.2.2 masterTryPartialResynchronization 判断可以部分复制
- 设置当前的从节点 repl_put_online_on_ack 为 0, 也就是还未 ack
- 把当前的从节点加入动节点客户端列表
- 如果当前的从节点支持 psync2 则向其发送 +CONTINUE 主节点当前的 repl_id \r\n, 否则发送 +CONTINUE\r\n
- 根据入参的 repl_offset 从复制积压缓冲区获取对应的数据, 写入到客户端的输出缓冲区中
上面就是主节点收到从节点 psync 命令, 为开始复制做准备, 虽然全量复制和部分部分复制有些不同, 但是都可以简单的看为, 将复制的数据写入到从节点的输出缓冲区中, 等待发送。
8.3 从节点收到主节点对 psync 命令的响应
从节点收到主节点的响应后, 从节点同样是事件轮询触发 syncWithMaster 函数。
在从节点发送出 psync 命令后, 状态为 REPL_STATE_RECEIVE_PSYNC (等待 psync 应答), 继续从这个状态走下去
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {// 省略// 状态不是 REPL_STATE_RECEIVE_PSYNC 直接失败if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {goto error;}// 读取主节点的响应, 如果是全量复制, 响应为 PSYNC_FULLRESYNC, 部分复制, 响应为 PSYNC_CONTINUE// 具体逻辑可以查看 replication 文章的 slaveTryPartialResynchronization 方法读的部分逻辑psync_result = slaveTryPartialResynchronization(fd,1);// 等待重新执行if (psync_result == PSYNC_WAIT_REPLY) return; // 主节点正处于暂时性错误状态if (psync_result == PSYNC_TRY_LATER) goto error;// psync 命令主节点完成, 判断可以部分复制, 从节点可以继续执行其他的, 到这步直接结束if (psync_result == PSYNC_CONTINUE) {return;}// 释放所有的从节点连接disconnectSlaves();// 清空已有的复制积压缓冲区// 具体逻辑可以查看 replication 文章的 freeReplicationBacklog 方法解析freeReplicationBacklog();// 主节点无法识别 psync, 需要尝试执行 sync if (psync_result == PSYNC_NOT_SUPPORTED) {// 不支持 psync, 则发送 syncif (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {goto error;}}char tmpfile[256], *err = NULL;int dfd = -1, maxtries = 5;// 尝试创建一个临时文件, 失败的话, 进行重试, 可以重试 5 次while(maxtries--) {// 创建一个临时文件snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);if (dfd != -1) break;sleep(1);}// 创建文件失败if (dfd == -1) {goto error;}// 为创建的文件描述符 添加可读事件, 用来下载监听到的数据, 也就是父级发送过来的 RDB, 处理函数为 readSyncBulkPayloadif (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL) == AE_ERR) {goto error;}// 修改状态为 REPL_STATE_TRANSFER (开始接收 RDB 文件)server.repl_state = REPL_STATE_TRANSFER;// 更新复制相关变量的值server.repl_transfer_size = -1;server.repl_transfer_read = 0;server.repl_transfer_last_fsync_off = 0;server.repl_transfer_fd = dfd;server.repl_transfer_lastio = server.unixtime;server.repl_transfer_tmpfile = zstrdup(tmpfile);return;
}
slaveTryPartialResynchronization 函数可以根据入参进行写消息给主节点和读取主节点消息, 这里需要读取主节点的响应数据, 所以下面梳理的是读部分的逻辑
- 读取到的内容为 +FULLRESYNC
1.1 将当前的从节点的 server 的 master_replid 和 master_initial_offset 设置为 +FULLRESYNC 后面的 replId 和 replId_offset
1.2 释放到 server.cached_master 的缓存主节点
1.3 返回 PSYNC_FULLRESYNC
- 读取到的内容为 +CONTINUE
2.1 获取 +CONTINUE 后面的 replId, 如果和已有的 server.cached_master 的 replId 不一致, 则将 server.cached_master 的 replId 和 offset 变为 replId2 和 offset2, 参数的 replId 变为 server.cached_master 的 replId, 同时断开所有自己所有从节点的连接
2.2 将原本的 server.cached_master 晋升为 server.master, server.cached_master 变为 null
2.3 将当前的从节点状态变为 REPL_STATE_CONNECTED (已连接上主节点)
2.4 为当前的主节点注册一个可读的事件, 执行函数为 readQueryFromClient
2.5 返回 PSYNC_CONTINUE
上面就是 slaveTryPartialResynchronization 读操作的逻辑, 整合 syncWithMaster 的 REPL_STATE_RECEIVE_PSYNC 状态的核心逻辑如下
- 收到了 +FULLRESYNC
1.1 创建一个临时的 RDB 文件, 向事件轮询注册一个可读的事件, 执行函数 readSyncBulkPayload, 用于将主节点发送的数据写入到临时文件中和其他的逻辑
1.2 自身状态变更为 REPL_STATE_TRANSFER (开始接收 RDB 文件)
1.3 更新自身的主从复制相关的信息
全量复制整个过程串起来如下:
- 收到了 +CONTINUE
2.1 将当前的从节点状态变为 REPL_STATE_CONNECTED (已连接上主节点)
2.2 向事件轮询注册一个可读的事件, 执行函数 readQueryFromClient, 用于读取并处理主节点发送过来的命令
部分复制整个过程串起来如下:
9 主节点向从节点推送数据
上面的步骤基本就是主从节点为数据同步做的前期准备, 主从节点只是做好了发送同步数据的准备, 实际此时还是没有数据的复制的 (无盘同步先写入到 Socket, 写完直接全部发送, 因为有不确定暂时认为未复制)。
从第 8 位可以知道, 在主节点将自身的数据发送给从节点的方式有 2 种
1 只能全量复制, 主节点所有数据的以 RDB 数据发送给从节点, 这里又细分为 2 种, 无盘同步, 直接将数据通过 Socket 发送过去, 非无盘同步, 先将数据保存为 rdb 文件, 再发送文件
2 可以部分复制, 从主节点复制积压缓冲区中获取的数据
对于这 2 种数据, 主节点有 2 种方式发送数据给从节点。
9.1 主节点和从节点全量复制的数据
9.1.1 主节点向推送节点全量复制的数据
前期的准备完成后, 主节点会尝试发送数据到从节点, 触发发送的链路如下:
- 定时函数 serverCron, 判断出当前的 RDB 子进程不为空, 同时子进行已经完成, 触发 backgroundSaveDoneHandler 函数
- backgroundSaveDoneHandler 中无论什么类型的 RDB (Socket/磁盘), 都会走到 updateSlavesWaitingBgsave 函数
- updateSlavesWaitingBgsave 中, 遍历所有的从节点
4.1 所有从节点中任意有一个的状态为 SLAVE_STATE_WAIT_BGSAVE_START (等待 bgsave 开始, 也就是开始创建 RDB 文件), 触发第 8.2.1 中第 8 步的逻辑
4.2 遍历的从节点状态等于 SLAVE_STATE_WAIT_BGSAVE_END 并且同步类型为 Socket 同步, 则当前的从节点的状态变为 SLAVE_STATE_ONLINE (在线), 同时 repl_put_online_on_ack = 1, 从节点已 ack
4.3 遍历的从节点状态等于 SLAVE_STATE_WAIT_BGSAVE_END 并且同步类型不是 Socket 同步, 向事件轮询注册发送数据事件 sendBulkToSlave, 从节点的状态变为 SLAVE_STATE_SEND_BULK (批量发送数据中)
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {client *slave = privdata;UNUSED(el);UNUSED(mask);char buf[PROTO_IOBUF_LEN];ssize_t nwritten, buflen;// 发送序言, 这里只是简单发送出了文件的大小, 格式 $<length>\r\n"if (slave->replpreamble) {nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));if (nwritten == -1) {freeClient(slave);return;}// 更新已经写到网络的字节数server.stat_net_output_bytes += nwritten;// 保留未写的字节, 删除已写的字节sdsrange(slave->replpreamble,nwritten,-1);// 如果已经写完了, 则释放 replpreambleif (sdslen(slave->replpreamble) == 0) {sdsfree(slave->replpreamble);// 设置为 null, 下次进来不会再触发slave->replpreamble = NULL;} else {return;}}// slave->repldbfd 代表 RDB 文件// 将文件指针移动到 slave->repldboff 位置, 准备读操作lseek(slave->repldbfd, slave->repldboff, SEEK_SET);// PROTO_IOBUF_LEN = 1024 * 16, 16kb// 从 RDB 文件的 slave->repldboff 位置读取 16k 数据的内容保存在 buf 中buflen = read(slave->repldbfd, buf, PROTO_IOBUF_LEN);// 读取不到数据, 结束if (buflen <= 0) {freeClient(slave);return;}// 将读取到的 16k 数据写到从节点if ((nwritten = write(fd, buf, buflen)) == -1) {// 写入失败if (errno != EAGAIN) {freeClient(slave);}return;}// 到了这里, 大概可以理解这个发送 RDB 文件的过程不是一次性的// 每次发送 16k, 下次事件轮询触发, 再触发发送 16k, 知道全部写完// 更新主节点已经向从节点同步的字节数slave->repldboff += nwritten;// 更新服务器已经写到网络的字节数server.stat_net_output_bytes += nwritten;// 如果写入完成, 即从网络读到的大小等于文件大小, 写完了if (slave->repldboff == slave->repldbsize) {// 关闭 RDB 文件流close(slave->repldbfd);slave->repldbfd = -1;// 删除等待从节点的文件可写事件aeDeleteFileEvent(server.el, slave->fd, AE_WRITABLE);// 更新从节点的状态为 SLAVE_STATE_ONLINE (在线状态) 和 repl_put_online_on_ack = 0 (ack 为未应答)// 为当前的从节点注册一个写事件, 触发函数为 sendReplyToClient, 将输出缓冲区中的数据发送给从节点, 也就是后续有新数据写给从节点, 就由 sendReplyToClient 触发发送// 具体逻辑可以查看 replication 文章的 putSlaveOnline 方法解析putSlaveOnline(slave);}
}
主节点的事件轮询触发 sendBulkToSlave 函数, 逻辑大体如下
- 先向从节点发送了当前 RDB 文件的大小
- 间断地将 RDB 文件的数据按照 16k 的大小发送给从节点
- RDB 文件全部写给从节点后, 修改从节点的状态为 SLAVE_STATE_ONLINE (在线), 但是 repl_put_online_on_ack = 0, 从节点未来 ack
9.1.2 从节点接收主节点推送的全量复制的数据
在 8.3 中从节点收到主节点对 psync 命令的响应, 判断到只能全量复制, 就注册了一个 readSyncBulkPayload 函数, 用来读取主节点发送过来的 RDB 数据
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {char buf[4096];ssize_t nread, readlen, nwritten;off_t left;UNUSED(el);UNUSED(privdata);UNUSED(mask);static char eofmark[CONFIG_RUN_ID_SIZE];static char lastbytes[CONFIG_RUN_ID_SIZE];static int usemark = 0;// 没有传输过任何数据if (server.repl_transfer_size == -1) {// 读取 1024 个字节的数据到 buf 中if (syncReadLine(fd, buf, 1024, server.repl_syncio_timeout*1000) == -1) {goto error;} if (buf[0] == '-') {// 主节点返回错误信息goto error;} else if (buf[0] == '\0') {// 回复了一个空行, 只是为了像 ping 一样让连接存活, 只是简单的刷新了一些最新的传输时间server.repl_transfer_lastio = server.unixtime;return;} else if (buf[0] != '$') {// 协议格式错误goto error;}// CONFIG_RUN_ID_SIZE 40 // 响应的内容为 EOF: 同时 buf 第 5 位后的字符串长度大于 40 (replid 的最大长度)// Socket 方式发送过来的数据if (strncmp(buf+1, "EOF:", 4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {// 使用 EOF 标记模式usemark = 1;// 2 个静态变量, 主要是保存 EOF 的中的 replidmemcpy(eofmark, buf+5, CONFIG_RUN_ID_SIZE);memset(lastbytes, 0, CONFIG_RUN_ID_SIZE);// 修改为 0, 防止下次调用又跑到这段逻辑里面server.repl_transfer_size = 0;} else {// 使用固定长度模式 (RDB 文件的方式, 主节点默认每次发送 16k 数据)usemark = 0;// 读取长度大小server.repl_transfer_size = strtol(buf+1, NULL, 10);}return;}// 根据模式, 计算这次读多少数据if (usemark) {// 每次读取缓冲区大小的数据readlen = sizeof(buf);} else {// 读取剩余数据与缓冲区大小的较小值left = server.repl_transfer_size - server.repl_transfer_read;readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);}// 读取数据到 buf 中nread = read(fd, buf, readlen);if (nread <= 0) {// 读取失败, 取消握手// 具体逻辑可以查看 replication 文章的 cancelReplicationHandshake 方法解析 cancelReplicationHandshake();return;}server.stat_net_input_bytes += nread;int eof_reached = 0;if (usemark) {// 当使用 EOF 标记模式时,需要检测是否已经到达数据结尾// 更新 lastbytes 并检查是否匹配 EOF 标记if (nread >= CONFIG_RUN_ID_SIZE) {memcpy(lastbytes, buf+nread-CONFIG_RUN_ID_SIZE, CONFIG_RUN_ID_SIZE);} else {int rem = CONFIG_RUN_ID_SIZE - nread;memmove(lastbytes,lastbytes+nread,rem);memcpy(lastbytes+rem,buf,nread);}// 通过维护一个 lastbytes 数组,存储最近接收到的字节,与 eofmark 进行比较// 如果匹配,表示已经接收到完整的数据,设置 eof_reached 标志if (memcmp(lastbytes, eofmark, CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;}// 将读取到的数据写入到 repl_transfer_fd 临时文件中server.repl_transfer_lastio = server.unixtime;if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) {goto error;}// 更新同步的字节数server.repl_transfer_read += nread;if (usemark && eof_reached) {// EOF 模式, 同时读取到末尾了// 计算文件 repl_transfer_read 的大小, 也就是把末尾的 40 个字节去掉if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1){goto error;}}// 当前读取到的数据大于上次刷盘数据的 REPL_MAX_WRITTEN_BEFORE_FSYNC 8M, 进行刷盘if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC){off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off;rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size);server.repl_transfer_last_fsync_off += sync_size;}// 在固定长度模式下, 检查已读取的字节数是否等于预期的总长度, 确定传输是否完成if (!usemark) {if (server.repl_transfer_read == server.repl_transfer_size)eof_reached = 1;}if (eof_reached) {// aof 是否启用状态int aof_is_enabled = server.aof_state != AOF_OFF;// 当前节点在 RDB 中, 则结束掉这个 RDB 过程if (server.rdb_child_pid != -1) {kill(server.rdb_child_pid,SIGUSR1);rdbRemoveTempFile(server.rdb_child_pid);}// 最后一次刷盘if (fsync(server.repl_transfer_fd) == -1) {cancelReplicationHandshake();return;}// 把从主节点同步的 RDB 文件重命名为真实的 RDB 文件if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {cancelReplicationHandshake();return;}// 停止 AOF 写入, 防止在清空和加载数据时出现写时复制(COW)问题,导致内存占用过高if(aof_is_enabled) stopAppendOnly();signalFlushedDb(-1);emptyDb(-1, server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, replicationEmptyDbCallback);// 删除文件事件,避免再次调用aeDeleteFileEvent(server.el, server.repl_transfer_s, AE_READABLE);rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;// 将新的 RDB 文件数据加载到内存中if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {// 加载失败cancelReplicationHandshake();// 重写开始 AOF 同步if (aof_is_enabled) restartAOFAfterSYNC();return;}// 释放临时文件名的内存zfree(server.repl_transfer_tmpfile);// 关闭文件描述符close(server.repl_transfer_fd);// 创建与主节点的新客户端连接, 同时指定数据库replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db);// 更新复制状态为已连接, 重置下线时间server.repl_state = REPL_STATE_CONNECTED;server.repl_down_since = 0;memcpy(server.replid,server.master->replid,sizeof(server.replid));server.master_repl_offset = server.master->reploff;clearReplicationId2();if (server.repl_backlog == NULL) createReplicationBacklog();// 有停掉的 AOF, 重新启动if (aof_is_enabled) restartAOFAfterSYNC();}return;error:cancelReplicationHandshake();return;
}
readSyncBulkPayload 的逻辑算是比较清晰的, 主节点每次发送数据给从节点, 就触发一次 readSyncBulkPayload
- 计算结束表示, EOF 方式 (Socket 同步) 数据以 EOF:40位的 replId 开始, 以 replId 结束, 而正常的 RDB 文件同步, 则会在以开始就发送数据的长度
- 将读取到的数据写入到临时的 RDB 文件中
- 通过判断读取到的数据是否等于一开始读取到的 prelId 或者读取到的长度等于一开始读取到的长度, 确定是否已经读取完成了
- 读取到了末尾了, 将临时的 RDB 文件重命名为真实的 RDB 文件, 清空数据库, 删除注册的 readSyncBulkPayload 函数, 将 RDB 中的数据加载到内存中, 更新从节点的状态为 REPL_STATE_CONNECTED (主从节点连接中)
到此, 从节点和主节点开始的全量复制完成了
9.2 主节点向从节点推送部分复制的数据
主节点向从节点推送部分复制的数据这个, 只需要了解赋值积压缓冲区存储的内容是 Redis 执行的请求命令, 就很容易了。
复制积压缓冲区是一个大小为 1M 的循环队列。主节点在命令传播时, 不仅会将命令发送给所有的从节点, 还会将命令写入复制积压缓冲区中。
复制积压缓冲区最多可以备份 1M 大小的数据, 主节点会不断往里面写新的数据, 同时淘汰旧的。如果主从节点断线时间过长, 复制积压缓冲区的数据被新数据覆盖, 旧节点需要的数据已经不在里面, 就只能走全量复制, 否则可以从断开的位置继续同步复制积压缓冲区中的数据。
在 8.2.2 中判断可以部分复制时, 主节点的逻辑就是将复制积压缓冲区中将从节点指定的 repl_offset 后面的请求命令发送给从节点
从节点收到对应的命令重新执行一遍, 同时更新一下自己维护的 repl_offset 就行了
void processInputBuffer(client *c) {// 省略if (processCommand(c) == C_OK) {// 执行命令成功, 更新主从复制同步的偏移量if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;}// 省略
}
到此, 从节点和主节点开始的部分复制完成了
10 运行中的数据同步 - 命令传播
经过 psync 命令后, 也就是第一次复制后, 主从节点之间数据都同步了, 但是后续如果主节点继续数据的变更, 又会不一致。
为了数据的一致, 主节点应该有种方式将自身的变更同步到从节点, 这个实现的步骤就是命令传播。
在执行 Redis 命令的函数 call 中, 里面会根据执行的命令和客户端的类型等元素, 判断是否需要执行 propagate 函数, propagate 函数就是命令传播的方法。
propagate 函数很简单根据入参的标识判断是否需要进行复制传播, 如果判断为是, 会执行 replicationFeedSlaves 方法
replicationFeedSlaves 方法的执行逻辑如下:
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {listNode *ln;listIter li;int j, len;char llstr[LONG_STR_SIZE];// 有配置主节点, 直接返回if (server.masterhost != NULL) return;// 没有复制积压缓冲区 backlog 且没有从节点, 直接返回if (server.repl_backlog == NULL && listLength(slaves) == 0) return; if (server.slaveseldb != dictid) { // 如果当前从节点使用的数据库不是目标的数据库, 则要生成一个 select 命令robj *selectcmd;// 0 <= id < 10, 可以使用共享的 select 命令对象if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {selectcmd = shared.select[dictid];} else {// 按照协议格式构建 select 命令对象int dictid_len;dictid_len = ll2string(llstr,sizeof(llstr),dictid);selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr));}// 把当前的 select 命令写入到复制积压缓冲区if (server.repl_backlog) // 具体逻辑可以查看 replication 文章的 feedReplicationBacklogWithObject 方法解析feedReplicationBacklogWithObject(selectcmd);listRewind(slaves,&li);// 遍历所有的从节点while((ln = listNext(&li))) {client *slave = ln->value;// 从节点服务器状态为等待 bgsave 的开始, 因此跳过回复, 遍历下一个节点if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;// 添加 select 命令到当前从节点的回复中 addReply(slave,selectcmd);}// 释放临时对象if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)decrRefCount(selectcmd);}// 设置当前从节点使用的数据库 IDserver.slaveseldb = dictid;// 将命令写入一份 backlog, 也就是复制积压缓冲区if (server.repl_backlog) {char aux[LONG_STR_SIZE+3];// 拼写命令 *<argc>\r\naux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';// 写入复制积压缓冲区feedReplicationBacklog(aux,len+3);// 遍历所有的参数个数for (j = 0; j < argc; j++) {// 参数对象的长度long objlen = stringObjectLen(argv[j]);// 构建成协议标准的字符串, 并添加到 backlog 中// $<len>\r\n<argv>\r\n aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';// 添加 $<len>\r\nfeedReplicationBacklog(aux,len+3);// 添加参数对象 <argv>feedReplicationBacklogWithObject(argv[j]);// 添加\r\nfeedReplicationBacklog(aux+len+1,2);}}listRewind(slaves,&li);while((ln = listNext(&li))) {client *slave = ln->value;// 从节点的状态为等待 bgsave 开始跳过if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;// 将命令写给正在等待初次 SYNC 的从节点 (所以这些命令在输出缓冲区中排队, 直到初始 SYNC 完成), 或已经与主节点同步// 添加回复的长度addReplyMultiBulkLen(slave,argc);// 将所有的参数列表添加到从节点的输出缓冲区, 发送给从节点for (j = 0; j < argc; j++)addReplyBulk(slave,argv[j]);}
}
和 AOF 持久化一样, 再给从节点 client 写命令时, 会将 SELECT 命令强制写入, 以保证命令正确读到数据库中。
同时不仅写入了从节点 client 的输出缓冲区, 而且还会将命令记录到主节点服务器的复制积压缓冲区 server.repl_backlog 中, 这是为了网络闪断后进行部分复制做准备。
11 心跳机制
如果有留意的话, 可以发现上面有一行代码 slave->repl_put_online_on_ack = 0, 可以简单的猜到主从节点之间是有一个应答 (心跳) 的机制的。
在主从节点建立连接后, 他们之间都维护着长连接并彼此发送心跳命令。
主从节点彼此都有心跳机制, 各自模拟成对方的客户端进行通信。
主节点默认每隔 10 秒发送 PING 命令, 判断从节点的连接状态, 可以通过 repl-ping-salve-period 进行时间的配置, 默认为 10 秒
在定时函数 serverCron 中
// 如果当前节点是某个节点的主节点, 那么发送 PING 给从节点
if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {// 创建 PING 命令对象ping_argv[0] = createStringObject("PING",4);// 将 PING 发送给从服务器replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);// 对象的引用次数 - 1decrRefCount(ping_argv[0]);
}
从节点在主线程中每隔 1 秒发送 REPLCONF ACK 命令, 给主节点报告自己当前复制偏移量
// 定期发送 ack 给主节点, 旧版本的 Redis 除外
if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC))// 发送一个 replconf ack 命令给主节点 报告自身的 repl_offset// 具体逻辑可以查看 replication 文章的 replicationSendAck 方法解析replicationSendAck();
主节点 收到后, 同样是在 replconfCommand 中处理
void replconfCommand(client *c) {int j;if ((c->argc % 2) == 0) {addReply(c,shared.syntaxerr);return;}for (j = 1; j < c->argc; j+=2) {if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {// 省略} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {long long offset;// 不是从节点不做处理if (!(c->flags & CLIENT_SLAVE)) return;// 获取第二个参数 repl_offsetif ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))return;// 更新客户端对应的偏移量if (offset > c->repl_ack_off)c->repl_ack_off = offset; // 更新收到 ack 的时间为当前时间c->repl_ack_time = server.unixtime;// 客户端设置了收到 ack 时需要变更为在线状态 同时当前客户端的状态为 SLAVE_STATE_ONLINEif (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)putSlaveOnline(c);// 结束, 这个命令不需要响应任何信息return;} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {// 省略} else {addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr);return;}}addReply(c,shared.ok);
}
从节点还会在周期性函数 replicationCron() 中, 每次都要检查和主节点处于连接状态的从节点和主节点的交互时间是否超时,
如果超时则会调用 cancelReplicationHandshake() 函数, 取消和主节点的连接。 等到下一个周期在和主节点重新建立连接, 进行复制。
12 参考
Redis 复制(replicate)实现