分布式改造剧集3:Ehcache分布式改造

第三集:分布式Ehcache缓存改造 前言

​ 好久没有写博客了,大有半途而废的趋势。忙不是借口,这个好习惯还是要继续坚持。前面我承诺的第一期的DIY分布式,是时候上终篇了---DIY分布式缓存。

探索之路

​ 在前面的文章中,我给大家大致说过项目背景:项目中的缓存使用的是Ehcache。因为前面使用Ehcache的应用就一台,所以这种单机的Ehcache并不会有什么问题。现在分布式部署之后,如果各个应用之间的缓存不能共享,那么其实各自就是一个孤岛。可能在一个业务跑下来,请求了不同的应用,结果在缓存中取出来的值不一样,

造成数据不一致。所以需要重新设计缓存的实现。

​ 因为尽量不要引入新的中间件,所以改造仍然是围绕Ehcache来进行的。搜集了各种资料之后,发现Ehcache实现分布式缓存基本有以下两种思路:

客户端实现分布式算法: 在使用Ehcache的客户端自己实现分布式算法。

算法的基本思路就是取模:即假设有三台应用(编号假设分别为0,1,2),对于一个要缓存的对象,首先计算其key的hash值,然后将hash值模3,得到的余数是几,就将数据缓存到哪台机器。

同步冗余数据: Ehcache是支持集群配置的,集群的各个节点之间支持按照一定的协议进行数据同步。这样每台应用其实缓存了一整份数据,不同节点之间的数据是一致的。

​ 虽然冗余的办法显得有点浪费资源,但是我最终还是选择了冗余。具体原因有以下几点:

分布式算法的复杂性: 前面所讲的分布式算法只是最基本的实现。事实上实现要比这个复杂的多。需要考虑增加或者删除节点的情况,需要使用更加复杂的一致性hash算法

可能导致整个应用不可用: 当删除节点之后,如果算法不能够感知进行自动调整,仍然去请求那个已经被删除的节点,可能导致整个系统不可用。

Demo

​ 最终我的实现采用RMI的方式进行同步

配置ehcache

​ spring-ehcache-cache.xml

<?xml version="1.0" encoding="UTF-8"?> <ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd"> <diskStore path="java.io.tmpdir/ehcache"/> <cache maxElementsInMemory="10000000" eternal="true" overflowToDisk="false" memoryStoreEvictionPolicy="LRU"> <cacheEventListenerFactory/> </cache> <cache maxElementsInMemory="100" eternal="true" overflowToDisk="false" memoryStoreEvictionPolicy="LRU"> <cacheEventListenerFactory/> </cache> <!-- cache发布信息配置,人工发现peerDiscovery=manual,cacheNames可配置多个缓存名称,以|分割 ) --> <cacheManagerPeerProviderFactory properties="peerDiscovery=manual, cacheNames=business1Cache|business2Cache" /> <!-- 接收同步cache信息的地址 --> <cacheManagerPeerListenerFactory properties="socketTimeoutMillis=2000" /> </ehcache>

​ spring-cache.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cache="http://www.springframework.org/schema/cache" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/cache http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop/spring-aop-4.2.xsd" default-autowire="byName"> <!-- 包扫描 --> <context:component-scan base-package="com.rampage.cache" /> <!-- 启用Cache注解 --> <cache:annotation-driven cache-manager="cacheManager" key-generator="keyGenerator" proxy-target-class="true" /> <!-- 自定义的缓存key生成类,需实现org.springframework.cache.interceptor.KeyGenerator接口 --> <bean /> <!-- 替换slite的ehcache实现 --> <bean> <property value="classpath:spring/cache/sppay-ehcache-cache.xml"/> <!-- value对应前面ehcache文件定义的manager名称 --> <property value="businessCaches" /> </bean> <bean> <property ref="ehCacheManagerFactory"/> </bean> <bean> <property> <list> <ref bean="ehCacheManager" /> </list> </property> <property value="true" /> </bean> </beans> 实现自定义转发和监听

​ 细心的读者应该不难发现,前面xml配置中cacheManagerPeerProviderFactory和cacheManagerPeerListenerFactory我使用的都是自定义的类。之所以使用自定义的类,是为了在初始化的时候发布的地址和端口,监听的地址端口可以在配置文件配置。具体类的实现如下:

