Akka-CQRS(3)- 再想多点,全面点

   上篇我介绍了CQRS模式存写部分的具体实现和akka-persistence一些函数和消息的用法。在这篇本来是准备直接用一个具体的例子来示范CQRS模式编程,主要是写端,或者是数据采集端。想着模拟收银机的后端操作,可以使用集群分片(cluster-sharding),每个分片shard代表一部POS机控制系统。在写这段程序之前首先把示例功能实现、cluster-sharding, persistence-actor,actor-passivation, backoff-supervisor, ClusterSharding.start和ClusterSharding.startProxy等技术细节搞清楚:

1、构建几个测试销售的产品信息

2、设计一套简单但功能完整的操作指令command

3、设计运算状态,即一单未结算销售单据的状态。相关的指令-事件command-event转换和状态更新机制

4、单据状态初始化

5、业务逻辑部分,从接到各项指令、指令-事件转换、处理副作用、存写事件、更新单据状态

6、结束单据处理

以一单支付金额大于等于应付金额作为整单结束状态。此时应进行下面的处理:

     1)增加单号  2)清除所有交易项目  3)saveSnapshot  (重启就不用恢复前面的事件persistent-events)

7、资源释放策略及处理 passivation

如果一个shard-entity暂不使用时最好先停掉stop以释放它占用的资源。但用常规的方式停止entity会造成mailbox里未处理的消息丢失,所以cluster-sharding有一套特别的机制ClusterSharding.Passivate(actorRef)来实现shard-entity的安全停用,即:目标entity向ShardRegion发送Passivate(stopMessage)消息、ShardRegion向目标entity发送包嵌在消息里的stopMessage。目标entity在收到消息后可以自行停止。ShardRegion会保留收到Passivate消息到目标entity停止之间收到的消息,还给再启动的entity。在本例子里passivation的应用场景如下:每单支付后如果一段时间没有收到新的开单指令,这个shard-entity可以通过向ShardRegion发送Passivate消息或按空转时间段设定自动passivate自己,这时ShardRegion在entity空转超出时间后自动发送ClusterSharding.start(...)里定义的handOffStopMessage(PoisonPill),如下:

def passivate(entity: ActorRef, stopMessage: Any): Unit = { idByRef.get(entity) match { case Some(id) ⇒ if (!messageBuffers.contains(id)) { passivating = passivating + entity messageBuffers.add(id) entity ! stopMessage } else { log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity.", entity) } case None ⇒ log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity) } } def passivateIdleEntities(): Unit = { val deadline = System.nanoTime() - settings.passivateIdleEntityAfter.toNanos val refsToPassivate = lastMessageTimestamp.collect { case (entityId, lastMessageTimestamp) if lastMessageTimestamp < deadline ⇒ refById(entityId) } if (refsToPassivate.nonEmpty) { log.debug("Passivating [{}] idle entities", refsToPassivate.size) refsToPassivate.foreach(passivate(_, handOffStopMessage)) } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdxpx.html