MongoDB Read Preference as Seen from PyMongo (3)

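  Everything PyMongo knows about the state of the replica set comes from the ismaster responses of its nodes. The source of truth is the replica set itself, and that truth is established by a majority of its nodes. As a result, the information any single node returns to the driver may be stale or even wrong. The driver infers the replica set state from this limited information. If it infers wrongly, for example by sending a write to a stale primary, the replica set checks again on its own side, which preserves correctness.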

PyMongo read preference

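  The previous parts described in detail how PyMongo keeps its view of the replica set up to date. This part looks at how, given that topology information, a read preference routes an operation to a specific node.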

  Starting from Collection.find and following the call chain, we eventually reach MongoClient._send_message_with_response:

```python
def _send_message_with_response(self, operation, read_preference=None,
                                 exhaust=False, address=None):
    topology = self._get_topology()
    if address:
        # An explicit address (e.g. a getMore on an existing cursor) wins.
        server = topology.select_server_by_address(address)
        if not server:
            raise AutoReconnect('server %s:%d no longer available' % address)
    else:
        # No read preference means a write: only a writable server qualifies.
        selector = read_preference or writable_server_selector
        server = topology.select_server(selector)

    # (This excerpt omits the line that computes set_slave_ok.)
    return self._reset_on_error(
        server,
        server.send_message_with_response,
        operation,
        set_slave_ok,
        self.__all_credentials,
        self._event_listeners,
        exhaust)
```

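  The code is straightforward: it picks a server either by the given address or by the read_preference (falling back to writable_server_selector when none is given), then sends the request through that server and waits for the reply. topology.select_server eventually calls down into the following function: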

```python
def _select_servers_loop(self, selector, timeout, address):
    """select_servers() guts. Hold the lock when calling this."""
    now = _time()
    end_time = now + timeout
    # self._description is a TopologyDescription.
    server_descriptions = self._description.apply_selector(
        selector, address)

    while not server_descriptions:
        # No suitable servers.
        if timeout == 0 or now > end_time:
            raise ServerSelectionTimeoutError(
                self._error_message(selector))

        self._ensure_opened()
        self._request_check_all()

        # Release the lock and wait for the topology description to
        # change, or for a timeout. We won't miss any changes that
        # came after our most recent apply_selector call, since we've
        # held the lock until now.
        self._condition.wait(common.MIN_HEARTBEAT_INTERVAL)  # Condition.wait
        self._description.check_compatible()
        now = _time()
        server_descriptions = self._description.apply_selector(
            selector, address)

    self._description.check_compatible()
    return server_descriptions
```

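  As you can see, a server is not always found on the first attempt. If no server matches, the client has not yet discovered a suitable mongod node (for example, no primary is known yet for a write). It then calls _request_check_all to ask the monitors to recheck every server, waits on _condition for at most common.MIN_HEARTBEAT_INTERVAL, and retries until the server selection timeout expires. As shown earlier in Topology.on_change, every topology update calls _condition.notify_all, which wakes up the waiting threads.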
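The wait-and-retry pattern can be illustrated with a small, self-contained toy. This is a sketch under stated assumptions, not PyMongo's code: ToyTopology, ServerSelectionTimeout and primary_selector are hypothetical names, servers are plain (address, type) tuples, and on_change is assumed to be called by some monitor thread. It shows the key property of threading.Condition that the loop relies on: wait() releases the lock, so the updater can take it, replace the description and notify_all() the selecting threads.

```python
import threading
import time


class ServerSelectionTimeout(Exception):
    """Hypothetical stand-in for pymongo's ServerSelectionTimeoutError."""


class ToyTopology:
    """Hypothetical toy model of the select / wait / retry loop (not PyMongo's API)."""

    MIN_HEARTBEAT_INTERVAL = 0.5  # seconds, same role as common.MIN_HEARTBEAT_INTERVAL

    def __init__(self):
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        self._servers = []  # (address, server_type) tuples currently known

    def on_change(self, servers):
        """Called by a (hypothetical) monitor thread after a heartbeat."""
        with self._lock:
            self._servers = list(servers)
            self._condition.notify_all()  # wake every thread blocked in select_servers()

    def select_servers(self, selector, timeout):
        with self._lock:
            end_time = time.monotonic() + timeout
            selected = selector(self._servers)
            while not selected:
                remaining = end_time - time.monotonic()
                if timeout == 0 or remaining <= 0:
                    raise ServerSelectionTimeout("no server matches the selector")
                # wait() releases the lock while blocked and re-acquires it before
                # returning, so on_change() can update self._servers in the meantime.
                self._condition.wait(min(self.MIN_HEARTBEAT_INTERVAL, remaining))
                selected = selector(self._servers)
            return selected


def primary_selector(servers):
    """Hypothetical selector: keep only servers reported as primary."""
    return [s for s in servers if s[1] == "RSPrimary"]
```

For instance, if a threading.Timer calls on_change([("a:27017", "RSPrimary")]) 0.2 s after a thread starts select_servers(primary_selector, timeout=5), that thread wakes up and returns the new primary. One deliberate difference from the loop above: the sketch caps each wait at the remaining time, whereas PyMongo always waits MIN_HEARTBEAT_INTERVAL and then checks the deadline.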

```python
def apply_selector(self, selector, address):

    def apply_local_threshold(selection):
        if not selection:
            return []

        settings = self._topology_settings

        # Round trip time in seconds.
        fastest = min(
            s.round_trip_time for s in selection.server_descriptions)
        threshold = settings.local_threshold_ms / 1000.0
        # Keep only servers within local_threshold_ms of the fastest one.
        return [s for s in selection.server_descriptions
                if (s.round_trip_time - fastest) <= threshold]

    # ... unrelated code omitted ...

    return apply_local_threshold(
        selector(Selection.from_topology_description(self)))
```
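The latency window computed by apply_local_threshold is easy to check in isolation. Below is a minimal sketch assuming a hypothetical ServerDesc record that carries only an address and a round-trip time in seconds; 15 ms is used as the default threshold because that is the documented default of localThresholdMS.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ServerDesc:
    """Hypothetical minimal server description: address plus RTT in seconds."""
    address: str
    round_trip_time: float


def apply_local_threshold(servers: List[ServerDesc],
                          local_threshold_ms: float = 15.0) -> List[ServerDesc]:
    """Keep only servers whose RTT is within local_threshold_ms of the fastest one."""
    if not servers:
        return []
    fastest = min(s.round_trip_time for s in servers)
    threshold = local_threshold_ms / 1000.0  # milliseconds -> seconds, as in apply_selector
    return [s for s in servers if (s.round_trip_time - fastest) <= threshold]
```

With RTTs of 2 ms, 10 ms and 30 ms, only the first two fall within 15 ms of the fastest server. The server selection specification then has the driver pick randomly among the servers left in this window.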

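  For reads, the selector passed to apply_selector is an instance of one of the subclasses of read_preference._ServerMode (for writes it is writable_server_selector). Take Nearest as an example: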

```python
class Nearest(_ServerMode):

    def __call__(self, selection):
        """Apply this read preference to Selection."""
        return member_with_tags_server_selector(
            self.tag_sets,
            max_staleness_selectors.select(
                self.max_staleness, selection))
```

  Candidates are first constrained by maxStalenessSeconds and then filtered by tag_sets; here we focus only on the former.
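The order of the two filters in Nearest.__call__ can be sketched as plain function composition. Everything below is hypothetical and simplified: servers are dicts, the staleness filter is passed in as an arbitrary callable, and the tag-set rule (try tag sets in order and use the first one that matches at least one server; an empty tag set {} matches every server) follows the server selection specification rather than PyMongo's exact code.

```python
from typing import Callable, Dict, List

Server = Dict[str, object]  # hypothetical: {"address": ..., "tags": {...}, ...}
Selector = Callable[[List[Server]], List[Server]]


def tag_sets_selector(tag_sets: List[Dict[str, str]]) -> Selector:
    """Keep the servers matching the first tag set that matches at least one server."""
    def select(servers: List[Server]) -> List[Server]:
        for tag_set in tag_sets or [{}]:  # no tag sets behaves like [{}], which matches all
            matched = [s for s in servers
                       if all(s["tags"].get(k) == v for k, v in tag_set.items())]
            if matched:
                return matched
        return []  # no tag set matched any server
    return select


def nearest(tag_sets: List[Dict[str, str]], staleness_filter: Selector) -> Selector:
    """Mirror the order in Nearest.__call__: staleness filter first, then tag sets."""
    by_tags = tag_sets_selector(tag_sets)

    def select(servers: List[Server]) -> List[Server]:
        return by_tags(staleness_filter(servers))
    return select
```

Because staleness is applied first, a server that matches the preferred tags but is too stale never reaches the tag filter, so a later, more general tag set such as {} may end up being used instead.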
About maxStalenessSeconds

The read preference maxStalenessSeconds option lets you specify a maximum replication lag, or “staleness”, for reads from secondaries. When a secondary’s estimated staleness exceeds maxStalenessSeconds, the client stops using it for read operations.

  How is staleness computed? When the selection has a known primary, the following function is used:

```python
def _with_primary(max_staleness, selection):
    """Apply max_staleness, in seconds, to a Selection with a known primary."""
    primary = selection.primary
    sds = []

    for s in selection.server_descriptions:
        if s.server_type == SERVER_TYPE.RSSecondary:
            # See max-staleness.rst for explanation of this formula.
            staleness = (
                (s.last_update_time - s.last_write_date) -
                (primary.last_update_time - primary.last_write_date) +
                selection.heartbeat_frequency)

            if staleness <= max_staleness:
                sds.append(s)
        else:
            sds.append(s)

    return selection.with_server_descriptions(sds)
```

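  The code above relies on the last_write_date attribute taken from the ismaster response (its lastWrite.lastWriteDate field); staleness is estimated from exactly this value, combined with last_update_time, the moment the client last refreshed that server's description.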
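To make the formula concrete, here is a standalone re-implementation of the _with_primary filter. Desc is a hypothetical stand-in for PyMongo's ServerDescription with only the fields the formula needs, all times are plain floats in seconds, and the 10-second heartbeat used in the example is an assumption (it matches the default heartbeatFrequencyMS).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Desc:
    """Hypothetical stand-in for ServerDescription; all times are seconds."""
    address: str
    server_type: str          # e.g. "RSPrimary" or "RSSecondary"
    last_update_time: float   # client time when this ismaster reply was processed
    last_write_date: float    # lastWrite.lastWriteDate reported by the server


def staleness_with_primary(s: Desc, primary: Desc, heartbeat_frequency: float) -> float:
    """Same arithmetic as _with_primary."""
    return ((s.last_update_time - s.last_write_date)
            - (primary.last_update_time - primary.last_write_date)
            + heartbeat_frequency)


def filter_with_primary(servers: List[Desc], max_staleness: float,
                        heartbeat_frequency: float) -> List[Desc]:
    """Drop secondaries whose estimated staleness exceeds max_staleness.

    Assumes exactly the case _with_primary handles: a primary is present.
    """
    primary = next(s for s in servers if s.server_type == "RSPrimary")
    kept = []
    for s in servers:
        if s.server_type == "RSSecondary":
            if staleness_with_primary(s, primary, heartbeat_frequency) <= max_staleness:
                kept.append(s)
        else:
            kept.append(s)  # the primary (and any other type) passes through unchanged
    return kept
```

With a primary whose reply arrived at 1000.0 and whose last write was at 995.0, a secondary checked at 1001.0 with a last write at 960.0 gets an estimated staleness of 41 - 5 + 10 = 46 seconds and survives maxStalenessSeconds=90, while a secondary checked at 1002.0 with a last write at 880.0 (staleness 127 seconds) is dropped.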

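  For an explanation of the formula, see max-staleness.rst, the Max Staleness specification in the MongoDB driver specifications repository.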
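Rearranging the expression from _with_primary makes its meaning clearer. Write u for last_update_time (client clock when the ismaster reply was processed), w for last_write_date, h for the heartbeat frequency, S for the secondary and P for the primary:

$$
\begin{aligned}
\text{staleness}(S) &= (u_S - w_S) - (u_P - w_P) + h \\
&= \underbrace{(w_P - w_S)}_{\text{write lag of } S \text{ behind } P} + \underbrace{(u_S - u_P)}_{\text{gap between the two checks}} + h
\end{aligned}
$$

The first term is how far the secondary's last write trails the primary's; the second compensates for the two descriptions having been refreshed at different moments, and because both u values come from the same client clock, any constant offset of that clock cancels out; h acts as a pessimistic allowance, roughly covering the up-to-one-heartbeat age of the descriptions. For example, with u_P = 1000, w_P = 995, u_S = 1001, w_S = 960 and h = 10 (seconds): (995 - 960) + (1001 - 1000) + 10 = 46. When no primary is known, the specification instead compares against the secondary with the newest write, SMax: staleness(S) = w_SMax - w_S + h.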

Copyright notice: unless otherwise noted, all articles are original to this site.

When reprinting, please credit the source: https://www.heiqu.com/wpjwxy.html