/** * 分布式EhCache监听工厂 * @author secondWorld * */ public class DisRMICacheManagerPeerListenerFactory extends RMICacheManagerPeerListenerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerListenerFactory.class); /** * 配置文件中配置的监听地址,可以不配置,默认为本机地址 */ private static final String LISTEN_HOST = "distribute.ehcache.listenIP"; /** * 配置文件中配置的监听端口 */ private static final String LISTEN_PORT = "distribute.ehache.listenPort"; @Override protected CacheManagerPeerListener doCreateCachePeerListener(String hostName, Integer port, Integer remoteObjectPort, CacheManager cacheManager, Integer socketTimeoutMillis) { // xml中hostName为空,则读取配置文件(app-config.properties)中的值 if (StringUtils.isEmpty(hostName)) { String propHost = AppConfigPropertyUtils.get(LISTEN_HOST); if (StringUtils.isNotEmpty(propHost)) { hostName = propHost; } } // 端口采用默认端口0,则去读取配置文件(app-config.properties)中的值 if (port != null && port == 0) { Integer propPort = null; try { propPort = Integer.parseInt(AppConfigPropertyUtils.get(LISTEN_PORT)); } catch (NumberFormatException e) { } if (propPort != null) { port = propPort; } } LOGGER.info( "Initiliazing DisRMICacheManagerPeerListenerFactory:cacheManager[{}], hostName[{}], port[{}], remoteObjectPort[{}], socketTimeoutMillis[{}]......", cacheManager, hostName, port, remoteObjectPort, socketTimeoutMillis); return super.doCreateCachePeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis); } } /** * 分布式EhCache发布工厂 * * @author secondWorld * */ public class DisRMICacheManagerPeerProviderFactory extends RMICacheManagerPeerProviderFactory { private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerProviderFactory.class); private static final String CACHENAME_DELIMITER = "|"; private static final String PROVIDER_ADDRESSES = "distribute.ehcache.providerAddresses"; private static final String CACHE_NAMES = "cacheNames"; /** * rmi地址格式: //127.0.0.1:4447/Cache1|//127.0.0.1:4447/Cache2 */ @Override protected CacheManagerPeerProvider createManuallyConfiguredCachePeerProvider(Properties properties) { // 从app-config.properties中读取发布地址列表 String providerAddresses = AppConfigPropertyUtils.get(PROVIDER_ADDRESSES, StringUtils.EMPTY); // 从ehcache配置文件读取缓存名称 String cacheNames = PropertyUtil.extractAndLogProperty(CACHE_NAMES, properties); // 参数校验,这里发布地址和缓存名称都不能为空 if (StringUtils.isEmpty(providerAddresses) || StringUtils.isEmpty(cacheNames)) { throw new IllegalArgumentException("Elements \"providerAddresses\" and \"cacheNames\" are needed!"); } // 解析地址列表 List<String> cachesNameList = getCacheNameList(cacheNames); List<String> providerAddressList = getProviderAddressList(providerAddresses); // 注册发布节点 RMICacheManagerPeerProvider rmiPeerProvider = new ManualRMICacheManagerPeerProvider(); StringBuilder sb = new StringBuilder(); for (String cacheName : cachesNameList) { for (String providerAddress : providerAddressList) { sb.setLength(0); sb.append("//").append(providerAddress).append("http://www.likecs.com/").append(cacheName); rmiPeerProvider.registerPeer(sb.toString()); LOGGER.info("Registering peer provider [{}]", sb); } } return rmiPeerProvider; } /** * 得到发布地址列表 * @param providerAddresses 发布地址字符串 * @return 发布地址列表 */ private List<String> getProviderAddressList(String providerAddresses) { StringTokenizer stringTokenizer = new StringTokenizer(providerAddresses, AppConfigPropertyUtils.APP_ITEM_DELIMITER); List<String> ProviderAddressList = new ArrayList<String>(stringTokenizer.countTokens()); while (stringTokenizer.hasMoreTokens()) { String providerAddress = stringTokenizer.nextToken(); providerAddress = providerAddress.trim(); ProviderAddressList.add(providerAddress); } return ProviderAddressList; } /** * 得到缓存名称列表 * @param cacheNames 缓存名称字符串 * @return 缓存名称列表 */ private List<String> getCacheNameList(String cacheNames) { StringTokenizer stringTokenizer = new StringTokenizer(cacheNames, CACHENAME_DELIMITER); List<String> cacheNameList = new ArrayList<String>(stringTokenizer.countTokens()); while (stringTokenizer.hasMoreTokens()) { String cacheName = stringTokenizer.nextToken(); cacheName = cacheName.trim(); cacheNameList.add(cacheName); } return cacheNameList; } @Override protected CacheManagerPeerProvider createAutomaticallyConfiguredCachePeerProvider(CacheManager cacheManager, Properties properties) throws IOException { throw new UnsupportedOperationException("Not supported automatic distribute cache!"); } } 配置

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgzzgf.html