安装python,不会的请参考Linux安装配置python3.6环境
安装pika模块
pip install pika
实例介绍
先从一个最简单的生产者/消费者说起
send.py
import pika

HOST = '127.0.0.1'
PORT = '5672'
QUEUENAME = 'eeg'
FILENAME = './eeg2.txt'
EXCHANGE = 'eegs'
ROUT_KEY = 'eeg'
USERNAME = 'user'
PASSWD = 'passwd'


class Client(object):
    """Minimal RabbitMQ producer built on a pika ``BlockingConnection``.

    The connection is opened as soon as the object is constructed and is
    exposed as ``self.connect``.
    """

    def __init__(self, host, queuename, filename, username=USERNAME, passwd=PASSWD):
        """Store the settings and connect to the broker.

        Args:
            host: Broker host name or IP address.
            queuename: Name of the queue that ``run`` declares and publishes to.
            filename: Path of the text file read by ``open_data``.
            username: Broker user name.
            passwd: Broker password.
        """
        self.__host = host
        self.__name = queuename
        self.__filename = filename
        self.__username = username
        self.__passwd = passwd
        self.connect = self.connect_mq()

    def connect_mq(self):
        """Open and return a blocking connection to the broker.

        Raises:
            Exception: Whatever ``pika.BlockingConnection`` raised. The error
                is printed and then re-raised, so a failed connection is
                reported here instead of surfacing later as an
                ``AttributeError`` on ``None``.
        """
        # Add the user name and password.
        credentials = pika.PlainCredentials(self.__username, self.__passwd)
        # Configure the connection parameters.
        parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials)
        try:
            # Create the connection object.
            return pika.BlockingConnection(parameters)
        except Exception as e:
            print(e)
            raise

    def channel_mq(self):
        """Create and return a new channel on the open connection."""
        return self.connect.channel()

    def open_data(self):
        """Return the UTF-8 text of the configured file.

        Returns:
            The file content as ``str``, or ``None`` when the file cannot be
            read or decoded (the error is printed).
        """
        try:
            with open(self.__filename, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, UnicodeError) as e:
            print(e)
            return None

    def run(self, channel):
        """Declare the queue and publish one persistent message to it.

        Args:
            channel: Channel obtained from ``channel_mq``.
        """
        # Declare the queue; durable=True asks the broker to persist the queue.
        # The queue name passed to the constructor is used so that producer
        # and consumer agree on where messages go.
        channel.queue_declare(queue=self.__name, durable=True)
        # Publish through the default exchange (exchange=''), which routes by
        # queue name. delivery_mode=2 marks the message as persistent.
        # NOTE(review): the original comment says this call returns True or
        # False - verify against the installed pika version.
        channel.basic_publish(exchange='',
                              routing_key=self.__name,
                              body="hello wrold!",
                              properties=pika.BasicProperties(delivery_mode=2))

    def close_connect(self):
        """Close the TCP connection to the broker."""
        self.connect.close()

    def close_channel(self, channel):
        """Close the given channel."""
        channel.close()


if __name__ == '__main__':
    client = Client(HOST, QUEUENAME, FILENAME)
    try:
        channel = client.channel_mq()
        client.run(channel)
    finally:
        # Always release the connection, even if publishing failed.
        client.close_connect()

receiver.py
import pika

HOST = '127.0.0.1'
PORT = '5672'
QUEUENAME = 'eeg'
EXCHANGE = 'eegs'
ROUT_KEY = 'eeg'
USERNAME = 'user'
PASSWD = 'passwd'


class Receive(object):
    """Minimal RabbitMQ consumer built on a pika ``BlockingConnection``."""

    def __init__(self, host, queuename, username=USERNAME, passwd=PASSWD):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Broker host name or IP address.
            queuename: Name of the queue to consume from.
            username: Broker user name.
            passwd: Broker password.
        """
        self.__host = host
        self.__name = queuename
        self.__username = username
        self.__passwd = passwd

    def connect_mq(self):
        """Connect to the broker and return a channel on the new connection."""
        # Add the user name and password.
        credentials = pika.PlainCredentials(self.__username, self.__passwd)
        # Configure the connection parameters.
        parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials)
        # Create the connection object.
        connection = pika.BlockingConnection(parameters)
        # Create a channel.
        channel = connection.channel()
        return channel

    def run(self):
        """Subscribe ``callback`` to the queue and block while consuming."""
        channel = self.connect_mq()
        # Subscribe to messages; no_ack=False means each message must be
        # acknowledged explicitly (done in ``callback``).
        # NOTE(review): this argument order and the ``no_ack`` keyword are the
        # pika 0.x API. pika >= 1.0 expects
        # basic_consume(queue=..., on_message_callback=..., auto_ack=...) -
        # confirm which pika version is installed.
        channel.basic_consume(self.callback,
                              queue=self.__name,
                              no_ack=False)
        # print(' [*] Waiting for messages. To exit press CTRL+C')
        # Loop, waiting for messages.
        channel.start_consuming()

    def callback(self, ch, method, properties, body):
        """Print a received message and acknowledge it.

        Args:
            ch: Channel the message arrived on; used to send the ack.
            method: Delivery metadata; supplies ``delivery_tag``.
            properties: Message properties (unused).
            body: Raw message payload as UTF-8 encoded bytes.
        """
        print(" [x] Received %r" % str(body.decode('utf-8')))
        print('接收成功!')
        # Send the acknowledgement.
        ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__':
    receive = Receive(HOST, QUEUENAME)
    receive.run()

我们来分析每一步的所使用的方法
创建一个连接connection
# 添加用户名和密码 credentials = pika.PlainCredentials(self.__username, self.__passwd) # 配置连接参数 parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials) # 创建一个连接对象 connection = pika.BlockingConnection(parameters)