EOS行为核心:解析插件chain_plugin (12)

channel的订阅是要依赖频道本身的内容发布的,也就是说频道是要先存在的,主体A可以来订阅,主体C、主体D都可以来订阅,而与作为发布方的主体B无关,主体B不用知道有谁订阅了。而method的注册和调用正好是相反的。实际上对于本文研究到的channel和method,主体A都是producer_plugin。本例中,一个区块被广播出来,需要所有的订阅者来执行本地的区块接收操作,因此需要采用channel机制。

下面搜索transaction_ack频道的订阅处:

my->incoming_transaction_ack_subscription = app().get_channel<channels::transaction_ack>().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1));

延伸到实际的调用函数net_plugin_impl::transaction_ack

void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>& results) { transaction_id_type id = results.second->id(); if (results.first) { // first位置是用来放异常信息的,如果first不为空则说明有异常。 fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string())); dispatcher->rejected_transaction(id);// 调用rejected_transaction,从received_transactions接收事务集合中将其删除。 } else { fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id)); dispatcher->bcast_transaction(*results.second); } } bcast_transaction函数

调用广播事务函数dispatch_manager::bcast_transaction。

void dispatch_manager::bcast_transaction (const packed_transaction& trx) { std::set<connection_ptr> skips; // 跳过的数据集合 transaction_id_type id = trx.id(); auto range = received_transactions.equal_range(id); for (auto org = range.first; org != range.second; ++org) { skips.insert(org->second); // 在接收事务集合中找到对应id的事务数据遍历放于skips。 } received_transactions.erase(range.first, range.second); // 从received_transactions接收事务集合中将其删除。 for (auto ref = req_trx.begin(); ref != req_trx.end(); ++ref) { if (*ref == id) { // 本地请求事务集合中,找到目标事务删除 req_trx.erase(ref); break; } } if( my_impl->local_txns.get<by_id>().find( id ) != my_impl->local_txns.end( ) ) { fc_dlog(logger, "found trxid in local_trxs" ); return;// 在本地事务集合中找到目标事务了,终止不必重复处理。 } uint32_t packsiz = 0; uint32_t bufsiz = 0; time_point_sec trx_expiration = trx.expiration(); net_message msg(trx); packsiz = fc::raw::pack_size(msg); bufsiz = packsiz + sizeof(packsiz); vector<char> buff(bufsiz); fc::datastream<char*> ds( buff.data(), bufsiz); ds.write( reinterpret_cast<char*>(&packsiz), sizeof(packsiz) ); fc::raw::pack( ds, msg );// trx转为net_message结构,打包通过数据流ds到缓存buff中。 node_transaction_state nts = {id, trx_expiration, trx, std::move(buff), 0, 0, 0}; my_impl->local_txns.insert(std::move(nts)); // 插入到本地事务集,net_plugin自定义的多索引库node_transaction_index中。 if( !large_msg_notify || bufsiz <= just_send_it_max) { // max-implicit-request参数决定just_send_it_max,最大请求数量 my_impl->send_all( trx, [id, &skips, trx_expiration](connection_ptr c) -> bool { if( skips.find(c) != skips.end() || c->syncing ) {// skips中一旦有了当前连接,或者connection正在同步中,则退出。 return false; } const auto& bs = c->trx_state.find(id); // 连接中的事务状态多索引库中寻找目标事务,返回事务数据 bool unknown = bs == c->trx_state.end(); if( unknown) {// 没找到则插入 c->trx_state.insert(transaction_state({id,true,true,0,trx_expiration,time_point() })); fc_dlog(logger, "sending whole trx to ${n}", ("n",c->peer_name() ) ); } else { // 找到则更新过期时间、状态库数据 update_txn_expiry ute(trx_expiration); c->trx_state.modify(bs, ute); } return unknown; }); }else {// 超过最大请求数量以后,不处理trx,而是pending_notify notice_message pending_notify; pending_notify.known_trx.mode = normal; pending_notify.known_trx.ids.push_back( id ); pending_notify.known_blocks.mode = none; my_impl->send_all(pending_notify, [id, &skips, trx_expiration](connection_ptr c) -> bool { if (skips.find(c) != skips.end() || c->syncing) { return false; } const auto& bs = c->trx_state.find(id); bool unknown = bs == c->trx_state.end(); if( unknown) { fc_dlog(logger, "sending notice to ${n}", ("n",c->peer_name() ) ); c->trx_state.insert(transaction_state({id,false,true,0,trx_expiration,time_point() })); } else { update_txn_expiry ute(trx_expiration); c->trx_state.modify(bs, ute); } return unknown; }); } } 23. 异步读写操作:推送事务数组 push_transactions

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssfzz.html