从PyMongo看MongoDB Read Preference (2)

  当某个server的monitor获取到了在server对应的mongod上的复制集信息信息时,调用Tolopogy.on_change更新复制集的拓扑信息:

def on_change(self, server_description): """Process a new ServerDescription after an ismaster call completes.""" if self._description.has_server(server_description.address): self._description = updated_topology_description( self._description, server_description) self._update_servers() # 根据信息,连接到新增的节点,移除(断开)已经不存在的节点 self._receive_cluster_time_no_lock( server_description.cluster_time) # Wake waiters in select_servers(). self._condition.notify_all()

  核心在updated_topology_description, 根据本地记录的topology信息,以及收到的server_description(来自IsMaster- ismaster command response),来调整本地的topology信息。以一种情况为例:收到一个ismaster command response,对方自称自己是primary,不管当前topology有没有primary,都会进入调用以下函数

def _update_rs_from_primary( sds, replica_set_name, server_description, max_set_version, max_election_id): """Update topology description from a primary's ismaster response. Pass in a dict of ServerDescriptions, current replica set name, the ServerDescription we are processing, and the TopologyDescription's max_set_version and max_election_id if any. Returns (new topology type, new replica_set_name, new max_set_version, new max_election_id). """ if replica_set_name is None: replica_set_name = server_description.replica_set_name elif replica_set_name != server_description.replica_set_name: # 不是来自同一个复制集 # We found a primary but it doesn't have the replica_set_name # provided by the user. sds.pop(server_description.address) return (_check_has_primary(sds), replica_set_name, max_set_version, max_election_id) max_election_tuple = max_set_version, max_election_id if None not in server_description.election_tuple: if (None not in max_election_tuple and max_election_tuple > server_description.election_tuple): # 节点是priamry,但比topology中记录的旧 # Stale primary, set to type Unknown. address = server_description.address sds[address] = ServerDescription(address) # 传入空dict,则server-type为UnKnown return (_check_has_primary(sds), replica_set_name, max_set_version, max_election_id) max_election_id = server_description.election_id if (server_description.set_version is not None and # 节点的config version版本更高 (max_set_version is None or server_description.set_version > max_set_version)): max_set_version = server_description.set_version # We've heard from the primary. Is it the same primary as before? for server in sds.values(): if (server.server_type is SERVER_TYPE.RSPrimary and server.address != server_description.address): # Reset old primary's type to Unknown. sds[server.address] = ServerDescription(server.address) # There can be only one prior primary. break # Discover new hosts from this primary's response. for new_address in server_description.all_hosts: if new_address not in sds: sds[new_address] = ServerDescription(new_address) # Remove hosts not in the response. for addr in set(sds) - server_description.all_hosts: sds.pop(addr) # If the host list differs from the seed list, we may not have a primary # after all. return (_check_has_primary(sds), replica_set_name, max_set_version, max_election_id)

  注意看docstring中的Returns,都是返回新的复制集信息

  那么整个函数从上往下检查

是不是同一个复制集

新节点(自认为是primary)与topology记录的primary相比,谁更新。比较(set_version, election_id)

比较set_servion

如果topology中已经有stale primary,那么将其server-type改成Unknown

从Primary节点的all_hosts中取出新加入复制集的节点

移除已经不存在于复制集中的节点

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjwxy.html