RabbitMQ入门(二)工作队列 (3)

exam_user数据库表结构

  接下来,我们需要往这张表中插入随机创建的数据。如果我们利用Python的第三方模块pymysql,每一次插入一条记录,那么一分钟插入53237条记录。
  利用RabbitMQ,我们的生产者代码如下:

# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:23 import pika from random import choice names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark', 'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David', 'Ella', 'Lake', 'Moon', 'Nake', 'Zoo'] places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao'] types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008', 'DG009', 'DG010', 'DG020'] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for id in range(1, 20000001): name = choice(names) place = choice(places) type2 = choice(types) message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()

  消费者代码如下:

# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:28 # -*- coding: utf-8 -*- # author: Jclian91 # place: Sanya Hainan # time: 2020-01-12 13:45 import pika import time import pymysql # 打开数据库连接 db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test") # 使用 cursor() 方法创建一个游标对象 cursor cursor = db.cursor() connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) cursor.execute(body) db.commit() print(" [x] Insert successfully!") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()

我们开启9个终端,其中8个消费者1个生产者,先启动消费者,然后生产者,按照上面的数据导入方式,一分钟插入了133084条记录,是普通方式的2.50倍,效率有大幅度提升!
  让我们稍微修改下生产者和消费者的代码,一次提交插入多条记录,减少每提交一次就插入一条记录的消耗时间。新的生产者代码如下:

# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:23 import pika from random import choice import json names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark', 'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David', 'Ella', 'Lake', 'Moon', 'Nake', 'Zoo'] places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao'] types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008', 'DG009', 'DG010', 'DG020'] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for _ in range(1, 200001): values = [] for i in range(100): name = choice(names) place = choice(places) type2 = choice(types) values.append([100*_+i+1, name, place, type2]) message = json.dumps(values) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()

  新的消费者的代码如下:

# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:28 # -*- coding: utf-8 -*- # author: Jclian91 # place: Sanya Hainan # time: 2020-01-12 13:45 import pika import json import time import pymysql # 打开数据库连接 db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test") # 使用 cursor() 方法创建一个游标对象 cursor cursor = db.cursor() connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) sql = 'insert into exam_users values(%s, %s, %s, %s)' cursor.executemany(sql, json.loads(body)) db.commit() print(" [x] Insert successfully!") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwgpjd.html