Next, we need to insert randomly generated data into this table. If we use Python's third-party module pymysql and insert one record at a time, we can insert 53,237 records in one minute.
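As a point of reference, a single-insert loop with pymysql looks roughly like the sketch below. The four-column shape of exam_users is taken from the consumer code later in this section; the random-value generation is only an illustration, not the original code.

# Baseline sketch: insert rows one at a time, committing after every INSERT.
# Column values here are placeholders; only the four-column shape is assumed.
import random
import string

import pymysql

db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")
cursor = db.cursor()

for i in range(100000):
    name = ''.join(random.choices(string.ascii_lowercase, k=8))
    age = random.randint(18, 60)
    score = random.randint(0, 100)
    # One execute() plus one commit() per record is what limits throughput.
    cursor.execute("insert into exam_users values(%s, %s, %s, %s)",
                   (i + 1, name, age, score))
    db.commit()

db.close()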
With RabbitMQ, the producer generates the random records and publishes each INSERT statement as a message, while the consumers pull messages off the queue and execute them against MySQL.
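A minimal producer sketch along these lines, assuming each message carries a complete INSERT statement for the four-column exam_users table (the random-value generation is an illustration, not the article's exact code):

# Producer sketch (assumed details): build one INSERT statement per random
# record and publish it to the durable task_queue.
import random
import string

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

for i in range(100000):
    name = ''.join(random.choices(string.ascii_lowercase, k=8))
    age = random.randint(18, 60)
    score = random.randint(0, 100)
    sql = "insert into exam_users values(%d, '%s', %d, %d)" % (i + 1, name, age, score)
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=sql,
        properties=pika.BasicProperties(delivery_mode=2),  # make the message persistent
    )
    print(" [x] Sent %r" % sql)

connection.close()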
The consumer code is as follows:
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import time
import pymysql

# Open the database connection
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# Use the cursor() method to create a cursor object
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # The message body is a complete INSERT statement; run it and commit.
    cursor.execute(body)
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

We open nine terminals, eight running consumers and one running the producer. Start the consumers first, then the producer. Importing the data this way, 133,084 records were inserted in one minute, 2.50 times the rate of the plain approach, a substantial improvement in efficiency!
Let us modify the producer and consumer slightly so that each message submits multiple records at once, reducing the overhead of committing one record at a time. The new producer packs a batch of rows into each message, and the consumer inserts them in a single executemany call.
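A sketch of such a batched producer, assuming each message is a JSON-encoded list of row tuples that the consumer below hands to executemany (the batch size of 1000 and the random values are placeholder choices):

# Batched producer sketch (assumed details): pack many rows into one
# JSON-encoded message so the consumer can insert them with executemany().
import json
import random
import string

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

BATCH_SIZE = 1000  # placeholder batch size

batch = []
for i in range(100000):
    name = ''.join(random.choices(string.ascii_lowercase, k=8))
    age = random.randint(18, 60)
    score = random.randint(0, 100)
    batch.append([i + 1, name, age, score])
    if len(batch) == BATCH_SIZE:
        channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=json.dumps(batch),
            properties=pika.BasicProperties(delivery_mode=2),  # persistent message
        )
        batch = []

# Publish any remaining rows that did not fill a whole batch.
if batch:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=json.dumps(batch),
        properties=pika.BasicProperties(delivery_mode=2),
    )

connection.close()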
The new consumer code is as follows:
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import json
import time
import pymysql

# Open the database connection
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# Use the cursor() method to create a cursor object
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sql = 'insert into exam_users values(%s, %s, %s, %s)'
    # Each message carries a JSON-encoded batch of rows; insert them all at once.
    cursor.executemany(sql, json.loads(body))
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()