这个 Kafka 的专题,我会从系统整体架构,设计到代码落地。和大家一起杠源码,学技巧,涨知识。希望大家持续关注一起见证成长!
我相信:技术的道路,十年如一日!十年磨一剑!
往期文章
Kafka 探险 - 架构简介
Kafka 探险 - 源码环境搭建
前言
我们说 Kafka 是一个消息队列,其实更加确切的说:是 Broker 这个核心部件。为何这么说?你会发现我们可以通过控制台、 Java 代码、 C++ 代码、甚至是 Socket 向 Broker 写入消息,只要我们遵从了 Kafka 写入消息的协议,就可以将消息发送到 Kafka 队列中。
用专业一点的话术来说,Kafka 定义了一个应用层的网络协议,只要我们基于传输层构造出符合这个协议的数据,就是合法的 Kafka 消息。JavaProducer%22%7D%2C%7B%22x%22%3A1346.8406%2C%22y%22%3A414.28574%2C%22width%22%3A112.40290000000005%2C%22height%22%3A33.82388000000003%2C%22text%22%3A%22Broker%22%7D%2C%7B%22x%22%3A1000.9422%2C%22y%22%3A435.37033%2C%22width%22%3A175.31680000000006%2C%22height%22%3A34.42904999999996%2C%22text%22%3A%22msgProtocol%22%7D%2C%7B%22x%22%3A372.95627%2C%22y%22%3A452.45132%2C%22width%22%3A174.04476999999997%2C%22height%22%3A26.167329999999993%2C%22text%22%3A%22c%2B%2BProducer%22%7D%2C%7B%22x%22%3A815.04877%2C%22y%22%3A527.6074%2C%22width%22%3A178.67102999999997%2C%22height%22%3A25.92680000000007%2C%22text%22%3A%22inPlement%22%7D%2C%7B%22x%22%3A364.34558%2C%22y%22%3A614.1824%2C%22width%22%3A204.36992000000004%2C%22height%22%3A38.55813999999998%2C%22text%22%3A%22PythonProducer%22%7D%2C%7B%22x%22%3A374.32007%2C%22y%22%3A779.4053%2C%22width%22%3A182.90683%2C%22height%22%3A24.75909999999999%2C%22text%22%3A%22ScalaProducex%22%7D%5D%2C%22style%22%3A%22none%22%2C%22search%22%3A%22JavaProducer%20Broker%20msgProtocol%20c%2B%2BProducer%20inPlement%20PythonProducer%20ScalaProducex%22%2C%22margin%22%3A%7B%22top%22%3Atrue%2C%22bottom%22%3Atrue%7D%2C%22width%22%3A960%2C%22height%22%3A540%7D">
所以说我们写入 Kafka 消息的只是一个生产者的客户端,他的形式多种多样,有 Java ,Python,C++ 等多种实现,那么我们每次发消息难道还需要自己去实现这套发送消息的协议么?显然 Kafka 官方已经考虑到这个问题了,为了给我们提供 开箱即用 的消息队列,官方已经帮我们写好了各种语言的优质生产者实现,例如我们今天要讨论的 Java 版本的实现。
思考
前面提到 Kafka 帮我们实现了各个版本的生产者代码,其实他也可以完全不提供这份代码,因为核心的队列的功能已经实现了,这些客户端的代码也可以完全交由用户自己实现。
那么假如没有官方代码,我们又该实现一些什么功能,有哪些接口,哪些方法,以及如何组织这些代码呢。带着这样的问题我们一起来思考一下!一般对于这种带有数据流转的设计,我会从 由谁产生? 什么数据? 通往哪去? 如何保证通路可靠? 这几个方面来考虑。
消息自然是通过应用程序构造出来并提供给生产者,生产者首先要知道需要将消息发送到哪个 Broker 的哪个 Topic,以及 Topic 的具体 Partition 。那么必然需要配置客户端的 Broker集群地址 ,需要发送的 Topic 名称 ,以及 消息的分区策略 ,是指定到具体的分区还是通过某个 key hash 到不同的分区。
知道了消息要通往哪,还需要知道发送的是什么格式的消息,是字符串还是数字或是被序列化的二进制对象。 消息序列化 将需要消息序列化成字节数组才方便在网络上传输,所以要配置生产者的消息序列化策略,最好是可以通过传递枚举或者类名的方式自动构造序列化器,便于后续序列化过程的扩展。
从上面一篇文章 了解到:消息队列常常用于多个系统之间的异步调用,那么这种调用关系就没有强实时依赖。由于发消息到 Kafka 会产生 网络 I/O ,相对来说比较耗时,那么消息发送这一动作除了同步调用, 是否也可以设置为异步,提高生产者的吞吐呢? 。并且大量消息发送场景, 我们可以设置一个窗口,窗口可以是时间维度也可以是消息数量维度,将消息积攒起来批次发送,减少网络 I/O 次数,提高吞吐量。
最后呢为了保证消息可以最大程度的成功发送到 Broker ,我们还需要一些 失败重试机制 ,例如失败后放到重试队列中,隔一段时间尝试再次发送。
理清思路
通过上面的分析,我们会有一个大致的认识,应该会有哪些方法,以及底层的大致的设计会分为哪几个部分。但是不够清楚,不够明晰。
首先总结一下实现客户端的几个要点在于:
配置 Broker 基础信息:集群地址、Topic、Partition
消息序列化,通过可扩展的序列化器实现
消息异步写入缓冲区,网络 I/O 线程实现消息发送