EOS源码分析:transaction的一生 (3)

当接收全节点同步过来的事务的出块节点处于当值轮次时,会将接收的事务立即向其他节点(包括非出块节点)进行广播,主要通过channel机制跳转到net_plugin中。
目前事务停留在当值出块节点的producer_plugin的process_incoming_transaction_async方法中。transaction_ack作为channel号被声明在producer插件的compat::channels::transaction_ack命名空间下。这个channel是由net_plugin订阅。

channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

这个频道的订阅器是net插件确认正在进来的事务。订阅器的实现方法绑定在net_plugin_impl::transaction_ack方法上。

my->incoming_transaction_ack_subscription = app().get_channel<channels::transaction_ack>().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1));

进入net_plugin_impl::transaction_ack方法。

/** * @brief 出块节点确认事务 * * @param results 二元组pair类型,第一个元素为异常信息,第二个元素为事务数据。 */ void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, transaction_metadata_ptr>& results) { const auto& id = results.second->id; // 从事务体中得到事务id。 if (results.first) { //如果存在异常情况则拒绝广播该事务。 fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string())); dispatcher->rejected_transaction(id); } else { // 无异常情况,广播该事务。打印事务确认消息,到这一步就说明当前节点完成了确认 fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id)); dispatcher->bcast_transaction(results.second); } }

成功确认以后,调用bcast_transaction方法继续广播该事务。

/** * @brief 事务广播给其他节点 * * @param ptrx 事务体 */ void dispatch_manager::bcast_transaction(const transaction_metadata_ptr& ptrx) { std::set<connection_ptr> skips; // 相当于连接黑名单,从连接集合中跳过广播。 const auto& id = ptrx->id; // 获取事务id auto range = received_transactions.equal_range(id); // 已接收事务集是接收其他节点广播的事务,而不是自己发起广播的事务 for (auto org = range.first; org != range.second; ++org) { skips.insert(org->second); // 如果找到该事务,说明该事务已被其他节点优先广播,则自己不必额外处理。将事务连接插入skips集合。 } received_transactions.erase(range.first, range.second); // 删除已接收事务集中该事务,逻辑清空。 // 在本地事务集local_txns中查询,若找到则直接退出,说明该事务已完成广播共识。 if( my_impl->local_txns.get<by_id>().find( id ) != my_impl->local_txns.end() ) { fc_dlog(logger, "found trxid in local_trxs" ); return; } // 将事务插入到本地事务集local_txns time_point_sec trx_expiration = ptrx->packed_trx->expiration(); const packed_transaction& trx = *ptrx->packed_trx; auto buff = create_send_buffer( trx ); node_transaction_state nts = {id, trx_expiration, 0, buff}; my_impl->local_txns.insert(std::move(nts)); // 符合广播条件,开始广播。 my_impl->send_transaction_to_all( buff, [&id, &skips, trx_expiration](const connection_ptr& c) -> bool { if( skips.find(c) != skips.end() || c->syncing ) { return false; // 若该事务已被其他节点优先广播,则自己不做处理。 } const auto& bs = c->trx_state.find(id); bool unknown = bs == c->trx_state.end(); if( unknown ) { // trx_state未找到事务,则插入。 c->trx_state.insert(transaction_state({id,0,trx_expiration})); fc_dlog(logger, "sending trx to ${n}", ("n",c->peer_name() ) ); } return unknown; }); }

继续,进入send_transaction_to_all方法,查看广播的具体实现。net插件维护了一个connections集合,该集合动态维护了全网节点的p2p连接情况。

/** * @brief 模板方法:发送事务给全体成员 * * @tparam VerifierFunc 模板类 * @param send_buffer 事务数据 * @param verify 模板类实例 */ template<typename VerifierFunc> void net_plugin_impl::send_transaction_to_all(const std::shared_ptr<std::vector<char>>& send_buffer, VerifierFunc verify) { for( auto &c : connections) { if( c->current() && verify( c )) { // 在上面的使用中,就是检查是否在skips集合中。 // 进入连接队列,建立连接,发送消息。 c->enqueue_buffer( send_buffer, true, priority::low, no_reason ); // enqueue_buffer->queue_write->do_queue_write->boost::asio::async_write } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsszzf.html