在前面几篇讨论里我们介绍了在集群环境里的一些编程模式、分布式数据结构及具体实现方式。到目前为止,我们已经实现了把程序任务分配给处于很多服务器上的actor,能够最大程度的利用整体系统的硬件资源。这是因为通过akka-cluster能够把很多服务器组合成一个虚拟的整体系统,编程人员不需要知道负责运算的actor具体在那台服务器上运行。当然,我所指的整体系统是一种分布式的系统,实质底层还是各集群节点作为完整个体独立运行的,所以核心理念还是需要将程序分割成能独立运算的任务,然后分派给可能分布在很多服务器上的actor去运算。在上一篇的cluster-load-balance里我们采用了一种fire-and-forget模式把多项独立任务分配给集群节点上的actor,然后任由它们各自完成运算,中途不做任何交互、控制。这也是一种典型的无内部状态的运算模式。对外界来讲就是开始、完成,中间没有关于运算进展或当前状态的交流需要。但在现实里,很多任务是无法完全进行独立细分的,或者再细分会影响系统效率。比如网上购物网站每个客户的购物车:它记录了客户在网上的所有商品拣选过程,每一个拣选动作都代表更新的购物车状态,直到完成结算。那么在一个可能有几十万用户同时在线购物的网站,保留在内存的购物车状态应该是任何机器都无法容纳的,只有回到传统的数据库模式了,还是要面对无法解决的多并发系统效率问题。这么分析,集群分片技术可能是最好的解决方法了。
简单讲:集群分片技术就是把一堆带唯一标识identifier的actor,即entity分布到集群节点上去。控制程序可以通过唯一ID与entityr进行交互,控制整个运算过程。这样,我们可以把程序分成相对合理的包含多个过程状态的细分任务。这些细分任务是由分布在集群节点上的entity来运算的,产生的状态当然也使用的是各集群节点上的资源,如此解决上面所提到的内存容量问题。akka-cluster提供的actor位置透明化机制能在系统崩溃、增减集群节点时自动重新部署所有的actor以达到负责均衡。而用户通过固定的ID就能联络目标entity,无论它被转移到任何集群节点上。
集群分片由分片管理ShardRegion和分片定位ShardCoordinator共同协作实现,目标是把消息正确传递给指定ID的entity。分片定位负责确定分片所在集群节点,分片管理则对每个集群节点上分片内的entity进行定位。ShardCoordinator是个cluster-singleton,而ShardRegion则必须部署在每个集群节点上。每个分片内的entity必须是一个类型的actor。发给entity的消息内部必须包含分片编号和entity ID。通过从消息中解析位置信息后由ShardCoordinator确定负责传递消息的ShardRegion,相关的ShardRegion按ID把消息发送至目标entity。
每个节点上的ShardRegion是通过下面这个start函数构建的:
/** * Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * * @param typeName the name of the entity type * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` * @param settings configuration settings, see [[ClusterShardingSettings]] * @param extractEntityId partial function to extract the entity id and the message to send to the * entity from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream * @param extractShardId function to determine the shard id for an incoming message, only messages * that passed the `extractEntityId` will be used * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic * @param handOffStopMessage the message that will be sent to entities when they are to be stopped * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, entityProps: Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = {...}