Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的。
Kafka发送消息主要有三种方式:
1.发送并忘记 2.同步发送 3.异步发送+回调函数
下面以单节点的方式分别用三种方法发送1w条消息测试:
方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)
发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性:
1 import pickle 2 import time 3 from kafka import KafkaProducer 4 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'], 6 key_serializer=lambda k: pickle.dumps(k), 7 value_serializer=lambda v: pickle.dumps(v)) 8 9 start_time = time.time() 10 for i in range(0, 10000): 11 print('------{}---------'.format(i)) 12 future = producer.send('test_topic', key='num', value=i, partition=0) 13 14 # 将缓冲区的全部消息push到broker当中 15 producer.flush() 16 producer.close() 17 18 end_time = time.time() 19 time_counts = end_time - start_time 20 print(time_counts)