Openfire集群源码分析

如果用户量增加后为了解决吞吐量问题,需要引入集群,在openfire中提供了集群的支持,另外也实现了两个集群插件:hazelcast和clustering。为了了解情况集群的工作原理,我就沿着openfire的源代码进行了分析,也是一次学习的过程。

首先理解集群的一些简单概念

集群的目的是让多个实例像一个实例一样运行,这样就可以通过增长实例来增长计算能力。也就是所谓的分布式计算问题,这其中最为关注的一个特性就是——CAP理论,也就是所谓的一致性、可用性、分区容错性。集群中最核心解决的问题就是CAP。

CAP综合理解就是我上面写的,多个实例像一个实例一样运行。

 

所以所谓集群就是把一些数据共享或者同步到不同的实例上,这样系统使用同样的算法,取的结果当然应该是相同啦。所以一些数据库的主从复制,缓存数据集群都是类似这种解决方法。只是代码实现质量和处理规模的问题。

 

有了这个基础我们再来看看openfire是怎么解决这个问题的。

 

openfire的集群设计

 

1、哪些需要进行集群间的同步

 对于openfire而言,有这几方面的数据需要进行保证集群间的同步:数据库存的数据、缓存数据、session。貌似就这些吧?

数据库

因为对于openfire来说基本上是透明的,所以这块就交给数据库本身来实现。

缓存数据

缓存是存在内存里的,所以这部分是要同步的

session

session在openfire并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。

 

2、缓存的设计

缓存接口

openfire里对缓存的数据容器提供了一个包装接口,这个接口提供了缓存数据的基本方法,用于统一数据操作。

public interface Cache<K,V> extends java.util.Map<K,V>

如果不开启集群时缓存的默认缓存容器类是:public class DefaultCache<K, V> ,实际上DefaultCache就是用一个Hashmap来存数据的。

缓存工厂类

为了保证缓存是可以扩展的,提供了一个工厂类:

public class CacheFactory

CacheFactory类中会管理所有的缓存容器,如下代码:

/** * Returns the named cache, creating it as necessary. * * @param name the name of the cache to create. * @return the named cache, creating it as necessary. */ @SuppressWarnings("unchecked") public static synchronized <T extends Cache> T createCache(String name) { T cache = (T) caches.get(name); if (cache != null) { return cache; } cache = (T) cacheFactoryStrategy.createCache(name); log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name); return wrapCache(cache, name); }

上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后warpCache方法会将此容器放入到caches中。

缓存工厂类的策略

在CacheFactory中默认是使用一个DefaultLocalCacheStrategy来完成缓存创建的。另外还提供了在集群条件下的缓存策略接入。也就是通过实例化不同的策略来切换缓存管理方案。比如后面要提到的hazelcast就是通过这个来替换了本地缓存策略的。从接口的设计上来看,openfire的缓存策略也就是为了集群与非集群的实现。

 

3、集群的设计

在openfire中的集群主要包括:集群管理、数据同步管理、集群计算任务。

 

集群管理者

在openfire中主要是一个类来实现:ClusterManager,在ClusterManager中实现了集群实例的加入、退出管理,因为没有使用主从结构,所以ClusterManager实现了一个无中心管理,不知道我理解的对不对。因为只要当前实实例启用了集群,ClusterManager就会主动的加载集群管理并与其他的集群进行同步。

 

startup

startup是启动集群的方法,代码:

 

public static synchronized void startup() { if (isClusteringEnabled() && !isClusteringStarted()) { initEventDispatcher(); CacheFactory.startClustering(); } }

首先要判断是否开启了集群并且当前集群实例未运行时才去启动。

先是初始化了事件分发器,用于处理集群的同步事情。

 

然后就是调用CacheFactory的startClustering来运行集群。在startClustering方法中主要是这几个事情:

会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。

开启一个线程用于同步缓存的状态

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/f117b1ea52dcc7cb46de089fe4e62f21.html