|
@@ -1,239 +0,0 @@
|
|
|
-import json
|
|
|
-import re
|
|
|
-import traceback
|
|
|
-import jsonpath
|
|
|
-import requests
|
|
|
-
|
|
|
-from common.database_util import DatabaseUtil
|
|
|
-from common.log_util import error_log, logs, warring_log
|
|
|
-from common.yaml_util import write_yaml, read_yaml, read_config_yaml, write_yaml_projectCode
|
|
|
-from random1111 import DubugTalk
|
|
|
-
|
|
|
-
|
|
|
-class RequestUtil:
|
|
|
- def __init__(self, two_node):
|
|
|
- self.base_url = read_config_yaml("base", two_node)
|
|
|
-
|
|
|
- # 定义一个全局变量,类变量,通过类名调用
|
|
|
- sess = requests.session()
|
|
|
-
|
|
|
- def send_request(self, name, method, url, **kwargs):
|
|
|
- try:
|
|
|
- # 请求方式处理
|
|
|
- method = str(method).lower()
|
|
|
- # 基础路径拼接以及替换
|
|
|
- url = self.base_url + self.replace_value(url)
|
|
|
- # 参数替换
|
|
|
- for key, value in kwargs.items():
|
|
|
- if key in ['params', 'data', 'json', 'headers']:
|
|
|
- kwargs[key] = self.replace_value(value)
|
|
|
- elif key == 'files':
|
|
|
- for file_key, file_path in value.items():
|
|
|
- value[file_key] = open(file_path, 'rb')
|
|
|
- # 输出日志
|
|
|
-
|
|
|
- logs("请求名称:%s" % name)
|
|
|
- logs("请求方式:%s" % method)
|
|
|
- logs("请求路径:%s" % url)
|
|
|
- if "headers" in kwargs.keys():
|
|
|
- logs("请求头:%s" % kwargs["headers"])
|
|
|
- if "params" in kwargs.keys():
|
|
|
- logs("请求params参数:%s" % kwargs["params"])
|
|
|
- elif "data" in kwargs.keys():
|
|
|
- logs("请求data参数:%s" % kwargs["data"])
|
|
|
- elif "json" in kwargs.keys():
|
|
|
- logs("请求json参数:%s" % kwargs["json"])
|
|
|
- if "files" in kwargs.keys():
|
|
|
- logs("文件上传:%s" % kwargs["files"])
|
|
|
- # 请求
|
|
|
- res = RequestUtil.sess.request(method, url, **kwargs)
|
|
|
- logs("实际结果: %s" % res)
|
|
|
- return res
|
|
|
-
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- error_log("发送请求send_request异常:%s" % str(traceback.format_exc()))
|
|
|
-
|
|
|
- # 替换值的方法
|
|
|
- # 问题1:(替换url,params,data,json,headers)
|
|
|
- # 问题2:(string,float,int,dict)
|
|
|
- def replace_value(self, data):
|
|
|
- if data:
|
|
|
- # 保存数据类型
|
|
|
- data_type = type(data)
|
|
|
-
|
|
|
- # 判断数据类型
|
|
|
- if isinstance(data, dict) or isinstance(data, list):
|
|
|
- str_data = json.dumps(data)
|
|
|
- else:
|
|
|
- str_data = str(data)
|
|
|
- # 替换
|
|
|
- for cs in range(1, str_data.count('${') + 1):
|
|
|
- if "${" in str_data and "}" in str_data:
|
|
|
- start_index = str_data.index("${")
|
|
|
- end_index = str_data.index("}", start_index)
|
|
|
- old_index = str_data[start_index:end_index + 1]
|
|
|
- # 反射,通过类的对象和方法,字符串调用方法
|
|
|
- function_name = old_index[2:old_index.index('(')]
|
|
|
- args_value = old_index[old_index.index("(") + 1:old_index.index(")")]
|
|
|
- new_index = ""
|
|
|
- if args_value != "":
|
|
|
- args_value2 = args_value.split(',')
|
|
|
- new_index = getattr(DubugTalk(), function_name)(*args_value2)
|
|
|
- else:
|
|
|
- new_index = getattr(DubugTalk(), function_name)()
|
|
|
- if isinstance(new_index, int) or isinstance(new_index, float):
|
|
|
- str_data = str_data.replace('"' + old_index + '"', str(new_index))
|
|
|
- else:
|
|
|
- str_data = str_data.replace(old_index, str(new_index))
|
|
|
- # 还原数据类型
|
|
|
- if isinstance(data, dict) or isinstance(data, list):
|
|
|
- data = json.loads(str_data)
|
|
|
- else:
|
|
|
- data = data_type(str_data)
|
|
|
- return data
|
|
|
-
|
|
|
- # 规范yaml文件
|
|
|
- def standard_yaml(self, arg_names):
|
|
|
- try:
|
|
|
- logs("--------接口测试开始--------")
|
|
|
- arg_names_keys = arg_names.keys()
|
|
|
- # 判断一级关键字是否存在:name,request,validate
|
|
|
- if "name" in arg_names_keys and "request" in arg_names_keys and "validate" in arg_names_keys:
|
|
|
- # 判断requests下面是否包含method,url
|
|
|
- request_keys = arg_names['request'].keys()
|
|
|
- if "method" in request_keys and "url" in request_keys:
|
|
|
- logs("yaml框架检查基本通过")
|
|
|
- # pop返回该值,并删除
|
|
|
- name = arg_names.pop("name")
|
|
|
- method = arg_names['request'].pop("method")
|
|
|
- url = arg_names['request'].pop("url")
|
|
|
- # 发送请求
|
|
|
- res = self.send_request(name, method, url, **arg_names['request'])
|
|
|
- #获取接口的请求时间
|
|
|
- res_time = round(res.elapsed.total_seconds() * 1000, 2)
|
|
|
- act_time = read_config_yaml("request", "request_out_time")
|
|
|
- # 判断响应时间
|
|
|
- if res_time >= act_time:
|
|
|
- warring_log(
|
|
|
- "接口实际请求时间{res_time}毫秒,请求时间大于{act_time}毫秒,请关注".format(res_time=res_time,
|
|
|
- act_time=act_time))
|
|
|
- # TODO 发送通知
|
|
|
-
|
|
|
- # print(res.json())
|
|
|
- return_text = res.text
|
|
|
- # print(return_text)
|
|
|
- return_json = ""
|
|
|
- return_code = res.status_code
|
|
|
- # 提取值并写入extract.yaml文件里面
|
|
|
- try:
|
|
|
- return_json = res.json()
|
|
|
- except Exception as e:
|
|
|
- error_log("返回结果不是json格式,不能用这种方法")
|
|
|
- if "extract" in arg_names_keys:
|
|
|
- for key, value in arg_names['extract'].items():
|
|
|
- if "(.*?)" in value or "(.+?)" in value: # 正则
|
|
|
- # print("-------"+ value)
|
|
|
- zz_value = re.search(value, return_text)
|
|
|
- if zz_value:
|
|
|
- extract_value = {key: zz_value.group(1)}
|
|
|
- write_yaml(extract_value)
|
|
|
- else: # jsonpath
|
|
|
- # print("+++++"+value)
|
|
|
- js_value = jsonpath.jsonpath(return_json, value)
|
|
|
- if js_value:
|
|
|
- extract_value = {key: js_value[0]}
|
|
|
- write_yaml(extract_value)
|
|
|
- try:
|
|
|
- return_json = res.json()
|
|
|
-
|
|
|
- data = return_json["data"]["result"]
|
|
|
- for i in range(len(data)):
|
|
|
- write_yaml_projectCode({("projectCode" + str(i)): data[i]['projectCode']})
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- pass
|
|
|
-
|
|
|
- # 断言结果
|
|
|
- self.assert_result(arg_names['validate'], return_json, return_code)
|
|
|
-
|
|
|
- else:
|
|
|
- error_log("request下面是否包含method,url")
|
|
|
- else:
|
|
|
- error_log("一级关键字必须包括:name, request, validate")
|
|
|
- except Exception as e:
|
|
|
- error_log("规范yaml文件standard_yaml异常:%s" % str(traceback.format_exc()))
|
|
|
-
|
|
|
- # 断言方法
|
|
|
- def assert_result(self, yq_result, sj_result, return_code):
|
|
|
- try:
|
|
|
- logs("预期结果:%s" % yq_result)
|
|
|
- logs("实际结果:%s" % sj_result)
|
|
|
- # logs("实际结果:%s" % json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
|
|
|
- for yq in yq_result:
|
|
|
- for key, value in yq.items():
|
|
|
- # print(key, value)
|
|
|
- if key == "equals":
|
|
|
- self.assert_equals(value, sj_result, return_code)
|
|
|
-
|
|
|
- elif key == "contains":
|
|
|
- # self.assert_contains(value, json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
|
|
|
- self.assert_contains(value, sj_result)
|
|
|
-
|
|
|
-
|
|
|
- elif key == 'db_equals':
|
|
|
- self.database_assert(value, sj_result)
|
|
|
-
|
|
|
- else:
|
|
|
- error_log("框架暂时不支持此框架断言")
|
|
|
- logs("接口测试成功")
|
|
|
-
|
|
|
- logs("------接口测试结束————————\n")
|
|
|
- except Exception as e:
|
|
|
- logs("接口测试失败!!!")
|
|
|
- logs("------接口测试结束————————")
|
|
|
- error_log("断言assert_result异常:%s" % str(traceback.format_exc()))
|
|
|
-
|
|
|
- # 相等断言
|
|
|
- def assert_equals(self, value, sj_result, return_code):
|
|
|
- for assert_key, assert_value in value.items():
|
|
|
- # print(assert_value)
|
|
|
- if assert_key == "status_code":
|
|
|
- if assert_value != return_code:
|
|
|
- error_log("断言失败:状态码不等于%s" % assert_value)
|
|
|
- else:
|
|
|
- lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key)
|
|
|
- if lists:
|
|
|
- if assert_value not in lists:
|
|
|
- error_log("断言失败:" + assert_key + "不等于" + str(assert_value))
|
|
|
- else:
|
|
|
- error_log("断言失败:返回中的结果不存在:" + assert_key)
|
|
|
-
|
|
|
- # 包含断言
|
|
|
- def assert_contains(self, value, sj_result):
|
|
|
-
|
|
|
- if str(value).replace("{", "").replace("}", "") not in str(sj_result):
|
|
|
- error_log("断言失败:返回结果中不包含" + str(value))
|
|
|
-
|
|
|
- # 数据库断言
|
|
|
- def database_assert(self, value, sj_result):
|
|
|
- for sql, key in value.items():
|
|
|
- if key not in sj_result:
|
|
|
- error_log("数据库断言失败:返回结果中不包含" + str(key))
|
|
|
- else:
|
|
|
- res = None
|
|
|
- try:
|
|
|
- res = DatabaseUtil().execute_sql(sql)
|
|
|
- except:
|
|
|
- error_log("数据库断言失败:SQL查询异常:%s" % str(traceback.format_exc()))
|
|
|
- if res is None:
|
|
|
- error_log("数据库断言失败:SQL语句有语法错误或者查询结果为空")
|
|
|
- else:
|
|
|
- res_new = []
|
|
|
- for i in res:
|
|
|
- for k in i:
|
|
|
- res_new.append(k)
|
|
|
- if str(sj_result[key]) in res_new:
|
|
|
- logs("数据库断言成功:数据库查询结果" + str(res) + ",返回结果:" + str(sj_result[key]))
|
|
|
- else:
|
|
|
- error_log("数据库断言失败:SQL查询出的结果不等于接口返回结果")
|