import json import re import traceback import jsonpath import requests from common.database_util import DatabaseUtil from common.log_util import error_log, logs, warring_log from common.yaml_util import write_yaml, read_yaml, read_config_yaml, write_yaml_projectCode from random1111 import DubugTalk class RequestUtil: def __init__(self, two_node): self.base_url = read_config_yaml("base", two_node) # 定义一个全局变量,类变量,通过类名调用 sess = requests.session() def send_request(self, name, method, url, **kwargs): try: # 请求方式处理 method = str(method).lower() # 基础路径拼接以及替换 url = self.base_url + self.replace_value(url) # 参数替换 for key, value in kwargs.items(): if key in ['params', 'data', 'json', 'headers']: kwargs[key] = self.replace_value(value) elif key == 'files': for file_key, file_path in value.items(): value[file_key] = open(file_path, 'rb') # 输出日志 logs("请求名称:%s" % name) logs("请求方式:%s" % method) logs("请求路径:%s" % url) if "headers" in kwargs.keys(): logs("请求头:%s" % kwargs["headers"]) if "params" in kwargs.keys(): logs("请求params参数:%s" % kwargs["params"]) elif "data" in kwargs.keys(): logs("请求data参数:%s" % kwargs["data"]) elif "json" in kwargs.keys(): logs("请求json参数:%s" % kwargs["json"]) if "files" in kwargs.keys(): logs("文件上传:%s" % kwargs["files"]) # 请求 res = RequestUtil.sess.request(method, url, **kwargs) logs("实际结果: %s" % res) return res except Exception as e: error_log("发送请求send_request异常:%s" % str(traceback.format_exc())) # 替换值的方法 # 问题1:(替换url,params,data,json,headers) # 问题2:(string,float,int,dict) def replace_value(self, data): if data: # 保存数据类型 data_type = type(data) # 判断数据类型 if isinstance(data, dict) or isinstance(data, list): str_data = json.dumps(data) else: str_data = str(data) # 替换 for cs in range(1, str_data.count('${') + 1): if "${" in str_data and "}" in str_data: start_index = str_data.index("${") end_index = str_data.index("}", start_index) old_index = str_data[start_index:end_index + 1] # 反射,通过类的对象和方法,字符串调用方法 function_name = old_index[2:old_index.index('(')] args_value = old_index[old_index.index("(") + 1:old_index.index(")")] new_index = "" if args_value != "": args_value2 = args_value.split(',') new_index = getattr(DubugTalk(), function_name)(*args_value2) else: new_index = getattr(DubugTalk(), function_name)() if isinstance(new_index, int) or isinstance(new_index, float): str_data = str_data.replace('"' + old_index + '"', str(new_index)) else: str_data = str_data.replace(old_index, str(new_index)) # 还原数据类型 if isinstance(data, dict) or isinstance(data, list): data = json.loads(str_data) else: data = data_type(str_data) return data # 规范yaml文件 def standard_yaml(self, arg_names): try: logs("--------接口测试开始--------") arg_names_keys = arg_names.keys() # 判断一级关键字是否存在:name,request,validate if "name" in arg_names_keys and "request" in arg_names_keys and "validate" in arg_names_keys: # 判断requests下面是否包含method,url request_keys = arg_names['request'].keys() if "method" in request_keys and "url" in request_keys: logs("yaml框架检查基本通过") # pop返回该值,并删除 name = arg_names.pop("name") method = arg_names['request'].pop("method") url = arg_names['request'].pop("url") # 发送请求 res = self.send_request(name, method, url, **arg_names['request']) #获取接口的请求时间 res_time = round(res.elapsed.total_seconds() * 1000, 2) act_time = read_config_yaml("request", "request_out_time") # 判断响应时间 if res_time >= act_time: warring_log( "接口实际请求时间{res_time}毫秒,请求时间大于{act_time}毫秒,请关注".format(res_time=res_time, act_time=act_time)) # TODO 发送通知 # print(res.json()) return_text = res.text # print(return_text) return_json = "" return_code = res.status_code # 提取值并写入extract.yaml文件里面 try: return_json = res.json() except Exception as e: error_log("返回结果不是json格式,不能用这种方法") if "extract" in arg_names_keys: for key, value in arg_names['extract'].items(): if "(.*?)" in value or "(.+?)" in value: # 正则 # print("-------"+ value) zz_value = re.search(value, return_text) if zz_value: extract_value = {key: zz_value.group(1)} write_yaml(extract_value) else: # jsonpath # print("+++++"+value) js_value = jsonpath.jsonpath(return_json, value) if js_value: extract_value = {key: js_value[0]} write_yaml(extract_value) try: return_json = res.json() data = return_json["data"]["result"] for i in range(len(data)): write_yaml_projectCode({("projectCode" + str(i)): data[i]['projectCode']}) except Exception as e: pass # 断言结果 self.assert_result(arg_names['validate'], return_json, return_code) else: error_log("request下面是否包含method,url") else: error_log("一级关键字必须包括:name, request, validate") except Exception as e: error_log("规范yaml文件standard_yaml异常:%s" % str(traceback.format_exc())) # 断言方法 def assert_result(self, yq_result, sj_result, return_code): try: logs("预期结果:%s" % yq_result) logs("实际结果:%s" % sj_result) # logs("实际结果:%s" % json.loads(json.dumps(sj_result).replace(r"\\", "\\"))) for yq in yq_result: for key, value in yq.items(): # print(key, value) if key == "equals": self.assert_equals(value, sj_result, return_code) elif key == "contains": # self.assert_contains(value, json.loads(json.dumps(sj_result).replace(r"\\", "\\"))) self.assert_contains(value, sj_result) elif key == 'db_equals': self.database_assert(value, sj_result) else: error_log("框架暂时不支持此框架断言") logs("接口测试成功") logs("------接口测试结束————————\n") except Exception as e: logs("接口测试失败!!!") logs("------接口测试结束————————") error_log("断言assert_result异常:%s" % str(traceback.format_exc())) # 相等断言 def assert_equals(self, value, sj_result, return_code): for assert_key, assert_value in value.items(): # print(assert_value) if assert_key == "status_code": if assert_value != return_code: error_log("断言失败:状态码不等于%s" % assert_value) else: lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key) if lists: if assert_value not in lists: error_log("断言失败:" + assert_key + "不等于" + str(assert_value)) else: error_log("断言失败:返回中的结果不存在:" + assert_key) # 包含断言 def assert_contains(self, value, sj_result): if str(value).replace("{", "").replace("}", "") not in str(sj_result): error_log("断言失败:返回结果中不包含" + str(value)) # 数据库断言 def database_assert(self, value, sj_result): for sql, key in value.items(): if key not in sj_result: error_log("数据库断言失败:返回结果中不包含" + str(key)) else: res = None try: res = DatabaseUtil().execute_sql(sql) except: error_log("数据库断言失败:SQL查询异常:%s" % str(traceback.format_exc())) if res is None: error_log("数据库断言失败:SQL语句有语法错误或者查询结果为空") else: res_new = [] for i in res: for k in i: res_new.append(k) if str(sj_result[key]) in res_new: logs("数据库断言成功:数据库查询结果" + str(res) + ",返回结果:" + str(sj_result[key])) else: error_log("数据库断言失败:SQL查询出的结果不等于接口返回结果")