Python服务端多进程压测工具

在大多数情况下,压测一般适用于IO密集型场景(如访问接口并等待返回),在这种场景下多线程多进程的区分并不明显(详情请参见GIL相关)。不过一旦出现词表参数加密、返回内容校验等事情的话,多进程对发送效率的提升还是很明显的。

可以指定发送QPS

可以指定发压的QPS,根据并行度和请求相应时间,可以估算出可发送QPS峰值。例如并行度是10,响应时间是100ms,那么QPS峰值应该是(1s/100ms * 10)=100,此工具可以将QPS稳定的维持在小于峰值的一个量上。

便于扩展

为什么要DIY压测工具了?一般的服务端压测工具,例如http_load和jmeter,不是http协议的,就是需要通过代码进行扩展。例如在压测thrift接口的时候,即使通过jmeter扩展Java程序也很麻烦。但是当涉及到场景化压测,或者是奇怪的SDK,例如本文要压测的接口是通过java代码自动生成的python消息类SDK,并且涉及到场景化的压测,很难通过一般的服务端压测工具搞定。

1、发压代码

解耦

下面是压测代码的实现,可以看到,我这里使用abc包,做了一个抽象类。

业务测试代码,例如自动化case,只要继承了这个抽象类,就获得压测的能力,做到压测和自动化测试的解耦。

这里有两个抽象方法

vocab() - 构造词表

press() - 发压逻辑

是被@abc.abstractmethod装饰器装饰,在子类中,是一定要被实现的。

run()方法是压测执行的方法,实现子类的词表方法和发压逻辑之后,直接调用run()方法就可以压测了。

固定QPS

固定QPS是通过管理进程实现的。可以看到有两种进程:

一种是worker_process进程,调用了press()发压逻辑函数,并且这个进程可以指定并发度concurrent,是实际的发压进程,值得注意的是在worker_process中使用了time.sleep(),是为了控制发送速度。

另一种是manager_process进程,这个进程每隔一段时间计算实际的qps,并和设置的qps比较,然后调整worker_process中的sleep时间,例如实际qps小于设定qps,那么就少睡一会儿。

这里不得不提到的是,多进程如何共享变量?

这里使用的是multiprocessing中的Manager包,这个包提供了多进程共享变量的能力,我这里用到的是Namespace数据结构来存储多进程的计数。在使用过程中我怀疑Manager Namespace是通过读写文件的形式进行进程间共享变量的,这个我没有深入的研究。

# -*- coding:utf-8 -*-
import abc
import time
from multiprocessing import Lock, Process, Manager


class Press(object):

__metaclass__ = abc.ABCMeta

def __init__(self, qps=100, concurrent=10):
        self.qps = qps
        self.concurrent = concurrent
        self.mutex = Lock()
        self.local = Manager().Namespace()
        self.local.count = 0
        self.local.sleep = 0.1
        self.manager_gap = 0.5
        self.precision = 0.1
        self.vocab_list = list()
        self.vocab()

def manager_process(self):
        while True:
            with self.mutex:
                current_qps = self.local.count / self.manager_gap
                self.local.count = 0
                print self.local.sleep, current_qps

if current_qps < self.qps:
                self.local.sleep = self.local.sleep * (1.0 - self.precision)
            else:
                self.local.sleep = self.local.sleep * (1.0 + self.precision)
            time.sleep(self.manager_gap)

def worker_process(self):
        while True:
            with self.mutex:
                self.local.count += 1
            time.sleep(self.local.sleep)
            self.press()

@abc.abstractmethod
    def vocab(self):
        return

@abc.abstractmethod
    def press(self):
        return

def run(self):
        processes = [Process(target=self.worker_process) for index in range(self.concurrent)]
        processes.append(Process(target=self.manager_process))
        for process in processes:
            process.start()
        for process in processes:
            process.join()

2、实际压测

给出一个发压的例子。分三步~

QueryVmPress继承了Press类,获得了发压能力。

然后实现了vocab方法,构造了词表。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/3ca0ee3300e5d6b8274865891ceeef6f.html