Debug EOS:nodeos + mongo_db_plugin (6)

我们要将链上的数据同步至mongo,是通过上面判断是否为空的那四个queue来做,为了增加消费效率,进入consume_block函数以后,要先将数据move导入到一个process_queue中来慢慢处理,相当于一个中转站。

size_t transaction_metadata_size = transaction_metadata_queue.size(); if (transaction_metadata_size > 0) { transaction_metadata_process_queue = move(transaction_metadata_queue); transaction_metadata_queue.clear(); } size_t transaction_trace_size = transaction_trace_queue.size(); if (transaction_trace_size > 0) { transaction_trace_process_queue = move(transaction_trace_queue); transaction_trace_queue.clear(); } size_t block_state_size = block_state_queue.size(); if (block_state_size > 0) { block_state_process_queue = move(block_state_queue); block_state_queue.clear(); } size_t irreversible_block_size = irreversible_block_state_queue.size(); if (irreversible_block_size > 0) { irreversible_block_state_process_queue = move(irreversible_block_state_queue); irreversible_block_state_queue.clear(); } 队列大小报警器

接下来是一个针对四个源队列的大小进行一个监控,当任意超过限额的75%时,会触发报警,打印到控制台。

分发到具体执行函数消费队列

接下来,就是将上面的四个中转的process_queue的数据分别分发到不同的函数(对应下面四个_process函数)中去消费处理。最后每个中转队列处理一条,就pop出去一条,都处理结束以后,会再次判断四个源队列的大小是否为空,都消费完了,同时也得有析构函数的done标志位为true,才会中断consume_thread线程的consume_block()的无线循环。

1. mongo_db_plugin_impl::_process_accepted_transaction()
执行接收交易,需要start_block_reached标识位为true。源码较长不粘贴,语言介绍一下,该函数的主要工作是获得mongo的连接以及库表对象,同时解析传入的const chain::transaction_metadata_ptr& t 对象,该对象的路线是:

chain=>signal=>mongo_db_plugin connect signal=>queue=>process_queue=>遍历出一条数据即是t

获得这个对象以后,也准备好了mongo数据库的连接库表对象,剩下的工作就是从t解析导入mongo库表了。

mongo作为列存储的nosql文件数据库,这里只接收document类型

这里创建了一个它的对象act_doc,解析过程:

链数据对象的解析

const auto trx_id = t->id; const auto trx_id_str = trx_id.str(); const auto& trx = t->trx; const chain::transaction_header& trx_header = trx;

mongo数据库存储结构的定义,值数据的传入,通过process_action函数进行处理,

act_doc.append( kvp( "action_num", b_int32{act_num} ), kvp( "trx_id", trx_id_str )); act_doc.append( kvp( "cfa", b_bool{cfa} )); act_doc.append( kvp( "account", act.account.to_string())); act_doc.append( kvp( "name", act.name.to_string())); act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) { for( const auto& auth : act.authorization ) { subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) { subdoc.append( kvp( "actor", auth.actor.to_string()), kvp( "permission", auth.permission.to_string())); } ); } } ));

process_action函数处理的是action数据的匹配,而如果action涉及到新账户的创建,这部分要在process_action函数中继续通过update_account函数进行处理。update_account函数只会过滤由system合约执行的newaccount动作,system合约默认是由chain::config::system_account_name(就是eosio)来创建的。所以过滤后的action的结构如下:

field value
account   eosio  
name   newaccount  

然后会同步在mongo的accounts表中添加一条记录,要有当时的添加时间createdAt。添加之前,要根据这个用户名去mongo中查找,通过函数find_account,如果查找到了则update,未查到就insert。

auto find_account(mongocxx::collection& accounts, const account_name& name) { using bsoncxx::builder::basic::make_document; using bsoncxx::builder::basic::kvp; return accounts.find_one( make_document( kvp( "name", name.to_string()))); }

接着,是transaction表的数据插入,这个工作是对trans_doc文本类型变量的设置:

trx_id设置

irreversible设置

transaction_header设置

signing_keys设置

actions设置:遍历源trx的actions,每一项去调用上面定义的process_action函数执行action数据的处理发到action_array变量中,赋给actions。

context_free_actions,与action的处理过程差不多。

transaction_extensions设置

signatures

context_free_data

createdAt

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssjyw.html