Developing Kafka producers, consumers, and clients with the kafka-python library
By: 授客 QQ:1033553122
1. Test environment
Python 3.4
zookeeper-3.4.13.tar.gz
Download link 1:
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
Download link 2:
https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ
kafka_2.12-2.1.0.tgz
Download link 1:
Download link 2:
https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw
pip-18.1.tar.gz
Download link: https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw
Note: in practice, an older pip version could not install .whl files, so pip was upgraded first.
kafka_python-1.4.4-py2.py3-none-any.whl
Download link 1:
https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl
Download link 2:
https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg
python_snappy-0.5.3-cp34-cp34m-win_amd64.whl
Download link 1:
https://www.lfd.uci.edu/~gohlke/pythonlibs/
Download link 2:
https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg
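Notes:
kafka-python supports gzip compression and decompression out of the box. To consume lz4-compressed messages, python-lz4 must be installed; to support snappy compression and decompression, python-snappy must be installed. Otherwise an error like the following may be raised: kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.
When constructing a producer object, the compression_type parameter specifies how messages produced by that producer are compressed; alternatively, set the compression.type parameter in the producer.properties configuration.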
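Before choosing a compression_type, it can help to check which optional codec modules are importable in the current environment. The following is a minimal sketch using only the standard library; the helper name available_codecs and the CODEC_MODULES mapping are hypothetical and not part of kafka-python, and the mapping assumes python-snappy is importable as snappy and python-lz4 as lz4.

```python
import importlib.util

# Assumed mapping from compression_type values to the importable module that
# provides the codec. gzip ships with the Python standard library.
CODEC_MODULES = {
    'gzip': 'gzip',
    'snappy': 'snappy',  # assumed to be provided by the python-snappy package
    'lz4': 'lz4',        # assumed to be provided by the python-lz4 package
}


def available_codecs():
    """Return {compression_type: bool} based on which modules can be found."""
    return {
        name: importlib.util.find_spec(module) is not None
        for name, module in CODEC_MODULES.items()
    }


if __name__ == '__main__':
    # Print one line per codec so a missing library is visible before
    # a producer or consumer fails with UnsupportedCodecError.
    for name, ok in sorted(available_codecs().items()):
        print('%-6s %s' % (name, 'available' if ok else 'missing'))
```

This only checks that the module can be located. It does not prove that kafka-python will accept the codec, so treat it as a quick pre-flight check rather than a guarantee.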
Reference links:
https://pypi.org/project/kafka-python/#description
https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install
2. Code practice
Producer
# -*- encoding:utf-8 -*-
__author__ = 'shouke'
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(0, 100):
    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)
# Block until this single message has been sent or the timeout expires
future = producer.send('MY_TOPIC1', value=b'another msg', key=b'othermsg')
result = future.get(timeout=60)
print(result)
# Block until all pending messages have at least been put on the network
# Note: this does not guarantee delivery or success. It is really only useful if you configure internal batching using linger_ms
producer.flush()
# Serialize JSON data
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('MY_TOPIC1', {'shouke':'kafka'})
# Serialize string keys
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)
producer.send('MY_TOPIC1', b'shouke', key='strKey')
# Compress messages with gzip
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', compression_type='gzip')
for i in range(2):
    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))
# Attach headers to a message record
producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])
# Get performance metrics (note: in practice this call is slow when there are many partitions)
metrics = producer.metrics()
print(metrics)
producer.flush()
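The value_serializer and key_serializer arguments used in the producer code are ordinary callables that return bytes, so they can be checked without a running broker. The following is a minimal sketch; the helper names serialize_value and deserialize_value are hypothetical and not part of kafka-python.

```python
import json


def serialize_value(value):
    """Encode a JSON-serializable object as UTF-8 bytes.

    Same logic as the value_serializer lambda in the producer code.
    """
    return json.dumps(value).encode('utf-8')


def deserialize_value(raw):
    """Inverse of serialize_value: decode UTF-8 bytes back into an object."""
    return json.loads(raw.decode('utf-8'))


if __name__ == '__main__':
    payload = serialize_value({'shouke': 'kafka'})
    print(payload)                     # the bytes that would be sent to the broker
    print(deserialize_value(payload))  # what a consumer would get back
```

These functions could be passed as KafkaProducer(value_serializer=serialize_value) and, on the consuming side, KafkaConsumer(value_deserializer=deserialize_value). Naming the functions instead of using a lambda makes the round trip easy to test on its own.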
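Error encountered in practice: kafka.errors.NoBrokersAvailable: NoBrokersAvailable. The fix is as follows:
Go to the configuration directory (config) and edit the server.properties file.
Find and set listeners to configure the listening address, in the format listeners = listener_name://host_name:port. This is the IP and port that Kafka clients connect to. In this example it is configured as follows: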
listeners=PLAINTEXT://127.0.0.1:9092
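When NoBrokersAvailable appears, a quick way to narrow down the cause is to check whether the configured address accepts TCP connections at all. The following is a minimal sketch using only the standard library; the helper names parse_bootstrap and broker_reachable are hypothetical and not part of kafka-python, and a successful connection only shows that something is listening on that port, not that it is a healthy Kafka broker.

```python
import socket


def parse_bootstrap(server):
    """Split a 'host:port' bootstrap string into (host, port)."""
    host, _, port = server.rpartition(':')
    return host, int(port)


def broker_reachable(server, timeout=3.0):
    """Return True if a TCP connection to the given address can be opened."""
    host, port = parse_bootstrap(server)
    try:
        # The socket is closed again as soon as the with block exits.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == '__main__':
    # Address taken from the listeners setting in this example.
    print(broker_reachable('127.0.0.1:9092'))
```

If this returns False for the address given in bootstrap_servers, the listeners setting or the broker process itself is the first thing to check.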
API and common parameters:
class kafka.KafkaProducer(**configs)