源码较多不粘贴,上面wipe_database函数,我们删除了六张表,在init函数中,我们要对应的建立这六张表。表名初始化:
const std::string mongo_db_plugin_impl::block_states_col = "block_states"; const std::string mongo_db_plugin_impl::blocks_col = "blocks"; const std::string mongo_db_plugin_impl::trans_col = "transactions"; const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces"; const std::string mongo_db_plugin_impl::actions_col = "actions"; const std::string mongo_db_plugin_impl::accounts_col = "accounts";这就是刘张表对应的名字。这六张表在初始化建立时是一个整体操作,也就是说互为依赖关系,accounts表先创建,通过
accounts = mongo_conn[db_name][accounts_col];即可创建成功accounts表,其他表亦然,后面不赘述。表数据是由make_document进行组装的。首先我们向accounts表插入一条数据,结构是name为eosio,createAt是当前时间。
chain::config::system_account_name ).to_string()
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});
通过insert_one方法将该条数据插入accounts表中。
接下来通过create_index方法对五张表建立索引,注意transaction_traces是没有索引的,init操作时不涉及transaction_traces表。
auto blocks = mongo_conn[db_name][blocks_col]; // Blocks blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block建立了两个索引 auto block_stats = mongo_conn[db_name][block_states_col]; block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block_stats建立了两个索引 // accounts indexes accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" )); // transactions indexes auto trans = mongo_conn[db_name][trans_col]; // Transactions trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); auto actions = mongo_conn[db_name][actions_col]; actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));初始化准备就完成了,接下来要建立线程监听出块消息,同步到mongo数据库中来。
ilog("starting db plugin thread"); consume_thread = boost::thread([this] { consume_blocks(); }); startup = false;// 结束,调用析构函数,关闭mongo_db_plugin:设定标志位done = true; (6)mongo_db_plugin_impl::consume_blocks()上面init函数执行到最后时,开启了一个线程,执行的是consume_blocks()函数,如字面含义这是消费区块的函数。这个函数是一个无限循环,保持线程的存活,监听queue的数据随时消费同步到mongo数据库中去,而queue的数据的是由上面plugin_initialize函数中的connect信号槽机制连接chain的出块信号往queue里面插入/同步链上数据。
condition无线循环第一部分就是对condition.wait(lock)的操作,condition在上面queue的源码中有一个notify_one()的调用,实际上就是与wait相互应的操作。
boost::mutex::scoped_lock lock(mtx); while ( transaction_metadata_queue.empty() && transaction_trace_queue.empty() && block_state_queue.empty() && irreversible_block_state_queue.empty() && !done ) { condition.wait(lock); }消费区块占用了一个线程,这个线程在上面四个queue是空的时候并且done也没有完成是flase的时候,该线程会通过condition来阻塞线程,参数是mutex的一个锁。
condition.notify_one()会重新唤起这个阻塞的线程,而在mongo_db_plugin中,condition.notify_one()出现了3次:
queue模板类型,有了新的数据插入的时候。
当queue模板类型的队列超过设置值的时候,要主动唤起consume_block开启消费线程加速消费(上面介绍queue的时候也谈到了队列大小超限时会增加queue插入的睡眠等待时间,这两方面相当于针对中间队列对两边进行开源节流,从而控制了队列的大小)
~mongo_db_plugin_impl()析构函数中
mongo_db_plugin_impl::~mongo_db_plugin_impl() { if (!startup) {//标志位,上面init函数结尾有这个值的赋值。 try { ilog( "mongo_db_plugin shutdown in process please be patient this can take a few minutes" ); done = true;//设定标志位done,consume_block()会使用到。 condition.notify_one();// 唤醒consume_thread线程继续消费掉queue中残余数据。 consume_thread.join();// 挂起主线程,等待consume_thread线程执行完毕再唤起主线程。 } catch( std::exception& e ) { elog( "Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what())); } } } process_queue准备