request_util.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import json
  2. import re
  3. import traceback
  4. import jsonpath
  5. import requests
  6. from common.database_util import DatabaseUtil
  7. from common.log_util import error_log, logs
  8. from common.yaml_util import write_yaml, read_yaml, read_config_yaml, write_yaml_projectCode
  9. from random1111 import DubugTalk
  10. class RequestUtil:
  11. def __init__(self, two_node):
  12. self.base_url = read_config_yaml("base", two_node)
  13. # 定义一个全局变量,类变量,通过类名调用
  14. sess = requests.session()
  15. def send_request(self, name, method, url, **kwargs):
  16. try:
  17. # 请求方式处理
  18. method = str(method).lower()
  19. # 基础路径拼接以及替换
  20. url = self.base_url + self.replace_value(url)
  21. # 参数替换
  22. for key, value in kwargs.items():
  23. if key in ['params', 'data', 'json', 'headers']:
  24. kwargs[key] = self.replace_value(value)
  25. elif key == 'files':
  26. for file_key, file_path in value.items():
  27. value[file_key] = open(file_path, 'rb')
  28. # 输出日志
  29. logs("请求名称:%s" % name)
  30. logs("请求方式:%s" % method)
  31. logs("请求路径:%s" % url)
  32. if "headers" in kwargs.keys():
  33. logs("请求头:%s" % kwargs["headers"])
  34. if "params" in kwargs.keys():
  35. logs("请求params参数:%s" % kwargs["params"])
  36. elif "data" in kwargs.keys():
  37. logs("请求data参数:%s" % kwargs["data"])
  38. elif "json" in kwargs.keys():
  39. logs("请求json参数:%s" % kwargs["json"])
  40. if "files" in kwargs.keys():
  41. logs("文件上传:%s" % kwargs["files"])
  42. # 请求
  43. res = RequestUtil.sess.request(method, url, **kwargs)
  44. logs("实际结果: %s" % res)
  45. return res
  46. except Exception as e:
  47. error_log("发送请求send_request异常:%s" % str(traceback.format_exc()))
  48. # 替换值的方法
  49. # 问题1:(替换url,params,data,json,headers)
  50. # 问题2:(string,float,int,dict)
  51. def replace_value(self, data):
  52. if data:
  53. # 保存数据类型
  54. data_type = type(data)
  55. # 判断数据类型
  56. if isinstance(data, dict) or isinstance(data, list):
  57. str_data = json.dumps(data)
  58. else:
  59. str_data = str(data)
  60. # 替换
  61. for cs in range(1, str_data.count('${') + 1):
  62. if "${" in str_data and "}" in str_data:
  63. start_index = str_data.index("${")
  64. end_index = str_data.index("}", start_index)
  65. old_index = str_data[start_index:end_index + 1]
  66. # 反射,通过类的对象和方法,字符串调用方法
  67. function_name = old_index[2:old_index.index('(')]
  68. args_value = old_index[old_index.index("(") + 1:old_index.index(")")]
  69. new_index = ""
  70. if args_value != "":
  71. args_value2 = args_value.split(',')
  72. new_index = getattr(DubugTalk(), function_name)(*args_value2)
  73. else:
  74. new_index = getattr(DubugTalk(), function_name)()
  75. if isinstance(new_index, int) or isinstance(new_index, float):
  76. str_data = str_data.replace('"' + old_index + '"', str(new_index))
  77. else:
  78. str_data = str_data.replace(old_index, str(new_index))
  79. # 还原数据类型
  80. if isinstance(data, dict) or isinstance(data, list):
  81. data = json.loads(str_data)
  82. else:
  83. data = data_type(str_data)
  84. return data
  85. # 规范yaml文件
  86. def standard_yaml(self, arg_names):
  87. try:
  88. logs("--------接口测试开始--------")
  89. arg_names_keys = arg_names.keys()
  90. # 判断一级关键字是否存在:name,request,validate
  91. if "name" in arg_names_keys and "request" in arg_names_keys and "validate" in arg_names_keys:
  92. # 判断requests下面是否包含method,url
  93. request_keys = arg_names['request'].keys()
  94. if "method" in request_keys and "url" in request_keys:
  95. logs("yaml框架检查基本通过")
  96. # pop返回该值,并删除
  97. name = arg_names.pop("name")
  98. method = arg_names['request'].pop("method")
  99. url = arg_names['request'].pop("url")
  100. # 发送请求
  101. res = self.send_request(name, method, url, **arg_names['request'])
  102. # print(json.loads(json.dumps(res.json()).replace(r"\\", "\\")))
  103. return_text = res.text
  104. # print(return_text)
  105. return_json = ""
  106. return_code = res.status_code
  107. # 提取值并写入extract.yaml文件里面
  108. try:
  109. return_json = res.json()
  110. except Exception as e:
  111. error_log("返回结果不是json格式,不能用这种方法")
  112. if "extract" in arg_names_keys:
  113. for key, value in arg_names['extract'].items():
  114. if "(.*?)" in value or "(.+?)" in value: # 正则
  115. # print("-------"+ value)
  116. zz_value = re.search(value, return_text)
  117. if zz_value:
  118. extract_value = {key: zz_value.group(1)}
  119. write_yaml(extract_value)
  120. else: # jsonpath
  121. # print("+++++"+value)
  122. js_value = jsonpath.jsonpath(return_json, value)
  123. if js_value:
  124. extract_value = {key: js_value[0]}
  125. write_yaml(extract_value)
  126. try:
  127. return_json = res.json()
  128. data = return_json["data"]["result"]
  129. for i in range(len(data)):
  130. write_yaml_projectCode({("projectCode" + str(i)): data[i]['projectCode']})
  131. except Exception as e:
  132. pass
  133. # 断言结果
  134. self.assert_result(arg_names['validate'], return_json, return_code)
  135. else:
  136. error_log("request下面是否包含method,url")
  137. else:
  138. error_log("一级关键字必须包括:name, request, validate")
  139. except Exception as e:
  140. error_log("规范yaml文件standard_yaml异常:%s" % str(traceback.format_exc()))
  141. # 断言方法
  142. def assert_result(self, yq_result, sj_result, return_code):
  143. try:
  144. logs("预期结果:%s" % yq_result)
  145. logs("实际结果:%s" % json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
  146. # print(yq_result)
  147. for yq in yq_result:
  148. for key, value in yq.items():
  149. # print(key, value)
  150. if key == "equals":
  151. self.assert_equals(value, sj_result, return_code)
  152. elif key == "contains":
  153. self.assert_contains(value, json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
  154. elif key == 'db_equals':
  155. self.database_assert(value, sj_result)
  156. else:
  157. error_log("框架暂时不支持此框架断言")
  158. logs("接口测试成功")
  159. logs("------接口测试结束————————\n")
  160. except Exception as e:
  161. logs("接口测试失败!!!")
  162. logs("------接口测试结束————————")
  163. error_log("断言assert_result异常:%s" % str(traceback.format_exc()))
  164. # 相等断言
  165. def assert_equals(self, value, sj_result, return_code):
  166. for assert_key, assert_value in value.items():
  167. # print(assert_value)
  168. if assert_key == "status_code":
  169. if assert_value != return_code:
  170. error_log("断言失败:状态码不等于%s" % assert_value)
  171. else:
  172. lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key)
  173. if lists:
  174. if assert_value not in lists:
  175. error_log("断言失败:" + assert_key + "不等于" + str(assert_value))
  176. else:
  177. error_log("断言失败:返回中的结果不存在:" + assert_key)
  178. # 包含断言
  179. def assert_contains(self, value, sj_result):
  180. if str(value).replace("{", "").replace("}", "") not in str(sj_result):
  181. error_log("断言失败:返回结果中不包含" + str(value))
  182. # 数据库断言
  183. def database_assert(self, value, sj_result):
  184. for sql, key in value.items():
  185. if key not in sj_result:
  186. error_log("数据库断言失败:返回结果中不包含" + str(key))
  187. else:
  188. res = None
  189. try:
  190. res = DatabaseUtil().execute_sql(sql)
  191. except:
  192. error_log("数据库断言失败:SQL查询异常:%s" % str(traceback.format_exc()))
  193. if res is None:
  194. error_log("数据库断言失败:SQL语句有语法错误或者查询结果为空")
  195. else:
  196. res_new = []
  197. for i in res:
  198. for k in i:
  199. res_new.append(k)
  200. if str(sj_result[key]) in res_new:
  201. logs("数据库断言成功:数据库查询结果" + str(res) + ",返回结果:" + str(sj_result[key]))
  202. else:
  203. error_log("数据库断言失败:SQL查询出的结果不等于接口返回结果")