bootstrap_servers是kafka集群地址信息,下面事项主题user-event发送一条消息,send发送消息是异步的,会马上返回,因此我们要通过阻塞的方式等待消息发送成功(或者flush()也可以,flush会阻塞知道所有log都发送成功),否则消息可能会发送失败,但也不会有提示,关于上面这个可以通过删除send之后的语句试试,会发现broker不会收到消息,然后在send后加上time.sleep(10)之后,会看到broker收到消息。
from kafka import KafkaProducer from kafka.errors import KafkaError producer = KafkaProducer( bootstrap_servers=[ "localhost:9093", "localhost:9094" ] ) future = producer.send("user-event", b'I am rito yan') try: record_metadata = future.get(timeout=10) print_r(record_metadata) except KafkaError as e: print(e)阻塞等待发送成功之后,会看到返回插入记录的信息:
RecordMetadata(topic='user-event', partition=7, topic_partition=TopicPartition(topic='user-event', partition=7), offset=1, timestamp=1528034253757, checksum=None, serialized_key_size=-1, serialized_value_size=13),里面包括了插入log的主题、分区等信息。
创建producer的时候可以通过value_serializer指定格式化函数,比如我们数据是个dict,可以指定格式化函数,将dict转化为byte:
import json producer = KafkaProducer( bootstrap_servers=[ "localhost:9093", "localhost:9094" ], value_serializer=lambda m: json.dumps(m).encode('ascii') ) future = producer.send("user-event", { "name": "燕睿涛", "age": 26, "friends": [ "ritoyan", "luluyrt" ] })这样就可以将格式化之后的信息发送给broker,不用每次发送的时候都自己格式化,真是不要太好用。
consumer 消费数据创建一个consumer,其中group_id是分组,broker中的每一个数据只能被consumer组中的一个consumer消费。
from kafka import KafkaConsumer consumer = KafkaConsumer( "user-event", group_id = "user-event-test", bootstrap_servers = [ "localhost:9093", "localhost:9094" ] ) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))启动之后,进程会一直阻塞在哪里,等broker中有消息的时候就会去消费,启动多个进程,只要保证group_id一致,就可以保证消息只被组内的一个consumer消费,上面的程序会输出:
user-event:8:2: key=None value=b'{"name": "\\u71d5\\u777f\\u6d9b", "age": 26, "friends": ["ritoyan", "luluyrt"]}'同样,进入的时候有value_serializer,出来的时候对应的也有value_deserializer,消费者可以配置value_deserializer来格式化内容,跟producer对应起来
consumer = KafkaConsumer( "user-event", group_id = "user-event-test", bootstrap_servers = [ "localhost:9093", "localhost:9094" ], value_deserializer=lambda m: json.loads(m.decode('ascii')) )输出内容user-event:8:3: key=None value={'name': '燕睿涛', 'age': 26, 'friends': ['ritoyan', 'luluyrt']}
kafka其他命令 查看分组我们的consumer可能有很多分组,可以通过西面的命令查看分组信息:
cd /path/to/kafka bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093,localhost:9094 --list可以看到我使用中的分组有4个,分别如下所示
clock-tick-test3 user-event-test clock-tick-test2 clock-tick-test 查看特定分组信息可以通过bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9093 --group user-event-test --describe,查看分组user-event-test的信息,可以看到西面的信息,包含消费的主题、分区信息,以及consumer在分区中的offset和分区的总offset。(为了格式化显示,删了部分列的部分字母)
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID user-event 3 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 0 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 1 1 1 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 2 1 1 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 4 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python user-event 9 1 1 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 8 4 4 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 7 2 2 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 6 1 1 0 kafka-python-78517 /127.0.0.1 kafka-python user-event 5 0 0 0 kafka-python-78517 /127.0.0.1 kafka-python 结语至此,kafka的基本使用算是掌握了,以后要是有机会在项目中实践就好了,在实际工程中的各种问题可以更加深刻的理解其中的原理。