Openfire集群源码分析(2)

在前面startup中的initEventDispatcher方法,在这里会注册一个分发线程监听到集群事件,收到事件后会执行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。

 

在joinedCluster时会将本地的缓存容器都转换为集群缓存。由此便完成了集群的初始化并加入到集群中了。

 

shutdown

shutdown相对简单点就是退出集群,并且将缓存工厂恢复为本地缓存。

 

同步管理

上面主要是讲了如何管理集群,接着比较重要的就是如何在集群间同步数据呢?这部分主要是看具体的分布式计算系统的实现了,从openfire来说就是将数据放到集群缓存中,然后通过集群组件来完成的,比如使用hazelcast。

 

因为使用缓存来解决,所以在CacheFactory中才会有这些么多关于集群的处理代码,特别是对于缓存策略的切换,以及集群任务处理都在CacheFactory作为接口方法向外公开。这样也把集群的实现透明了。

 

集群计算任务 

在这之前一直没有提到集群中的计算问题,因为既然有了集群是不是可以利用集群的优势进行一些并行计算呢?这部分我倒没有太过确定,只是看到相关的代码所以简单列一下。

 

在CacheFactory类中有几个方法:doClusterTask、doSynchronousClusterTask,这两个都是overload方法,参数有所不同而已。这几个方法就是用于执行一些计算任务的。就看一下doClusterTask:

public static void doClusterTask(final ClusterTask<?> task) { cacheFactoryStrategy.doClusterTask(task); }

这里有个限定就是必须是ClusterTask派生的类才行,看看它的定义:

public interface ClusterTask<V> extends Runnable, Externalizable { V getResult(); }

主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。

再看CacheFactory的doClusterTask方法可以发现,它只不过是代理了缓存策略工厂的doClusterTask,具体的实现还是要看集群实现的。

 

看一看hazelcast的实现简单理解openfire集群

在openfire中有集群的插件实现,这里就以hazelcast为例子简单的做一下分析与学习。

 

缓存策略工厂类(ClusteredCacheFactory)

 

ClusteredCacheFactory实现了CacheFactoryStrategy,代码如下:

public class ClusteredCacheFactory implements CacheFactoryStrategy {

首先是startCluster方法用于启动集群,主要完成几件事情:

设置缓存序列化工具类,ClusterExternalizableUtil。这个是用于集群间数据复制时的序列化工具

设置远程session定位器,RemoteSessionLocator,因为session不同步,所以它主要是用于多实例间的session读取

设置远程包路由器ClusterPacketRouter,这样就可以在集群中发送消息了

加载Hazelcast的实例设置NodeID,以及设置ClusterListener

 

在前面说起集群启动时提到了缓存切换,那具体实现时是如何做的呢?

 

因为集群启动后就要是CacheFactory.joinedCluster方法来加入集群的。看一下加入的代码:

/** * Notification message indicating that this JVM has joined a cluster. */ @SuppressWarnings("unchecked") public static synchronized void joinedCluster() { cacheFactoryStrategy = clusteredCacheFactoryStrategy; // Loop through local caches and switch them to clustered cache (copy content) for (Cache cache : getAllCaches()) { // skip local-only caches if (localOnly.contains(cache.getName())) continue; CacheWrapper cacheWrapper = ((CacheWrapper) cache); Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName()); clusteredCache.putAll(cache); cacheWrapper.setWrappedCache(clusteredCache); } clusteringStarting = false; clusteringStarted = true; log.info("Clustering started; cache migration complete"); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/f117b1ea52dcc7cb46de089fe4e62f21.html