master 端维护了一个定长的积压缓冲队列backlog。
master 向 slave 传播命令时会同时将命令放入该队列,因此缓冲区里会保留一部分最新的命令。
slave 发出同步请求时,如果 slave 的偏移量之后 (offset+1) 的数据存在于积压缓冲,master 才会执行部分同步。
slave 接收到SLAVEOF命令后,会调用replicaofCommand开始执行主从同步:
void replicaofCommand(client *c) { // ... if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { // 如果接收到的命令是 SLAVE NO ONE 则断开主从同步 // ... } } else { if (c->flags & CLIENT_SLAVE) { return; // 如果已经是客户端是一个 slave 节点,则拒绝该命令 } if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { return; // 如果已经连接上 SLAVEOF 中指定的 master 节点,则直接返回 } // 如果尚未连接任意 master 节点,则根据 masterhost 与 masterport 建立 TCP 连接 // 并注册监听函数 syncWithMaster } } void syncWithMaster(connection *conn) { // 向 master 节点发送 PING 命令 if (server.repl_state == REPL_STATE_CONNECTING) { server.repl_state = REPL_STATE_RECEIVE_PONG; err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL); // 发送 PING 命令 // ... } // 监听到 master 对 PING 命令的响应 if (server.repl_state == REPL_STATE_RECEIVE_PONG) { if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-NOPERM",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) { goto error; } server.repl_state = REPL_STATE_SEND_AUTH; // 只处理 master 响应值为 PONG、NOAUTH、NOPERM 的情况 } // 根据 master 对 PING 的响应值,判断是否需要授权 if (server.repl_state == REPL_STATE_SEND_AUTH) { if (server.masteruser && server.masterauth) { err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH", server.masteruser,server.masterauth,NULL); // 发送 AUTH 命令 // ... server.repl_state = REPL_STATE_RECEIVE_AUTH; } else { // 如果没有设置 server.masteruser 与 server.masterauth 授权信息,则跳过 AUTH server.repl_state = REPL_STATE_SEND_PORT; } } // 此处略过以下步骤: // 使用 REPLCONF listening-port 命令将 slave 的端口告知 master // 使用 REPLCONF ip-address 命令将 slave 的 IP 告知 master // 使用 REPLCONF capa eof / capa psync2 命令将 slave 兼容性(支持的特性)告知 master // 开始发送 PSYNC 命令 if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC; return; } // 读取 PSYNC 命令的响应 psync_result = slaveTryPartialResynchronization(conn,1); // 如果监听到 CONTINUE 响应,跳过全量同步 if (psync_result == PSYNC_CONTINUE) return; // 如果返回值为 PSYNC_FULLRESYNC 或 PSYNC_NOT_SUPPORTED // 开始执行执行全量同步,注册 readSyncBulkPayload 监听 RDB 文件下载 if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { // ... goto error; } server.repl_state = REPL_STATE_TRANSFER; // ... } int slaveTryPartialResynchronization(connection *conn, int read_reply) { if (!read_reply) { if (server.cached_master) { // server.cached_master 中存在记录,尝试执行部分同步 psync_replid = server.cached_master->replid; } else { psync_replid = "?"; // server.cached_master 中不存在记录,只能执行全量同步 } // 发起 PSYNC 命令 reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL); // ... return PSYNC_WAIT_REPLY; } reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); // 读取 PSYNC 响应 // 如果 master 响应 FULLRESYNC 则直接进行全量同步 if (!strncmp(reply,"+FULLRESYNC",11)) { // ... return PSYNC_FULLRESYNC; } // 如果 master 响应 CONTINUE 则尝试执行部分同步 if (!strncmp(reply,"+CONTINUE",9)) { // ... return PSYNC_CONTINUE; } // master 暂时无法处理 PSYNC 命令 —> PSYNC_TRY_LATER // master 不支持 PSYNC 命令 -> PSYNC_NOT_SUPPORTED } MASTER 视角