在大多数情况下,压测一般适用于IO密集型场景(如访问接口并等待返回),在这种场景下多线程多进程的区分并不明显(详情请参见GIL相关)。不过一旦出现词表参数加密、返回内容校验等事情的话,多进程对发送效率的提升还是很明显的。
可以指定发送QPS
可以指定发压的QPS,根据并行度和请求相应时间,可以估算出可发送QPS峰值。例如并行度是10,响应时间是100ms,那么QPS峰值应该是(1s/100ms * 10)=100,此工具可以将QPS稳定的维持在小于峰值的一个量上。
便于扩展
为什么要DIY压测工具了?一般的服务端压测工具,例如http_load和jmeter,不是http协议的,就是需要通过代码进行扩展。例如在压测thrift接口的时候,即使通过jmeter扩展Java程序也很麻烦。但是当涉及到场景化压测,或者是奇怪的SDK,例如本文要压测的接口是通过java代码自动生成的python消息类SDK,并且涉及到场景化的压测,很难通过一般的服务端压测工具搞定。
1、发压代码
解耦
下面是压测代码的实现,可以看到,我这里使用abc包,做了一个抽象类。
业务测试代码,例如自动化case,只要继承了这个抽象类,就获得压测的能力,做到压测和自动化测试的解耦。
这里有两个抽象方法
vocab() - 构造词表
press() - 发压逻辑
是被@abc.abstractmethod装饰器装饰,在子类中,是一定要被实现的。
run()方法是压测执行的方法,实现子类的词表方法和发压逻辑之后,直接调用run()方法就可以压测了。
固定QPS
固定QPS是通过管理进程实现的。可以看到有两种进程:
一种是worker_process进程,调用了press()发压逻辑函数,并且这个进程可以指定并发度concurrent,是实际的发压进程,值得注意的是在worker_process中使用了time.sleep(),是为了控制发送速度。
另一种是manager_process进程,这个进程每隔一段时间计算实际的qps,并和设置的qps比较,然后调整worker_process中的sleep时间,例如实际qps小于设定qps,那么就少睡一会儿。
这里不得不提到的是,多进程如何共享变量?
这里使用的是multiprocessing中的Manager包,这个包提供了多进程共享变量的能力,我这里用到的是Namespace数据结构来存储多进程的计数。在使用过程中我怀疑Manager Namespace是通过读写文件的形式进行进程间共享变量的,这个我没有深入的研究。
# -*- coding:utf-8 -*-
import abc
import time
from multiprocessing import Lock, Process, Manager
class Press(object):
__metaclass__ = abc.ABCMeta
def __init__(self, qps=100, concurrent=10):
self.qps = qps
self.concurrent = concurrent
self.mutex = Lock()
self.local = Manager().Namespace()
self.local.count = 0
self.local.sleep = 0.1
self.manager_gap = 0.5
self.precision = 0.1
self.vocab_list = list()
self.vocab()
def manager_process(self):
while True:
with self.mutex:
current_qps = self.local.count / self.manager_gap
self.local.count = 0
print self.local.sleep, current_qps
if current_qps < self.qps:
self.local.sleep = self.local.sleep * (1.0 - self.precision)
else:
self.local.sleep = self.local.sleep * (1.0 + self.precision)
time.sleep(self.manager_gap)
def worker_process(self):
while True:
with self.mutex:
self.local.count += 1
time.sleep(self.local.sleep)
self.press()
@abc.abstractmethod
def vocab(self):
return
@abc.abstractmethod
def press(self):
return
def run(self):
processes = [Process(target=self.worker_process) for index in range(self.concurrent)]
processes.append(Process(target=self.manager_process))
for process in processes:
process.start()
for process in processes:
process.join()
2、实际压测
给出一个发压的例子。分三步~
QueryVmPress继承了Press类,获得了发压能力。
然后实现了vocab方法,构造了词表。