Python 使用python-kafka类库开发kafka生产者客户端

使用python-kafka类库开发kafka生产者&消费者&客户端

  By: 授客 QQ:1033553122

 

 

 

1.测试环境

python 3.4

 

zookeeper-3.4.13.tar.gz

下载地址1:

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下载地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下载地址1:

下载地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pip-18.1.tar.gz

下载地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

说明:实践中发现,pip版本比较旧的话,没法安装whl文件

 

kafka_python-1.4.4-py2.py3-none-any.whl

下载地址1:

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

 

下载地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

下载地址1:

https://www.lfd.uci.edu/~gohlke/pythonlibs/

 

下载地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

说明:

kafka-python支持gzip压缩/解压缩。如果要消费lz4方式压缩的消息,则需要安装python-lz4,如果要支持snappy方式压缩/解压缩则需要安装,否则可能会报错:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

构建生产者对象时,可通过compression_type 参数指定由对应生产者生产的消息数据的压缩方式,或者在producer.properties配置中配置compression.type参数。

 

参考链接:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

 

2.代码实践

生产者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

from kafka import KafkaProducer

import json

 

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

 

 

for i in range(0, 100):

    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

 

# Block直到单条消息发送完或者超时

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

 

# Block直到所有阻塞的消息发送到网络

# 注意: 该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It is really only useful if you configure internal batching using linger_ms

 

 

# 序列化json数据

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

 

# 序列化字符串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

 

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

 

# 消息记录携带header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

 

# 获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时

metrics = producer.metrics()

print(metrics)

 

producer.flush()

 

实践中遇到错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解决方案如下:

进入到配置目录(config),编辑server.properties文件,

查找并设置listener,配置监听端口,格式:listeners = listener_name://host_name:port,供kafka客户端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

 

 

API及常用参数说明:

class kafka.KafkaProducer(**configs)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjyxf.html