EOS源码分析:transaction的一生 (4)

最终的建立socket连接并发送数据的过程在注释中已体现:enqueue_buffer -> queue_write -> do_queue_write -> boost::asio::async_write,不再深入源码详细讨论。

process_incoming_transaction_async

void net_plugin_impl::transaction_ack方法中的参数二元组对象results是由process_incoming_transaction_async方法体中对transaction_ack频道发布的数据。上一小节详细分析了transaction_ack频道的订阅处理,这一小节回到process_incoming_transaction_async方法分析transaction_ack频道的信息发布。该方法体内部首先定义了一个send_response方法。

auto send_response = [this, &trx, &chain, &next](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& response) { next(response); // 通过next方法将response传回客户端。 if (response.contains<fc::exception_ptr>()) { // 响应内容中有异常情况出现,则发布数据中的第一个元素为异常对象,作为transaction_ack在net插件中的result.first数据。 _transaction_ack_channel.publish(priority::low, std::pair<fc::exception_ptr, transaction_metadata_ptr>(response.get<fc::exception_ptr>(), trx)); if (_pending_block_mode == pending_block_mode::producing) { // 如果当前节点正在出块,则打印日志区块拒绝该事务。 fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING tx: ${txid} : ${why} ", ("block_num", chain.head_block_num() + 1) ("prod", chain.pending_block_producer()) ("txid", trx->id) ("why",response.get<fc::exception_ptr>()->what())); // why的值为拒绝该事务的原因,即打印出异常对象的可读信息。 } else { // 如果当前节点尚未出块,则打印未出块节点的推测执行:拒绝该事务。 fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${txid} : ${why} ", ("txid", trx->id) ("why",response.get<fc::exception_ptr>()->what())); // 同样打印异常 } } else { // 如果响应内容中无异常,说明成功执行,则第一个元素为空。 _transaction_ack_channel.publish(priority::low, std::pair<fc::exception_ptr, transaction_metadata_ptr>(nullptr, trx)); if (_pending_block_mode == pending_block_mode::producing) { // 如果当前节点正在出块,则打印日志区块接收该事务。 fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING tx: ${txid}", ("block_num", chain.head_block_num() + 1) ("prod", chain.pending_block_producer()) ("txid", trx->id)); } else { // 如果当前节点尚未出块,则打印未出块节点的推测执行:接收该事务。 fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is ACCEPTING tx: ${txid}", ("txid", trx->id)); } } };

从send_response方法的定义可以看出,第二个参数永远是事务体本身,这是不变的。而第一个参数是否包含异常信息是不确定的,取决于调用者的传入情况。所以接下来实际上是对事务状态的判断,从而影响传给send_response方法的第一个参数是否包含异常。这些异常情况包括:

事务超时过期,通过将事务过期时间与当前最新区块时间对比即可,若小于最新区块时间则判定事务过期。

事务重复,在当前节点的db中寻找是否有相同事务id的存在,若存在则说明事务重复。

事务执行时出错:

全节点配置为只读模式的,不可以处理推送事务。

不允许忽略检查以及延迟事务。

内部执行错误,例如权限问题,资源问题,事务进入合约内部校验错误等,详细内容看下面对controller::push_transaction方法的分析。

controller::push_transaction /** * @brief 这是新事务进入区块状态的进入点。将会检查权限,是否立即执行或延迟执行。 * 最后,将事务返回体插入到等待中的区块。 * * @param trx 事务体 * @param deadline 截止时间 * @param billed_cpu_time_us CPU抵押时间 * @param explicit_billed_cpu_time CPU抵押时间是否明确,一般是false,未显式指定 * * @return transaction_trace_ptr 事务跟踪,返回的结构体对象 */ transaction_trace_ptr push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us, bool explicit_billed_cpu_time = false ) { EOS_ASSERT(deadline != fc::time_point(), transaction_exception, "deadline cannot be uninitialized"); // 截止时间的格式出现问题 transaction_trace_ptr trace; // 定义事务跟踪实例。 try { auto start = fc::time_point::now(); const bool check_auth = !self.skip_auth_check() && !trx->implicit; // implicit事务会忽略检查也可以自己设置跳过auth检查,则check_auth 为false。 // 得到要使用的cpu的时间值。 const fc::microseconds sig_cpu_usage = check_auth ? std::get<0>( trx->recover_keys( chain_id ) ) : fc::microseconds(); // 得到权限的公钥 const flat_set<public_key_type>& recovered_keys = check_auth ? std::get<1>( trx->recover_keys( chain_id ) ) : flat_set<public_key_type>(); if( !explicit_billed_cpu_time ) { // 未显式指定CPU抵押时间。 // 计算已消费CPU时间 fc::microseconds already_consumed_time( EOS_PERCENT(sig_cpu_usage.count(), conf.sig_cpu_bill_pct) ); if( start.time_since_epoch() < already_consumed_time ) { start = fc::time_point(); } else { start -= already_consumed_time; } } const signed_transaction& trn = trx->packed_trx->get_signed_transaction(); transaction_context trx_context(self, trn, trx->id, start); if ((bool)subjective_cpu_leeway && pending->_block_status == controller::block_status::incomplete) { trx_context.leeway = *subjective_cpu_leeway; } trx_context.deadline = deadline; trx_context.explicit_billed_cpu_time = explicit_billed_cpu_time; trx_context.billed_cpu_time_us = billed_cpu_time_us; trace = trx_context.trace; try { if( trx->implicit ) { // 忽略检查的事务的处理办法 trx_context.init_for_implicit_trx(); // 检查事务资源(CPU和NET)可用性。 trx_context.enforce_whiteblacklist = false; } else { bool skip_recording = replay_head_time && (time_point(trn.expiration) <= *replay_head_time); // 检查事务资源(CPU和NET)可用性。 trx_context.init_for_input_trx( trx->packed_trx->get_unprunable_size(), trx->packed_trx->get_prunable_size(), skip_recording); } trx_context.delay = fc::seconds(trn.delay_sec); if( check_auth ) { authorization.check_authorization( // 权限校验 trn.actions, recovered_keys, {}, trx_context.delay, [&trx_context](){ trx_context.checktime(); }, false ); } trx_context.exec(); // 执行事务上下文,合约方法内部的校验错误会在这里抛出,使事务行为在当前节点的链上生效。 trx_context.finalize(); // 资源处理,四舍五入,自动扣除并更新账户的资源情况。 auto restore = make_block_restore_point(); if (!trx->implicit) { transaction_receipt::status_enum s = (trx_context.delay == fc::seconds(0)) ? transaction_receipt::executed : transaction_receipt::delayed; trace->receipt = push_receipt(*trx->packed_trx, s, trx_context.billed_cpu_time_us, trace->net_usage); pending->_block_stage.get<building_block>()._pending_trx_metas.emplace_back(trx); } else { // 以上代码段都包含在try异常监控的作用域中,因此如果到此仍未发生异常而中断,则判断执行成功。 transaction_receipt_header r; r.status = transaction_receipt::executed; // 注意:这就是客户端接收到的那个非常重要的状态executed。 r.cpu_usage_us = trx_context.billed_cpu_time_us; r.net_usage_words = trace->net_usage / 8; trace->receipt = r; } fc::move_append(pending->_block_stage.get<building_block>()._actions, move(trx_context.executed)); if (!trx->accepted) { trx->accepted = true; emit( self.accepted_transaction, trx); // 发射接收事务的信号 } emit(self.applied_transaction, std::tie(trace, trn)); if ( read_mode != db_read_mode::SPECULATIVE && pending->_block_status == controller::block_status::incomplete ) { trx_context.undo(); // 析构器,undo撤销操作。 } else { restore.cancel(); trx_context.squash(); // 上下文刷新 } if (!trx->implicit) { unapplied_transactions.erase( trx->signed_id ); } return trace; } catch( const disallowed_transaction_extensions_bad_block_exception& ) { throw; } catch( const protocol_feature_bad_block_exception& ) { throw; } catch (const fc::exception& e) { trace->error_code = controller::convert_exception_to_error_code( e ); trace->except = e; trace->except_ptr = std::current_exception(); } if (!failure_is_subjective(*trace->except)) { unapplied_transactions.erase( trx->signed_id ); } emit( self.accepted_transaction, trx ); // 发射接收事务的信号,触发controller相关信号操作 emit( self.applied_transaction, std::tie(trace, trn) ); // 发射应用事务的信号,触发controller相关信号操作 return trace; } FC_CAPTURE_AND_RETHROW((trace)) } /// push_transaction

信号方面的内容请转到controller的信号。

小结

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsszzf.html