Redis 主从复制(Replication) (4)

master 接收到PSYNC命令后,会调用syncCommand开启同步流程:

void syncCommand(client *c) { // 接收到 slave 发送的 PSYNC 命令 if (!strcasecmp(c->argv[0]->ptr,"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { return; // 无需全量同步,直接返回 } } // 若代码运行至此处,意味着部分同步失败,需要执行全量同步 // master 会执行 BGSAVE 命令生成快照并传输给 slave // 同步 RDB 快照的方式有两种: // 基于磁盘(Disk-backed):在磁盘生成 RDB 快照文件,然后再传输给 slave // 无盘(Diskless):直接将 RDB 快照数据写入 slave socket } int masterTryPartialResynchronization(client *c) { long long psync_offset; // 该 slave 最新的同步偏移量 char *master_replid; // slave 同步偏移量对应的 master 的 runid /* * 以下情况可以避免全量同步: * 1. slave 最近一次同步的 master 是当前实例(网络抖动) * 2. slave 与当前节点原本是同个 master 的从节点,且当前节点的同步偏移量 second_replid_offset 较大(维护重启、故障切换)*/ if (strcasecmp(master_replid, server.replid) && (strcasecmp(master_replid, server.replid2) ||psync_offset > server.second_replid_offset)) { goto need_full_resync; // 不满足 PSYNC 条件,需要执行全量同步 } /* * 以下情况只能执行全量同步: * 1. master 没有初始化积压缓冲 * 2. slave 的同步偏移量落后于积压缓冲 */ if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { goto need_full_resync; // 进行全量同步 } // 若代码运行至此处,意味着可以执行部分同步 listAddNodeTail(server.slaves,c); // 根据客户端是否兼容 PSYNC2,返回不同的 CONTINUE 响应 if (c->slave_capa & SLAVE_CAPA_PSYNC2) { buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); } else { buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); } // CONTINUE 命令后面,紧接着就是 server.repl_backlog 的内容 psync_len = addReplyReplicationBacklog(c,psync_offset); // ... } 心跳 & 命令传播

Redis 每秒会执行一次定时任务replicationCron,其中就包含主从同步间的心跳,可以发现主从双方的心跳频率是不一致的:

void replicationCron(void) { // slave 定时向 master 发送 REPLCONF ACK 命令 if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC)) { addReplyArrayLen(c,3); addReplyBulkCString(c,"REPLCONF"); addReplyBulkCString(c,"ACK"); addReplyBulkLongLong(c,c->reploff); } // master 定时向 slave 发送 PING 命令 if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && listLength(server.slaves)) { robj *ping_argv[1]; ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); } }

master 在调用call函数执行客户端传过来的命令时,会将命令传播给 slave 并同时写入积压缓冲:

void call(client *c, int flags) { // ... if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { // 当前命令是否需要传播 if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } } void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { // ... if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); } void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { // 如果当前节点没有 slave 节点或复制积压缓冲,立即返回 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; // 向 repl_backlog 中批量写入命令 if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; // 命令缓冲,用于序列化 redis 命令 /* 写入当前批次的命令数量 */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); /* 逐个遍历命令,将其序列化后写入 repl_backlog */ for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } // 将命令批量传播给所有 slaves 对应的 client listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; /* 写入当前批次的命令数量 */ addReplyArrayLen(slave,argc); /* 逐个遍历命令,将传播给 slave 节点 */ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } } 相关参数 slave-serve-stale-data

 主从节点断开时或同步未完成时,slave 如何响应客户端请求

yes:正常响应命令,但是不保证数据质量

no:拒绝响应命令,返回 SYNC with master in progress

repl-diskless-sync

 执行全量同步时,master 如何将 RDB 快照传输给 slave

no:先在磁盘生成 RDB 文件再进行传输(低带宽网络)

yes:直接将 RDB 快照写入 slave 的 socket(低速磁盘 + 高带宽网络)

repl-ping-slave-period

 master 向 slave 发送 PING 心跳的间隔,默认 10s 发送一次

repl-backlog-size

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdyyw.html