详细解析kafka之kafka分区和副本 (2)

kafka提供了两种让我们自己选择分区的方法,第一种是在发送producer的时候,在ProducerRecord中直接指定,但需要知道具体发送的分区index,所以并不推荐。

第二种则是需要实现Partitioner.class类,并重写类中的partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) 方法。后面在生成kafka producer客户端的时候直接指定新的分区类就可以了。

package kafkaconf; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; public class MyParatitioner implements Partitioner { @Override public void configure(Map<String, ?> configs) { } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //key不能空,如果key为空的会通过轮询的方式 选择分区 if(keyBytes == null || (!(key instanceof String))){ throw new RuntimeException("key is null"); } //获取分区列表 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //以下是上述各种策略的实现,不能共存 //随机策略 return ThreadLocalRandom.current().nextInt(partitions.size()); //按消息键保存策略 return Math.abs(key.hashCode()) % partitions.size(); //自定义分区策略, 比如key为123的消息,选择放入最后一个分区 if(key.toString().equals("123")){ return partitions.size()-1; }else{ //否则随机 ThreadLocalRandom.current().nextInt(partitions.size()); } } @Override public void close() { } }

然后需要在生成kafka producer客户端的时候指定该类就行:

val properties = new Properties() ...... props.put("partitioner.class", "kafkaconf.MyParatitioner"); //主要这个配置指定分区类 ......其他配置 val producer = new KafkaProducer[String, String](properties) 2.kafka副本机制

说完了分区,再来说说副本。先说说副本的基本内容,在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。

多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。

这里通过问题来整理这部分内容。

kafka的副本都有哪些作用?

在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。

说说follower副本为什么不对外提供服务?

这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。

比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。

看吧,为了提高那么些性能而导致出现数据不一致问题,那显然是不值得的。

leader副本挂掉后,如何选举新副本?

如果你对zookeeper选举机制有所了解,就知道zookeeper每次leader节点挂掉时,都会通过内置id,来选举处理了最新事务的那个follower节点。

从结果上来说,kafka分区副本的选举也是类似的,都是选择最新的那个follower副本,但它是通过一个In-sync(ISR)副本集合实现。

kafka会将与leader副本保持同步的副本放到ISR副本集合中。当然,leader副本是一直存在于ISR副本集合中的,在某些特殊情况下,ISR副本中甚至只有leader一个副本。

当leader挂掉时,kakfa通过zookeeper感知到这一情况,在ISR副本中选取新的副本成为leader,对外提供服务。

但这样还有一个问题,前面提到过,有可能ISR副本集合中,只有leader,当leader副本挂掉后,ISR集合就为空,这时候怎么办呢?这时候如果设置unclean.leader.election.enable参数为true,那么kafka会在非同步,也就是不在ISR副本集合中的副本中,选取出副本成为leader,但这样意味这消息会丢失,这又是可用性和一致性的一个取舍了。

ISR副本集合保存的副本的条件是什么?

上面一直说ISR副本集合中的副本就是和leader副本是同步的,那这个同步的标准又是什么呢?

答案其实跟一个参数有关:replica.lag.time.max.ms。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygjjs.html