当该接口的参数数据较多时,为维护方便,可将其保存在一个单独的json文件中,比如上面用例中的data_parkinside.json,就是保存该接口参数数据的一个文件,与测试用例文件在同一个目录下。测试执行时,通过解析该json文件中的test_name字段,获取属于自身用例的参数,参数文件的内容格式如下:
[ { "test_name": "parkinside_1", "parameter": { "token": "asdgfhh32456asfgrsfss", "vpl": "AJ3585" } }, { "test_name": "parkinside_3", "parameter": { "vpl": "AJ3585" } } ]该json文件保存了两条用例的参数,通过用例名parkinside_1获取到第一条用例的参数,通过用例名parkinside_3获取到第三条用例的参数(json参数文件中的用例名需与yaml用例文件中的用例名一致)。
当该接口的期望结果较长时,为维护方便,可将其保存在一个单独的json文件中,比如上面用例中的result_parkinside.json,就是保存该接口期望结果的一个文件,与测试用例文件在同一目录下。测试执行时,通过解析该json文件中的test_name字段,获取属于自身用例的期望结果,期望结果文件的内容格式如下:
[ { "json": { "vplInfo": { "userID":22, "username":"wuya", "vpl":"京AJ3585" }, "Parking_time_long":"20小时18分钟", "Parking fee":"20$" }, "test_name": "parkinside_1" } ]该json文件保存了一条用例的期望结果,通过用例parkinside_1获取到第一条用例的期望结果(json文件中的用例名需与yaml用例文件中的用例名一致)。
若该接口的测试用例需要引用函数或者变量,则可先在一个单独的relevance.ini关联配置文件中,定义好相关的变量和函数名,并进行拼接,后续可通过变量名,引入测试用例中,比如上面用例中的 ${sign}$ ,就是引用了关联配置文件中的 sign 变量值,relevance.ini关联配置文件的内容格式如下:
[relevance] nonce=$RandomString(5)$ timestamp = $GetTime(time_type=now,layout=13timestamp,unit=0,0,0,0,0)$ sign = $SHA1(asdh, ${nonce}$, ${timestamp}$)$上面配置中的nonce变量,引用了随机函数RandomString,该随机函数产生长度为5的随机数,这些函数的定义都已封装在functions模块中,在这里只需要通过对应的函数名,并存入参数即可引用相关函数。变量timestamp引用了时间戳函数,在这里将生成一个13位的时间戳,并传给变量timestamp。变量sign则是引用了加密函数SHA1,这里将会把字符串asdh、变量nonce的值和变量timestamp的值先拼接起来,然后再将拼接好的字符串传给加密函数SHA1加密。然后即可在用例中引用变量sign,如下:
# 请求参数 parameter: sign: ${sign}$ # 通过变量引用关联值 vpl: AJ3585若该接口的测试用例的期望结果中,需要引用变量来传递SQL语句,则可先在一个单独的sql_check.ini关联配置文件中,定义好相关的变量,并赋予SQL,后续可通过变量名,将SQL语句引入测试用例中,比如上面用例中的 ${common_sql}$,就是引用了关联配置文件中的 common_sql 变量值,这里可以定义一些共用的sql语句,避免冗余,方便维护,relevance.ini关联配置文件的内容格式如下:
[relevance] common_sql=select name,age,sex from user where id=1 parkinside_6_sql=select name,age,sex from user where id=2 4、单接口用例执行脚本单接口测试用例执行脚本,由程序根据yaml格式的测试用例文件自动生成,并根据相应yaml格式的测试用例文件所在的路径生成当前用例执行脚本的保存路径,且该用例执行脚本平时不需要人工维护,如下是接口parkinside的执行脚本test_parkinside.py的格式:
# -*- coding: utf-8 -*- import allure import pytest import time from Main import root_path, case_level, product_version, run_interval from common.unit.initializeYamlFile import ini_yaml from common.unit.initializePremise import ini_request from common.unit.apiSendCheck import api_send_check from common.unit.initializeRelevance import ini_relevance from common.unit import setupTest case_path = root_path + "/tests/TestCases/parkinsideApi" relevance_path = root_path + "/common/configModel/relevance" case_dict = ini_yaml(case_path, "parkinside") @allure.feature(case_dict["test_info"]["title"]) class TestParkinside: @pytest.fixture(scope="class") def setupClass(self): """ :rel: 获取关联文件得到的字典 :return: """ self.rel = ini_relevance(case_path, \'relevance\') #获取本用例初始公共关联值 self.relevance = ini_request(case_dict, case_path, self.rel) #执行完前置条件后,得到的本用例最新全部关联值 return self.relevance, self.rel @pytest.mark.skipif(case_dict["test_info"]["product_version"] in product_version, reason="该用例所属版本为:{0},在本次排除版本{1}内".format(case_dict["test_info"]["product_version"], product_version)) @pytest.mark.skipif(case_dict["test_info"]["case_level"] not in case_level, reason="该用例的用例等级为:{0},不在本次运行级别{1}内".format(case_dict["test_info"]["case_level"], case_level)) @pytest.mark.run(order=case_dict["test_info"]["run_order"]) @pytest.mark.parametrize("case_data", case_dict["test_case"], ids=[]) @allure.severity(case_dict["test_info"]["case_level"]) @pytest.mark.parkinside @allure.story("parkinside") @allure.issue("http://www.bugjira.com") # bug地址 @allure.testcase("http://www.testlink.com") # 用例连接地址 def test_parkinside(self, case_data, setupClass): """ 测试接口为:parkinside :param case_data: 测试用例 :return: """ self.relevance = setupTest.setupTest(relevance_path, case_data, setupClass) # 发送测试请求 api_send_check(case_data, case_dict, case_path, self.relevance) time.sleep(run_interval) if __name__ == \'__main__\': import subprocess subprocess.call([\'pytest\', \'-v\']) 5、封装请求协议apiMethod.py def post(header, address, request_parameter_type, timeout=8, data=None, files=None, cookie=None): """ post请求 :param header: 请求头 :param address: 请求地址 :param request_parameter_type: 请求参数格式(form_data,raw) :param timeout: 超时时间 :param data: 请求参数 :param files: 文件路径 :return: """ if \'form_data\' in request_parameter_type: for i in files: value = files[i] if \'/\' in value: file_parm = i files[file_parm] = (os.path.basename(value), open(value, \'rb\')) enc = MultipartEncoder( fields=files, boundary=\'--------------\' + str(random.randint(1e28, 1e29 - 1)) ) header[\'Content-Type\'] = enc.content_type response = requests.post(url=address, data=enc, headers=header, timeout=timeout, cookies=cookie) elif \'data\' in request_parameter_type: response = requests.post(url=address, data=data, headers=header, timeout=timeout, files=files, cookies=cookie) elif \'json\' in request_parameter_type: response = requests.post(url=address, json=data, headers=header, timeout=timeout, files=files, cookies=cookie) try: if response.status_code != 200: return response.status_code, response.text else: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, \'\' except simplejson.errors.JSONDecodeError: return response.status_code, \'\' except Exception as e: logging.exception(\'ERROR\') logging.error(e) raise def get(header, address, data, timeout=8, cookie=None): """ get请求 :param header: 请求头 :param address: 请求地址 :param data: 请求参数 :param timeout: 超时时间 :return: """ response = requests.get(url=address, params=data, headers=header, timeout=timeout, cookies=cookie) if response.status_code == 301: response = requests.get(url=response.headers["location"]) try: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, \'\' except simplejson.errors.JSONDecodeError: return response.status_code, \'\' except Exception as e: logging.exception(\'ERROR\') logging.error(e) raise def put(header, address, request_parameter_type, timeout=8, data=None, files=None, cookie=None): """ put请求 :param header: 请求头 :param address: 请求地址 :param request_parameter_type: 请求参数格式(form_data,raw) :param timeout: 超时时间 :param data: 请求参数 :param files: 文件路径 :return: """ if request_parameter_type == \'raw\': data = json.dumps(data) response = requests.put(url=address, data=data, headers=header, timeout=timeout, files=files, cookies=cookie) try: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, \'\' except simplejson.errors.JSONDecodeError: return response.status_code, \'\' except Exception as e: logging.exception(\'ERROR\') logging.error(e) raise def delete(header, address, data, timeout=8, cookie=None): """ delete请求 :param header: 请求头 :param address: 请求地址 :param data: 请求参数 :param timeout: 超时时间 :return: """ response = requests.delete(url=address, params=data, headers=header, timeout=timeout, cookies=cookie) try: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, \'\' except simplejson.errors.JSONDecodeError: return response.status_code, \'\' except Exception as e: logging.exception(\'ERROR\') logging.error(e) raise def save_cookie(header, address, request_parameter_type, timeout=8, data=None, files=None, cookie=None): """ 保存cookie信息 :param header: 请求头 :param address: 请求地址 :param timeout: 超时时间 :param data: 请求参数 :param files: 文件路径 :return: """ cookie_path = root_path + \'/common/configModel/relevance/cookie.ini\' if \'data\' in request_parameter_type: response = requests.post(url=address, data=data, headers=header, timeout=timeout, files=files, cookies=cookie) elif \'json\' in request_parameter_type: response = requests.post(url=address, json=data, headers=header, timeout=timeout, files=files, cookies=cookie) try: if response.status_code != 200: return response.status_code, response.text else: re_cookie = response.cookies.get_dict() cf = Config(cookie_path) cf.add_section_option(\'relevance\', re_cookie) for i in re_cookie: values = re_cookie[i] logging.debug("cookies已保存,结果为:{}".format(i+"="+values)) return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, \'\' except simplejson.errors.JSONDecodeError: return response.status_code, \'\' except Exception as e: logging.exception(\'ERROR\') logging.error(e) raise …………………… 6、封装方法apiSend.py:处理测试用例,拼接请求并发送 def send_request(data, project_dict, _path, relevance=None): """ 封装请求 :param data: 测试用例 :param project_dict: 用例文件内容字典 :param relevance: 关联对象 :param _path: case路径 :return: """ logging.info("="*100) try: # 获取用例基本信息 get_header =project_dict["test_info"].get("headers") get_host = project_dict["test_info"].get("host") get_address = project_dict["test_info"].get("address") get_http_type = project_dict["test_info"].get("http_type") get_request_type = project_dict["test_info"].get("request_type") get_parameter_type = project_dict["test_info"].get("parameter_type") get_cookies = project_dict["test_info"].get("cookies") get_file = project_dict["test_info"].get("file") get_timeout = project_dict["test_info"].get("timeout") except Exception as e: logging.exception(\'获取用例基本信息失败,{}\'.format(e)) try: # 如果用例中写了headers关键字,则用用例中的headers值(若该关键字没有值,则会将其值置为none),否则用全局headers get_header = data["headers"] except KeyError: pass try: # 替换成用例中相应关键字的值,如果用例中写了host和address,则使用用例中的host和address,若没有则使用全局传入的默认值 get_host = data["host"] except KeyError: pass try: get_address = data["address"] except KeyError: pass try: get_http_type = data["http_type"] except KeyError: pass try: get_request_type = data["request_type"] except KeyError: pass try: get_parameter_type = data["parameter_type"] except KeyError: pass try: get_cookies = data["cookies"] except KeyError: pass try: get_file = data["file"] except KeyError: pass try: get_timeout = data["timeout"] except KeyError: pass Cookie = None header = get_header if get_header: if isinstance(get_header, str): header = confManage.conf_manage(get_header, "header") # 处理请求头中的变量 if header == get_header: pass else: var_list = re.findall(\'\$.*?\$\', header) header = literal_eval(header) # 将字典类型的字符串,转成字典 # 处理请求头中的变量和函数 if var_list: # 将关联对象里的键值对遍历出来,并替换掉字典值中的函数 rel = dict() for key, value in header.items(): rel[key] = replace_random(value) header = rel logging.debug("替换请求头中的函数处理结果为:{}".format(header)) str_header = str(header) var_list = re.findall(\'\${.*?}\$\', str_header) if var_list: # 用自身关联对象里的变量值,替换掉自身关联对象里的变量 header = replaceRelevance.replace(header, header) str_header = str(header) var_list = re.findall(\'\$.*?\$\', str_header) if var_list: # 再次将关联对象里的键值对遍历出来,并替换掉字典值中的函数 rel = dict() for key, value in header.items(): rel[key] = replace_random(value) header = rel else: pass else: pass else: pass else: pass else: pass logging.debug("请求头处理结果为:{}".format(header)) if get_cookies is True: cookie_path = root_path + "/common/configModel/relevance" Cookie = ini_relevance(cookie_path, \'cookie\') # 为字典类型的字符串 logging.debug("cookie处理结果为:{}".format(Cookie)) else: pass parameter = readParameter.read_param(data["test_name"], data["parameter"], _path, relevance) #处理请求参数(含参数为文件的情况) logging.debug("请求参数处理结果:{}".format(parameter)) get_address = str(replaceRelevance.replace(get_address, relevance)) # 处理请求地址中的变量 logging.debug("请求地址处理结果:{}".format(get_address)) get_host = str(confManage.conf_manage(get_host, "host")) # host处理,读取配置文件中的host logging.debug("host处理结果:{}".format(get_host)) if not get_host: raise Exception("接口请求地址为空 {}".format(get_host)) logging.info("请求接口:{}".format(data["test_name"])) logging.info("请求地址:{}".format((get_http_type + "://" + get_host + get_address))) logging.info("请求头: {}".format(header)) logging.info("请求参数: {}".format(parameter)) # 通过get_request_type来判断,如果get_request_type为post_cookie;如果get_request_type为get_cookie if get_request_type.lower() == \'post_cookie\': with allure.step("保存cookie信息"): allure.attach("请求接口:", data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) result = apiMethod.save_cookie(header=header, address=get_http_type + "://" + get_host + get_address, request_parameter_type=get_parameter_type, data=parameter, cookie=Cookie, timeout=get_timeout) elif get_request_type.lower() == \'post\': logging.info("请求方法: POST") if get_file: with allure.step("POST上传文件"): allure.attach("请求接口:",data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) result = apiMethod.post(header=header, address=get_http_type + "://" + get_host + get_address, request_parameter_type=get_parameter_type, files=parameter, cookie=Cookie, timeout=get_timeout) else: with allure.step("POST请求接口"): allure.attach("请求接口:", data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) result = apiMethod.post(header=header, address=get_http_type + "://" + get_host + get_address, request_parameter_type=get_parameter_type, data=parameter, cookie=Cookie, timeout=get_timeout) elif get_request_type.lower() == \'get\': with allure.step("GET请求接口"): allure.attach("请求接口:", data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) logging.info("请求方法: GET") result = apiMethod.get(header=header, address=get_http_type + "://" + get_host + get_address, data=parameter, cookie=Cookie, timeout=get_timeout) elif get_request_type.lower() == \'put\': logging.info("请求方法: PUT") if get_file: with allure.step("PUT上传文件"): allure.attach("请求接口:", data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) result = apiMethod.put(header=header, address=get_http_type + "://" + get_host + get_address, request_parameter_type=get_parameter_type, files=parameter, cookie=Cookie, timeout=get_timeout) else: with allure.step("PUT请求接口"): allure.attach("请求接口:", data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) result = apiMethod.put(header=header, address=get_http_type + "://" + get_host + get_address, request_parameter_type=get_parameter_type, data=parameter, cookie=Cookie, timeout=get_timeout) elif get_request_type.lower() == \'delete\': with allure.step("DELETE请求接口"): allure.attach("请求接口:", data["test_name"]) allure.attach("用例描述:", data["info"]) allure.attach("请求地址", get_http_type + "://" + get_host + get_address) allure.attach("请求头", str(header)) allure.attach("请求参数", str(parameter)) logging.info("请求方法: DELETE") result = apiMethod.delete(header=header, address=get_http_type + "://" + get_host + get_address, data=parameter, cookie=Cookie, timeout=get_timeout) ………………………… else: result = {"code": False, "data": False} logging.info("没有找到对应的请求方法!") logging.info("请求接口结果:\n {}".format(result)) return result 7、测试结果断言封装checkResult.py def check_json(src_data, dst_data): """ 校验的json :param src_data: 检验内容 :param dst_data: 接口返回的数据 :return: """ if isinstance(src_data, dict): for key in src_data: if key not in dst_data: raise Exception("JSON格式校验,关键字%s不在返回结果%s中" % (key, dst_data)) else: this_key = key if isinstance(src_data[this_key], dict) and isinstance(dst_data[this_key], dict): check_json(src_data[this_key], dst_data[this_key]) elif isinstance(type(src_data[this_key]), type(dst_data[this_key])): raise Exception("JSON格式校验,关键字 %s 与 %s 类型不符" % (src_data[this_key], dst_data[this_key])) else: pass else: raise Exception("JSON校验内容非dict格式") def check_result(test_name, case, code, data, _path, relevance=None): """ 校验测试结果 :param test_name: 测试名称 :param case: 测试用例 :param code: HTTP状态 :param data: 返回的接口json数据 :param relevance: 关联值对象 :param _path: case路径 :return: """ # 不校验结果 if case["check_type"] == \'no_check\': with allure.step("不校验结果"): pass # json格式校验 elif case["check_type"] == \'json\': expected_result = case["expected_result"] if isinstance(case["expected_result"], str): expected_result = readExpectedResult.read_json(test_name, expected_result, _path, relevance) with allure.step("JSON格式校验"): allure.attach("期望code", str(case["expected_code"])) allure.attach(\'期望data\', str(expected_result)) allure.attach("实际code", str(code)) allure.attach(\'实际data\', str(data)) if int(code) == case["expected_code"]: if not data: data = "{}" check_json(expected_result, data) else: raise Exception("http状态码错误!\n {0} != {1}".format(code, case["expected_code"])) # 只校验状态码 elif case["check_type"] == \'only_check_status\': with allure.step("校验HTTP状态"): allure.attach("期望code", str(case["expected_code"])) allure.attach("实际code", str(code)) allure.attach(\'实际data\', str(data)) if int(code) == case["expected_code"]: pass else: raise Exception("http状态码错误!\n {0} != {1}".format(code, case["expected_code"])) # 完全校验 elif case["check_type"] == \'entirely_check\': expected_result = case["expected_result"] if isinstance(case["expected_result"], str): expected_result = readExpectedResult.read_json(test_name, expected_result, _path, relevance) with allure.step("完全校验结果"): allure.attach("期望code", str(case["expected_code"])) allure.attach(\'期望data\', str(expected_result)) allure.attach("实际code", str(code)) allure.attach(\'实际data\', str(data)) if int(code) == case["expected_code"]: result = operator.eq(expected_result, data) if result: pass else: raise Exception("完全校验失败! {0} ! = {1}".format(expected_result, data)) else: raise Exception("http状态码错误!\n {0} != {1}".format(code, case["expected_code"])) # 正则校验 elif case["check_type"] == \'Regular_check\': if int(code) == case["expected_code"]: try: result = "" if isinstance(case["expected_result"], list): with allure.step("正则校验"): for i in case["expected_result"]: result = re.findall(i.replace("\"","\\'"), str(data)) allure.attach(\'正则校验结果\n\',str(result)) allure.attach(\'实际data\', str(data)) else: result = re.findall(case["expected_result"].replace("\"", "\\'"), str(data)) with allure.step("正则校验"): allure.attach("期望code", str(case["expected_code"])) allure.attach(\'正则表达式\', str(case["expected_result"]).replace("\\'", "\"")) allure.attach("实际code", str(code)) allure.attach(\'实际data\', str(data)) allure.attach(case["expected_result"].replace("\"", "\\'") + \'校验完成结果\', str(result).replace("\\'", "\"")) if not result: raise Exception("正则未校验到内容! {}".format(case["expected_result"])) except KeyError: raise Exception("正则校验执行失败! {}\n正则表达式为空时".format(case["expected_result"])) else: raise Exception("http状态码错误!\n {0} != {1}".format(code, case["expected_code"])) # 数据库校验 elif case["check_type"] == "datebase_check": if isinstance(case["result_row_num"],int) and case["result_row_num"] >= 0: get_sel_sql = case["execute_sql"] conf_path = os.path.join(_path, \'sql_check.ini\') sel_sql = conf_manage(get_sel_sql, \'relevance\', conf_path) DB_obj = MySqlDB() sel_result = DB_obj.sel_operation(sel_sql) result_row_num = len(sel_result) if result_row_num == case["result_row_num"]: if result_row_num == 1: try: if isinstance(case["expected_result"], list): with allure.step("数据库校验"): for i in case["expected_result"]: # 需将待匹配数据中的括号替换成转义的普通括号,否则会匹配失败,括号会被当成正则表达式的一部分 result = re.findall(i.replace("\"", "\\'").replace("(", "\(").replace(")", "\)"), str(sel_result)) allure.attach(\'数据库校验结果\n\', str(result)) if not result: allure.attach(\'数据库实际返回结果\', str(sel_result)) raise Exception("数据库未校验到期望内容! {}".format(i)) allure.attach(\'数据库实际返回结果\', str(sel_result)) else: result = re.findall( case["expected_result"].replace("\"", "\\'").replace("(", "\(").replace(")", "\)"), str(sel_result)) with allure.step("数据库校验"): allure.attach(\'数据库校验结果\n\', str(result)) allure.attach(\'数据库实际返回结果\', str(sel_result)) if not result: raise Exception("数据库未校验到期望内容! {}".format(case["expected_result"])) except KeyError: raise Exception("数据库校验执行失败! {}\n正则表达式为空时".format(case["expected_result"])) else: with allure.step("数据库返回结果行数校验"): allure.attach(\'期望行数\', str(case["result_row_num"])) allure.attach(\'实际行数\', str(result_row_num)) else: raise Exception("返回的结果行数不对!\n 实际行数:{0} != 期望行数:{1}".format(result_row_num, case["result_row_num"])) else: raise Exception("用例中的结果行数result_row_num填写格式不对!") else: raise Exception("无该校验方式:{}".format(case["check_type"])) 8、共享模块conftest.py(初始化测试环境,制造测试数据,并还原测试环境) import allure import pytest from common.configModel import confRead from Main import root_path from common.unit.initializeYamlFile import ini_yaml from common.unit.initializeRelevance import ini_relevance from common.unit.apiSendCheck import api_send_check from common.configModel.confRead import Config import logging import os conf_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config", "apiConfig.ini") case_path = root_path + "/tests/CommonApi/loginApi" relevance_path = root_path + "/common/configModel/relevance" @pytest.fixture(scope="session", autouse=True) def setup_env(): # 定义环境;定义报告中environment Host = confRead.Config(conf_path).read_apiConfig("host") allure.environment(测试环境="online", hostName=Host["host"], 执行人="XX", 测试项目="线上接口测试") case_dict = ini_yaml(case_path, "login") # 参数化 fixture @pytest.fixture(scope="session", autouse=True, params=case_dict["test_case"]) def login(request): # setup """ :param request: 上下文 :param request.param: 测试用例 :return: """ # 清空关联配置 for i in ["GlobalRelevance.ini", "ModuleRelevance.ini"]: relevance_file = os.path.join(relevance_path, i) cf = Config(relevance_file) cf.add_conf("relevance") logging.info("执行全局用例依赖接口,初始化数据!") relevance = ini_relevance(relevance_path, "ModuleRelevance") if request.param["case_id"] == 1: relevance = ini_relevance(case_path, "relevance") logging.info("本用例最终的关联数据为:{}".format(relevance)) # 发送测试请求 api_send_check(request.param, case_dict, case_path, relevance) logging.info("初始化数据完成!") yield # teardown # 还原测试环境部分代码 …… …… logging.info("本轮测试已结束,正在还原测试环境!") 9、测试执行总入口Main.py(收集测试用例,批量执行并生成测试报告) import os import shutil import subprocess import pytest import logging from common.unit.initializeYamlFile import ini_yaml from common.utils.logs import LogConfig from common.script.writeCase import write_case from common.script.writeCaseScript import write_caseScript from common.utils.formatChange import formatChange from common.utils.emailModel.runSendEmail import sendEailMock root_path = os.path.split(os.path.realpath(__file__))[0] xml_report_path = root_path + "\\report\\xml" detail_report_path = root_path + "\\report\\detail_report" summary_report_path = root_path + "\\report\\summary_report\\summary_report.html" runConf_path = os.path.join(root_path, "config") # 获取运行配置信息 runConfig_dict = ini_yaml(runConf_path, "runConfig") case_level = runConfig_dict["case_level"] if not case_level: case_level = ["blocker", "critical", "normal", "minor", "trivial"] else: pass product_version = runConfig_dict["product_version"] if not product_version: product_version = [] else: pass isRun_switch = runConfig_dict["isRun_switch"] run_interval = runConfig_dict["run_interval"] writeCase_switch = runConfig_dict["writeCase_switch"] ProjectAndFunction_path = runConfig_dict["ProjectAndFunction_path"] if not ProjectAndFunction_path: ProjectAndFunction_path = "" else: pass scan_path = runConfig_dict["scan_path"] if not scan_path: scan_path = "" else: pass runTest_switch = runConfig_dict["runTest_switch"] reruns = str(runConfig_dict["reruns"]) reruns_delay = str(runConfig_dict["reruns_delay"]) log = runConfig_dict["log"] def batch(CMD): output, errors = subprocess.Popen(CMD, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() outs = output.decode("utf-8") return outs if __name__ == "__main__": try: LogConfig(root_path, log) if writeCase_switch == 1: # 根据har_path里的文件,自动生成用例文件yml和用例执行文件py,若已存在相关文件,则不再创建 write_case(root_path, ProjectAndFunction_path) elif writeCase_switch == 2: write_caseScript(root_path, scan_path) else: logging.info("="*20+"本次测试自动生成测试用例功能已关闭!"+"="*20+"\n") if runTest_switch == 1: # 清空目录和文件 email_target_dir = root_path + "/report/zip_report" # 压缩文件保存路径 shutil.rmtree(email_target_dir) if os.path.exists(summary_report_path): os.remove(summary_report_path) else: pass os.mkdir(email_target_dir) args = ["-k", runConfig_dict["Project"], "-m", runConfig_dict["markers"], "--maxfail=%s" % runConfig_dict["maxfail"], "--durations=%s" % runConfig_dict["slowestNum"], "--reruns", reruns, "--reruns-delay", reruns_delay, "--alluredir", xml_report_path, "--html=%s" % summary_report_path] test_result = pytest.main(args) # 全部通过,返回0;有失败或者错误,则返回1 cmd = "allure generate %s -o %s --clean" % (xml_report_path, detail_report_path) reportResult = batch(cmd) logging.debug("生成html的报告结果为:{}".format(reportResult)) # 发送report到邮件 emailFunction = runConfig_dict["emailSwitch"] if emailFunction == 1: if test_result == 0: ReportResult = "测试通过!" else: ReportResult = "测试不通过!" # 将字符中的反斜杠转成正斜杠 fileUrl_PATH = root_path.replace("\\", "http://www.likecs.com/") logging.debug("基础路径的反斜杠转成正斜杠为:{}".format(fileUrl_PATH)) fileUrl = "file:///{}/report/summary_report/summary_report.html".format(fileUrl_PATH) logging.info("html测试报告的url为:{}".format(fileUrl)) save_fn = r"{}\report\zip_report\summary_report.png".format(root_path) logging.debug("转成图片报告后保存的目标路径为:{}".format(save_fn)) formatChange_obj = formatChange() formatChange_obj.html_to_image(fileUrl, save_fn) email_folder_dir = root_path + "/report/detail_report" # 待压缩文件夹 logging.debug("待压缩文件夹为:{}".format(email_folder_dir)) sendEailMock_obj = sendEailMock() sendEailMock_obj.send_email(email_folder_dir, email_target_dir, runConfig_dict, ReportResult, save_fn) else: logging.info("="*20+"本次测试的邮件功能已关闭!"+"="*20+"\n") else: logging.info("="*20+"本次运行测试开关已关闭!"+"="*20+"\n") except Exception as err: logging.error("本次测试有异常为:{}".format(err)) 10、结合Allure生成报告
好的测试报告在整个测试框架是至关重要的部分,Allure是一个很好用的报告框架,不仅报告美观,而且方便CI集成。
Allure中对严重级别的定义:
Blocker级别:中断缺陷(客户端程序无响应,无法执行下一步操作)
Critical级别:临界缺陷(功能点缺失)
Normal级别:普通缺陷(数值计算错误)
Minor级别:次要缺陷(界面错误与UI需求不符)
Trivial级别:轻微缺陷(必输项无提示,或者提示不规范)