Akka-CQRS(8)- CQRS Reader Actor 应用实例

  前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。

假如我们的业务系统同样运行在Cassandra上,那么reader就需要把从日志读出来的事件恢复成Cassandra表里的数据行(row)。首先,我们需要在Cassandra上创建相关的keyspace和table。下面是在Scala中使用cassandra-java-driver的例子:

import com.datastax.driver.core._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import sdp.cql.engine._
import CQLEngine._
import CQLHelpers._
import monix.execution.Scheduler.Implicits.global
import scala.util._

/** One-off setup program that creates the `pos_on_cloud` keyspace and its
  * three tables (`vch_log`, `txn_log`, `txn_hold`) on a Cassandra cluster.
  *
  * The four DDL statements are wrapped in `CQLContext` commands and chained
  * through `cqlExecute` in a single for-comprehension; the resulting task is
  * run as a future and its outcome is printed. The program then waits for the
  * user to press ENTER before closing the session, the cluster and the actor
  * system.
  *
  * NOTE(review): `cqlExecute` comes from the project's `sdp.cql.engine`
  * package; the chain presumably stops at the first failing statement —
  * confirm against that helper.
  */
object CQLCreatTables extends App {

  //#init-mat
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  // implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster
    .Builder()
    .addContactPoints("192.168.11.189")
    .withPort(9042)
    .build()

  // Project helper from CQLHelpers; presumably registers java.time codecs on
  // the cluster — TODO confirm.
  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  // The `//` remarks inside the statements below are CQL comments that are
  // sent to the server as part of the statement text. A CQL `//` comment runs
  // to the end of its line, so the line break after each one is significant.

  // Keyspace holding the POS business data.
  val createKeyspace = """
  CREATE KEYSPACE pos_on_cloud WITH replication = {  //pos业务数据库
    'class': 'SimpleStrategy',
    'replication_factor': '3'
  }"""

  // Voucher (sales slip) number log, keyed by terminal, date and voucher number.
  val createVchLog = """
  CREATE TABLE pos_on_cloud.vch_log (  //销售单号日志 (可以从某日开始重新运算交易)
    terminal text,
    txndate text,
    vchnum int,
    begin_seq bigint,
    end_seq bigint,
    PRIMARY KEY (terminal,txndate,vchnum)
  )"""

  // POS transaction records.
  val createTxnItems = """
  CREATE TABLE pos_on_cloud.txn_log (  //pos交易记录表
    terminal text,
    txndate text,
    txntime text,
    opr text,
    num int,
    seq int,
    txntype int,
    salestype int,
    qty int,
    price int,
    amount int,
    disc int,
    dscamt int,
    member text,
    code text,
    acct text,
    dpt text,
    PRIMARY KEY (terminal,txndate,num,seq)
  )"""

  // Temporarily held (suspended) vouchers; same columns as txn_log.
  // Fix: the original statement was missing the comma after `disc int`,
  // which made this CREATE TABLE a CQL syntax error.
  val createTxnSuspend = """
  CREATE TABLE pos_on_cloud.txn_hold (  //临时挂单表
    terminal text,
    txndate text,
    txntime text,
    opr text,
    num int,
    seq int,
    txntype int,
    salestype int,
    qty int,
    price int,
    amount int,
    disc int,
    dscamt int,
    member text,
    code text,
    acct text,
    dpt text,
    PRIMARY KEY (terminal,txndate,num,seq)
  )"""

  val ctxKeyspace = CQLContext().setCommand(createKeyspace)
  val ctxVchlog = CQLContext().setCommand(createVchLog)
  val ctxTxnlog = CQLContext().setCommand(createTxnItems)
  val ctxTxnhold = CQLContext().setCommand(createTxnSuspend)

  // Keyspace first, then the tables that live in it.
  val results = for {
    stsKeyspace <- cqlExecute(ctxKeyspace)
    stsVchlog   <- cqlExecute(ctxVchlog)
    stsTxnlog   <- cqlExecute(ctxTxnlog)
    stsTxnhold  <- cqlExecute(ctxTxnhold)
  } yield (stsKeyspace, stsVchlog, stsTxnlog, stsTxnhold)

  // Unwrap the two wrapper layers around the underlying task.
  val task = results.value.value

  val cancellableFut = task.runToFuture

  cancellableFut.onComplete {
    case Success(value) =>
      println(s"returned status: $value")
    case Failure(ex) =>
      System.err.println(s"ERROR: ${ex.getMessage}")
  }

  // cancellableFut.cancel()
  /*
  val cancelable = task.runAsync { result =>
    result match {
      case Right(value) =>
        println(value)
      case Left(ex) =>
        System.err.println(s"ERROR: ${ex.getMessage}")
    }
  }
  */

  // Keep the JVM alive until the user presses ENTER so the asynchronous DDL
  // can finish, then release all resources.
  scala.io.StdIn.readLine()

  session.close()
  cluster.close()
  cqlsys.terminate()
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspsyf.html