自定义实现一个loghub(或kafka)的动态分片消费者负载均衡?

  一般地,像kafka之类的消息中间件,作为一个可以保持历史消息的组件,其消费模型一般是主动拉取方式。这是为了给消费者足够的自由,回滚或者前进。

  然而,也正是由于将消费消息的权力交给了消费者,所以,消费者往往需要承担更多的责任。比如:需要自行保存消费偏移量,以便后续可以知道从哪里继续。而当这一点处理不好时,则可能带来一些麻烦。

  不管怎么样,解决方案也都是现成的,咱们也不用担心。

 

  今天我们要谈论的是一个场景: 如何让n个机器消费m个分片数据?

 

  这在消息中间件的解决方案里,明白地写着,使用消费者群组就可以实现了。具体来说就是,每个分片至多会被一机器消费,每个机器则可以消费多个分片数据。即机器数据小于分片数时,分片会被均衡地分配到消费者中。当机器数大于分片数时,多余的机器将不做任何事情。

  好吧,既然官方已经说明白了,那咱们应该就不再需要自己搞一个轮子了吧。

  但是,我还有个场景:如果我要求在机器做负载重平衡时,需要保证被抽取出去的机器分片,至少保留一段时间,不允许任何机器消费该分片,因为可能还有数据需要备份。

  针对这种场景,我想官方也许是有提供回调函数之类的解决方案的吧。不管了,反正我没找到,只能自己先造个轮子了。

 

本文场景前提:

  1. 使用loghub作为消息中间件(原理同kafka);
  2. 整个数据有m个分片shard;
  3. 整个消费者集群有n台机器;
  4. 每个分片的数据需要集中到一机器上做有状态处理;
  5. 可以借助redis保存有状态数据,以便消费者机器做优雅停机;

  最简单的方案是,使 n=m, 每台机器消费一个shard, 这样状态永远不会错乱。

  但是这样明显可扩展能力太差了!

    比如有时数据量小了,虽然分片还在,但是完全不用那么多机器的时候,如何缩减机器?
    比如由于数据压力大了,我想增加下分片数,以提高发送者性能,但是消费者我还不想理他,消费慢点无所谓?

  其实,我们可以使用官方的消费者群组方法,可以动态缩减机器。

  但是这个有状态就比较难做到了。

  以上痛点,总结下来就是,可扩展性问题。

 

想象中的轮子是怎么样的?

  1. 需要有个注册中心,管理机器的上下线监控;
  2. 需要有负载均衡器,负载将shard的负载均衡的分布到在线机器中;
  3. 需要有每个机器自己消费的分片记录,以使机器自身有据可查;
  4. 需要有每个分片的消费情况,以判定出哪些分片已分配给哪些机器;

 

我们来细看下实现:

【1】平衡协调器主框架:

