使用python设计并实现一个web性能测试工具,在此过程中,需要解决一系列技术问题,每一个问题的解决,都意味着在某一方面有所提升,下面罗列的是需要考虑和解决技术点:
以上既是技术点,也是需求点,接下来,逐个解决。下图是最终实现的效果图
想要获取源码,关注我微信公众号并回复:001
python有很多命令行工具库,我最喜欢的是click,简单实用。在编写setup.py文件时,需要编写entry_points,安装时,会自动生成可执行文件,linux系统下会将可执行文件放在/usr/bin目录下,windows下,则会放在python安装包的Scripts文件夹下。
开发这个性能测试工具,我们已经卖出了坚实的第一步,虽然一行代码都没有写,但工具如何使用这一关键问题已经被解决,也应当为这个项目起一个响亮的名字了:hibiscus
参考其他性能测试工具的设计与实现,我们的这个性能测试工具,应当支持以下功能:
接下来,使用click库完成对参数的接收和解析
import click
@click.command()
@click.option('-m', '--method', type=click.Choice(['GET', 'POST']), default='GET', help='选择GET或者POST')
@click.option('-p', '--params', multiple=True, help='以key=value形式设置请求参数,支持多个')
@click.option('-h', '--headers', multiple=True, help='以key=value形式设置header,支持多个')
@click.option('-t', '--timeout', type=int, default=3, help='请求超时时间')
@click.option('-n', '--count', type=int, default=100, help='请求数量')
@click.option('-c', '--concurrency', type=int, default=10, help='并发数量')
@click.option('-l', '--limit', type=int, default=0, help='每秒钟请求数量,默认为0,不限制')
@click.argument('url')
def main(method, params, headers, timeout, count, concurrency, limit, url):
print(method, params, headers, timeout, count, concurrency, limit, url)
if __name__ == '__main__':
main()
测试一下,执行命令 python hibiscus.py --help,屏幕上输出内容
Usage: hibiscus.py [OPTIONS] URL
Options:
-m, --method [GET|POST] 选择GET或者POST
-p, --params TEXT 以key=value形式设置请求参数,支持多个
-h, --headers TEXT 以key=value形式设置header,支持多个
-t, --timeout INTEGER 请求超时时间
-n, --count INTEGER 请求数量
-c, --concurrency INTEGER 并发数量
-l, --limit INTEGER 每秒钟请求数量,默认为0,不限制
--help Show this message and exit.
这些提示信息可以很好的指导使用者了解工具的功能,接下来再测试一下参数的解析,执行命令
python hibiscus.py -m GET -c 5 -n 300 -p name=xiaoming http://www.baidu.com
屏幕输出
GET ('name=xiaoming',) () 3 300 5 0 http://www.baidu.com
verygood!
作为一个web性能测试工具,需要每秒钟向被测试接口发送成千上万的请求。如果使用多线程,恐怕难以支撑,因此我选择使用协程。协程是轻量级线程,协程的并发,发生在一个线程之内,因此没有线程上下文切换的开销,性能更强。
python支持协程的库,比较有名的有gevent,此外还有eventlet,gevent我以前有过使用,还从来没有用过eventlet,这次就用它吧。
import eventlet
eventlet.monkey_patch()
class Hibiscus():
def __init__(self, *args, **kwargs):
self.method = kwargs['method']
self.params = self._parse_key_values(kwargs['params'])
self.headers = self._parse_key_values(kwargs['headers'])
self.timeout = kwargs['timeout']
self.count = kwargs['count']
self.concurrency = kwargs['concurrency']
self.limit = kwargs['limit']
self.url = kwargs['url']
def _parse_key_values(self, key_value_tuple):
"""
解析 params 和 headers
:param key_value_tuple:
:return:
"""
data = {}
for tup in key_value_tuple:
arrs = tup.split('=')
data[arrs[0]] = arrs[1]
return data
只是定义了Hibiscus这个类,解析了params和headers这两个参数,接下来,我需要实现request方法,发送请求。
发送请求,我使用requests这个库,原本它是不支持异步请求的,但是由于我执行了eventlet.monkey_patch(),打了补丁,所有的socket操作都将支持异步。
def _request(self):
if self.method == 'GET':
res = requests.get(self.url, params=self.params, headers=self.headers, timeout=self.timeout)
else:
res = requests.post(self.url, json=self.params, headers=self.headers, timeout=self.timeout)
这个方法实现的并不完整,后面会补充,接下来要做的是疯狂的发送请求,我需要实现一个run方法,使用协程发送请求。
def run(self):
pool = eventlet.GreenPool(self.concurrency)
if self.limit == 0:
for _ in pool.imap(lambda x: self._request(), range(self.count)):
pass
else: # 每秒中发送指定数量的请求
interval = 1.0 / self.limit
for i in range(self.count):
pool.spawn_n(self._request
time.sleep(interval)
至此,我已经实现了一个简单的web性能测试工具,它实在是太简单了,以至于没有任何输出,但是基本的轮廓已经有了,输出的部分在下一小节来补充,现在,我先来测试一下它的功能是否正确,能否跑的通。
执行命令,首先修改main函数
def main(method, params, headers, timeout, count, concurrency, limit, url):
hibisucs = Hibiscus(**locals())
hibisucs.run()
执行命令
python hibiscus.py -m GET -c 5 -n 300 http://www.baidu.com
。
有哪些接口测试信息需要输出呢,参考其他性能测试工具,总结如下:
为此,我们需要设计一个类,专门用来存储请求的结果并分析输出以上信息
from collections import defaultdict
SUCCESS_STATUS = 1
FAILED_STATUS = 2
class HttpResult():
def __init__(self, hibiscus):
self.hibiscus = hibiscus
self.responses = defaultdict(list)
def append_success(self, time_cost):
self.responses[SUCCESS_STATUS].append(time_cost)
def append_failed(self, time_cost):
self.responses[FAILED_STATUS].append(time_cost)
def out_put(self):
pass
为了实现测试信息的输出,我封装了HttpResult类,需要在Hibiscus类里修改三处
实例化HttpResult
修改Hibiscus的初始化方法,增加一行代码
self.http_result = HttpResult(self)
修改_request方法
def _request(self):
time_cost = 0
status = FAILED_STATUS
try:
if self.method == 'GET':
res = requests.get(self.url, params=self.params, headers=self.headers, timeout=self.timeout)
else:
res = requests.post(self.url, json=self.params, headers=self.headers, timeout=self.timeout)
time_cost = res.elapsed.total_seconds()
if 200 <= res.status_code < 400:
status = SUCCESS_STATUS
except:
pass # 失败的请求,耗时设置为0,做时间统计时,不处理
if status == SUCCESS_STATUS:
self.http_result.append_success(time_cost)
else:
self.http_result.append_failed(time_cost)
修改run方法
在run方法里,增加两行代码
pool.waitall()
self.http_result.out_put()
有三份信息需要输出
def out_put(self):
self._out_hibiscus_info()
self._out_time_cost()
self._out_distribution()
def _out_hibiscus_info(self):
"""
输出性能测试基本信息
:return:
"""
info = {
'concurrency': str(self.hibiscus.concurrency),
'count': str(self.hibiscus.count),
'success': str(len(self.responses[SUCCESS_STATUS])),
'fail': str(len(self.responses[FAILED_STATUS]))
}
line = "并发度:{concurrency:10} 请求总数:{count:10} 成功数量:{success:10} 失败数量:{fail}"
line = line.format(**info)
print(line)
def _out_time_cost(self):
"""
输出平均耗时,最大耗时,最小耗时
:return:
"""
success_lst = self.responses[SUCCESS_STATUS]
min_cost = str(round(min(success_lst), 4))
max_cost = str(round(max(success_lst), 4))
avg_cost = str(round(float(sum(success_lst))/len(success_lst), 4))
line = "平均耗时:{avg_cost:10} 最大耗时{max_cost:10} 最小耗时{min_cost}"
print(line.format(min_cost=min_cost, max_cost=max_cost, avg_cost=avg_cost))
def _out_distribution(self):
elapsed_sorted = sorted(self.responses[SUCCESS_STATUS])
print("响应时间分布情况:\n")
for p in [50, 60, 70, 80, 90, 95, 98, 99, 100, ]:
c = (len(elapsed_sorted) * p / 100) - 1
print("{:>12}%{:>10.2f}ms".format(p, elapsed_sorted[int(c)] * 1000))
在测试过程中,通过进度条来提示用户测试进度,我选用了一款比较酷的进度条工具---Rich
import time
from rich.live import Live
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
class HttpProgress():
def __init__(self, total):
self.job_progress = Progress(
"{task.description}",
SpinnerColumn(),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
)
self.job_id = self.job_progress.add_task("[green]hibiscus", total=total)
self.live = Live(self.job_progress, refresh_per_second=10)
def start(self):
self.live.start()
def stop(self):
self.live.stop()
def update_completed(self, completed):
self.job_progress.update(self.job_id, completed=completed)
再次修改run方法
def run(self):
self.progress = HttpProgress(self.count)
self.progress.start()
pool = eventlet.GreenPool(self.concurrency)
if self.limit == 0:
for _ in pool.imap(lambda x: self._request(), range(self.count)):
pass
else: # 每秒中发送指定数量的请求
interval = 1.0 / self.limit
for i in range(self.count):
pool.spawn_n(self._request)
time.sleep(interval)
pool.waitall()
self.progress.stop()
self.http_result.out_put()
更新进度条的逻辑,放在_request方法里,在方法末尾加一行代码
self.progress.update_completed(self.http_result.completed())
在编写setup.py文件时,设置console_scripts参数,这样在安装python包时,会生成一个可执行文件
from setuptools import find_packages, setup
setup(
name="hibiscus",
version='0.0.1',
description='web接口性能测试工具',
author='zhangdongsheng',
author_email='xigongda200608@163.com',
package_dir={"": "src"},
packages=find_packages(
where="src",
),
entry_points={
'console_scripts': [
'hibiscus = hibiscus.hibiscus:main',
]
}
)
安装时执行命令
python setup.py install
完成安装后,除了site-packages目录下会增加hibiscus包,在python的安装路径的Scripts文件夹下,还有两个名为hibiscus.exe和hibiscus-script.py文件,Scripts路径已经在环境变量path里做了配置,因此,在CMD命令窗口里,可以直接执行hibiscus这个命令,等同于调用hibiscus.py里的main函数
hibiscus -m GET -c 5 -n 300 http://www.baidu.com
输出结果
C:\Users\zhangdongsheng>hibiscus -m GET -c 5 -n 300 http://www.baidu.com
hibiscus ---------------------------------------- 100% 0:00:03
并发度:5 请求总数:300 成功数量:300 失败数量:0
平均耗时:0.045 最大耗时:1.0377 最小耗时0.0256
响应时间分布情况:
50% 31.35ms
60% 32.30ms
70% 33.35ms
80% 35.49ms
90% 38.91ms
95% 50.20ms
98% 133.57ms
99% 267.16ms
100% 1037.74ms
QQ交流群: 211426309