bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
key_serializer (可调用对象) –用于转换用户提供的key值为字节,必须返回字节数据。 如果为None,则等同调用f(key)。 默认值: None.
value_serializer(可调用对象) – 用于转换用户提供的value消息值为字节,必须返回字节数据。 如果为None,则等同调用f(value)。 默认值: None.
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
topic(str) – 设置消息将要发布到的主题,即消息所属主题
value(可选) – 消息内容,必须为字节数据,或者通过value_serializer序列化后的字节数据。如果为None,则key必填,消息等同于“删除”。( If value is None, key is required and message acts as a ‘delete’)
partition (int, 可选) – 指定分区。如果未设置,则使用配置的partitioner
key (可选) – 和消息对应的key,可用于决定消息发送到哪个分区。如果平partition为None,则相同key的消息会被发布到相同分区(但是如果key为None,则随机选取分区)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必须为字节数据或者通过配置的key_serializer序列化后的字节数据.
headers (可选) – 设置消息header,header-value键值对表示的list。list项为元组:格式 (str_header,bytes_value)
timestamp_ms (int, 可选) –毫秒数 (从1970 1月1日 UTC算起) ,作为消息时间戳。默认为当前时间
函数返回FutureRecordMetadata类型的RecordMetadata数据
flush(timeout=None)
发送所有可以立即获取的缓冲消息(即时linger_ms大于0),线程block直到这些记录发送完成。当一个线程等待flush调用完成而block时,其它线程可以继续发送消息。
注意:flush调用不保证记录发送成功
metrics(raw=False)
获取生产者性能指标。
参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
注:生产者代码是线程安全的,支持多线程,而消费者则不然
消费者
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
from kafka import KafkaConsumer
from kafka import TopicPartition
import json