import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.Shard; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.response.ListShardResponse; import com.test.common.config.LogHubProperties; import com.test.utils.RedisPoolUtil; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.test.dispatcher.work.RedisKeyConstants.MAX_CONSUMER_SHARD_LOAD; /** * loghub动态消费者 shard分配shard 协调器 * */ public class LoghubConsumerShardCoWorker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(LoghubConsumerShardCoWorker.class); private LogHubProperties logHubProperties; private RedisPoolUtil redisPoolUtil; private Client mClient; private ShardAssignMaster shardAssignMaster; private String HOST_NAME; public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties) { this(redisPoolUtil, logHubProperties, null); } public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties, String hostName) { this.redisPoolUtil = redisPoolUtil; this.logHubProperties = logHubProperties; this.HOST_NAME = hostName; initSharedVars(); initConsumerClient(); initShardAssigner(); getAllShardList(); registerSelfConsumer(); startHeartBeatThread(); } /** * 开启心跳线程,保活 */ private void startHeartBeatThread() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(() -> { String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME; redisPoolUtil.expire(serverConsumeCacheKey, 30); shardAssignMaster.sendHeartbeat(HOST_NAME); }, 30, 25, TimeUnit.SECONDS); } /** * 初始化客户端实例 */ private void initConsumerClient() { this.mClient = new Client(logHubProperties.getEndpoint(), logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey()); } /** * 初始化分片分配控制器 */ private void initShardAssigner() { shardAssignMaster = new ShardAssignMaster(redisPoolUtil); } /** * 初始化公共变量 */ private void initSharedVars() { try { if(HOST_NAME != null) { return; } HOST_NAME = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { logger.error("init error : 获取服务器主机名失败", e); throw new RuntimeException("init error : 获取服务器主机名失败"); } } /** * 将自己作为消费者注册到消费者列表中,以判定后续可以进行消费 */ private void registerSelfConsumer() { shardAssignMaster.registerConsumer(HOST_NAME); shardAssignMaster.sendHeartbeat(HOST_NAME); } @Override public void run() { try { checkConsumerSharding(); } catch (Exception e) { logger.error("动态分配shard 发生异常", e); } } /** * job 只做一件事,即检查 shard 的消费情况,不平衡则处理 */ private void checkConsumerSharding() { try { if (tryCoWorkerLock()) { // step1. 检查是否需要进行shard分配 // 集群消费loghub数据动态伸缩策略 // 1. 启动时先去获取部分片数,备用; // 2. 应用启动后,把自己注册到注册中心或redis中; // 3. 根据注册上来的机器列表,按平均分配策略分配shard(只能由一个机器来分配,其他机器处理分布式锁竞争失败,等待状态); // 4. 分配好后,释放锁,各机器开始消费,如机器A消费shard 0/3,则机器1以轮询的方式依次从shard 0/3 摘取数据消费; // 5. 分配好的数据结构为:prefix+ip保存具体数据,另外将自己的key添加到另一个zset中,标识自己存活;自己的key有效期为30秒;使用另一维度 shard,保存每个shard被占用情况,使用hash保存,key为shard,value为当有占用时为机器ip或主机名,当无占用时为null或空串; // 6. 以上数据刷入,将在机器抢占到shard更新数据;shard总数信息暂时不允许在运行期间进行变更;(即如果变理shard必须重启服务器) // 7. 机器下线时,占用的key将自动过期;(考虑是否主动删除) // 8. 各机器上启动一个后台扫描线程,每隔30秒扫描一次。扫描zset,取出所有值后查看是否存在相应的key,如果不存在说明机器已下线,需要重新分配其占用的shard; // 9. 重新分配策略,使用一致性hash算法实现; // 10. 机器上线时,使用一致性hash算法重新平衡shard; // 11. 使用分布式锁保证分配进程只有一个; CheckConsumerShardingResultContainer resultContainer = checkShardConsumerReBalanceStatus(); if(resultContainer.getStatusResultType() != ReBalanceStatusResultEnum.OK) { reBalanceConsumerShard(resultContainer); } } } finally { releaseCoWorkerLock(); } } /** * 确认机器和shard是否需要再平衡 * * @return 结果状态集 */ private CheckConsumerShardingResultContainer checkShardConsumerReBalanceStatus() { // step1. 检查自身是否存在shard, 不存在则立即进行一次重分配(消费者机器数大于分片数时,重平衡动作将是无效动作) // step2. 检查所有shard列表,是否有未被分配的shard,如有,立即触发一次重分配 // step3. 检查是否有负荷比较高的机器,如有触发平衡(功能预留,此功能需要基于统计信息) CheckConsumerShardingResultContainer resultContainer = new CheckConsumerShardingResultContainer(); final List<String> activeServersList = shardAssignMaster.getAllOnlineServerList(); final List<String> allShardList = getAllShardList(); // 计算空闲机器 Map<String, Integer> hostConsumeLoadCountMap = new HashMap<>(); List<String> idleServerList = filterIdleServerList(activeServersList, hostConsumeLoadCountMap); // 计算未被分配的shard List<String> unAssignedShardList = filterUnAssignedShardList(allShardList); // 根据资源信息,得出目前的负载状态 ReBalanceStatusResultEnum statusResult = computeReBalanceStatusOnResources( unAssignedShardList, idleServerList, hostConsumeLoadCountMap); resultContainer.setAllServerList(activeServersList); resultContainer.setAllShardList(allShardList); resultContainer.setIdleServerList(idleServerList); resultContainer.setUnAssignedShardList(unAssignedShardList); resultContainer.setServerConsumeShardLoad(hostConsumeLoadCountMap); resultContainer.setStatusResultType(statusResult); return resultContainer; } /** * 根据给定资源信息,计算出目前的负载状态 * * @param unAssignedShardList 未分配的shard列表 * @param idleServerList 空闲机器列表 * @param hostConsumeLoadMap 机器消费计数容器(负载情况) * @return 状态值 */ private ReBalanceStatusResultEnum computeReBalanceStatusOnResources( List<String> unAssignedShardList, List<String> idleServerList, Map<String, Integer> hostConsumeLoadMap) { // 没有未分配的shard,检测是否平衡即可 // 0. 有空闲机器,则直接分配给空闲机器即可 // 1. 最大消费shard-最小消费shard数 >= 2, 则说明有机器消费过多shard,需重分配 // 2. 机器负载平衡,无须调整 if(unAssignedShardList.isEmpty()) { int minConsume = MAX_CONSUMER_SHARD_LOAD; int maxConsume = 0; for (Map.Entry<String, Integer> entry : hostConsumeLoadMap.entrySet()) { int gotCount = entry.getValue(); if(gotCount > maxConsume) { maxConsume = gotCount; } if(gotCount < minConsume) { minConsume = gotCount; } } // 因有未分配的机器,假如现有的机器消费都是2,则需要重分配的大压力的机器 shard 给空闲机器 if(!idleServerList.isEmpty()) { if (maxConsume > 1) { return ReBalanceStatusResultEnum.HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED; } } // 有消费相差2的机器,重新分配,从大数上借调到小数上 if(maxConsume > minConsume + 1) { return ReBalanceStatusResultEnum.HEAVY_LOAD_BALANCE_NEEDED; } return ReBalanceStatusResultEnum.OK; } // 有可用shard // 3. 有空闲机器,直接让空闲shard分配给这些空闲机器就ok了 // 4. 没有空闲机器,须将空闲shard 分配给负载小的机器 if(idleServerList.isEmpty()) { return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS; } return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS; } /** * 过滤出空闲的机器列表 * * @param activeServersList 所有机器列表 * @return 空闲机器集, 且将各自消费数放入计数容器 */ private List<String> filterIdleServerList(List<String> activeServersList, Map<String, Integer> hostConsumeCountMap) { List<String> idleServerList = new ArrayList<>(); for (String hostname1 : activeServersList) { if(!shardAssignMaster.isConsumerServerAlive(hostname1)) { shardAssignMaster.invalidateOfflineServer(hostname1); continue; } int consumeCount; Set<String> consumeShardSet = shardAssignMaster.getServerDutyConsumeShardSet(hostname1); if(consumeShardSet == null || consumeShardSet.isEmpty()) { idleServerList.add(hostname1); consumeCount = 0; } else { consumeCount = consumeShardSet.size(); } hostConsumeCountMap.put(hostname1, consumeCount); } return idleServerList; } /** * 过滤出未分配的shard列表 * * @param allShardList 所有shard * @return 未分配的shard */ private List<String> filterUnAssignedShardList(List<String> allShardList) { List<String> unAssignedShardList = new ArrayList<>(); for (String shardId1 : allShardList) { String consumeHostname = shardAssignMaster.getShardAssignedServer(shardId1); // 如果不为空,则之前分配过,检查机器是否下线 // 如果为空,则是第一次分配 if(!StringUtils.isBlank(consumeHostname)) { if(!shardAssignMaster.isConsumerServerAlive(consumeHostname)) { // 清除下线机器信息,将当前shard置为空闲 shardAssignMaster.invalidateOfflineServer(consumeHostname); shardAssignMaster.invalidateShardAssignInfo(shardId1); unAssignedShardList.add(shardId1); } } else { unAssignedShardList.add(shardId1); } } return unAssignedShardList; } /** * 尝试获取协调者协调锁 * * 在集群环境中,只允许有一个协调器在运行 * * @return true:成功, false:失败,不得进行协调分配工作 */ private boolean tryCoWorkerLock() { return redisPoolUtil.getDistributedLock("distributedLock", HOST_NAME, 30); } /** * 释放协调锁,以便下次再竞争 */ private void releaseCoWorkerLock() { redisPoolUtil.releaseDistributedLock("distributedLock", HOST_NAME); } /** * 重新平衡消费者和shard的关系 * * @param resultContainer 待重平衡状态 */ private void reBalanceConsumerShard(CheckConsumerShardingResultContainer resultContainer) { // 集群消费loghub数据动态伸缩策略,根据负载状态,调用相应策略进行重平衡 StatusReBalanceStrategy strategy = StatusReBalanceStrategyFactory.createStatusReBalanceAlgorithm(resultContainer, shardAssignMaster); strategy.loadBalance(); } /** * 获取分片列表 * * @return 分片列表,如: 0,1,2,3 */ private List<String> getAllShardList() { // 实时读取列表 List<String> shardList = Lists.newArrayList(); try { ListShardResponse listShardResponse = mClient.ListShard(logHubProperties.getProjectName(), logHubProperties.getEventlogStore()); ArrayList<Shard> getShards = listShardResponse.GetShards(); for (Shard shard : getShards) { shardList.add(shard.GetShardId() + ""); } } catch (LogException e) { logger.error("loghub 获取shard列表 error :", e); } return shardList; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsspsp.html