Browse Source

Initial commit

jiangwei_1 6 tháng trước cách đây
mục cha
commit
a4dda5d0d1

+ 111 - 0
DingDing.py

@@ -0,0 +1,111 @@
+import os
+import socket
+from pathlib import Path
+from dingtalkchatbot.chatbot import DingtalkChatbot
+import datetime
+# 获取jenkins构建信息和本次报告地址
+import jenkins  # 安装pip install python-jenkins
+
+from common.yaml_util import read_config_yaml, read_yaml_out_time_url, write_yaml_out_time_url
+
+# 获取本机IP
+s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+# s.connect(("8.8.8.8", 80))
+# ip = s.getsockname()[0]
+# jenkins登录地址
+jenkins_url = "http://121.229.175.82:63222/"
+# 获取jenkins对象
+server = jenkins.Jenkins(jenkins_url, username='jiangwei', password='jW@123456')  # Jenkins登录名 ,密码
+
+# job名称
+job_name = "job/qccq/"  # Jenkins运行任务名称
+# job的url地址
+job_url = jenkins_url + job_name
+# 获取最后一次构建
+job_last_build_url = server.get_info(job_name)['lastBuild']['url']
+# 报告地址
+report_url = job_last_build_url + 'allure'  # 'allure'为我的Jenkins全局工具配置中allure别名
+if "localhost" in report_url:
+    report_url = report_url.replace("localhost", ip)
+else:
+    report_url = report_url
+'''
+钉钉推送方法:
+读取report文件中"prometheusData.txt",循环遍历获取需要的值。
+使用钉钉机器人的接口,拼接后推送text
+'''
+
+
+# url = 'https://oapi.dingtalk.com/robot/send?access_token=f8566d2ed0feb8dc3bf2856df161241a8cf5a4c76af3d903abcabfdebebdff0e'  # webhook
+
+
+def DingTalkSend(info):
+    d = {}
+    # 获取项目绝对路径
+    path = Path(__file__).parent
+    file = '/report/export/prometheusData.txt'
+    print(rf"{path}" + rf"{file}")
+    # 打开prometheusData 获取需要发送的信息
+    # f = open(rf"{path}" + rf"{file}", 'r', encoding='UTF-8')
+    f = open(os.getcwd() + file, 'r', encoding='UTF-8')
+    for lines in f:
+        for c in lines:
+            launch_name = lines.strip('\n').split(' ')[0]
+            num = lines.strip('\n').split(' ')[1]
+            d.update({launch_name: num})
+    print(d)
+    f.close()
+    retries_run = d.get('launch_retries_run')  # 运行总数
+    print('运行总数:{}'.format(retries_run))
+    status_defects = d.get('launch_problems_test_defects')  # 未执行数
+    if status_defects is None:
+        status_defects = "0"
+    else:
+        status_defects = status_defects
+    print(status_defects)
+    status_passed = d.get('launch_status_passed')  # 通过数量
+    print('通过数量:{}'.format(status_passed))
+    status_failed = d.get('launch_status_failed')  # 不通过数量
+    print('不通过数量:{}'.format(status_failed))
+    out_time_url = str(read_yaml_out_time_url())
+    print(out_time_url)
+    # 钉钉推送
+    text = (
+                   f"<font color=\'#FFA500\'>[通知] </font>青春重庆-{info}报告"
+                   "\n\n用例运行总数: " + retries_run +
+                   # "\n\n用例未执行数: " + status_defects +
+                   "\n\n用例通过数量: " + status_passed +
+                   '''\n\n<font>用例失败数量: </font><font color=\'#FF0000\' size=2>%s</font> \n\n''' +
+                   "\n\n响应超过2s接口汇总:" + out_time_url +
+                   # "\n\n构建地址:\n" + job_url +
+                   "\n\n测试报告地址: \n" + report_url +
+                   "\n\n播报时间: " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) % (status_defects)
+
+    dd_robot = DingtalkChatbot(read_config_yaml("dingding", "prod_webhook"))
+    ret = dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False)
+    print(ret)
+
+
+def dingtalk_send_warning(msg):
+    """
+    发送报警超时消息dingtalkchatbot
+    :param msg: 请求返回res
+    :return: none
+    """
+    try:
+        res_time = round(msg.elapsed.total_seconds() * 1000, 2)
+        text = f"<font color=\'#FFA500\'>[通知] </font>青春重庆-报警" \
+               f"\n\n>请求接口路径: {msg.url} " \
+               f"\n\n>请求体: {str(msg.request.body).replace('%', '')} " \
+               f"\n\n>接口响应时间: {res_time}毫秒 " \
+               f"\n\n>播报时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
+
+        dd_robot = DingtalkChatbot(read_config_yaml("dingding", "prod_webhook"))
+        dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False)
+    except Exception as e:
+        raise e
+
+
+if __name__ == '__main__':
+    # dingtalk_send_warning("测试")
+    DingTalkSend("生产APP巡检")

+ 0 - 0
common/__init__.py


+ 47 - 0
common/database_util.py

@@ -0,0 +1,47 @@
+import traceback
+from sqlite3 import Connection, Cursor
+
+# import MySQLdb
+import pymysql
+
+from common.log_util import error_log
+
+
+class DatabaseUtil:
+    def __init__(self):
+        self.cs = None
+        self.conn = None
+
+    def create_conn(self):
+        self.conn: Connection = pymysql.connect(
+            host='10.12.43.2',
+            user='root',
+            password='Xx98$kdiesJS',
+            database='jwtest',
+            port=3409
+        )
+        return self.conn
+
+    # 执行sql语句
+    def execute_sql(self, sql):
+
+        # 创建游标
+        self.cs: Cursor = self.create_conn().cursor()
+        # 通过游标对象,执行SQL
+        try:
+            self.cs.execute(sql)
+        except:
+            error_log("SQL查询异常:%s" % str(traceback.format_exc()))
+
+            # 提取值
+        value = self.cs.fetchall()
+        self.close_resuoe()
+        print(value)
+        return value
+
+    def close_resuoe(self):
+        self.cs.close()
+        self.conn.close()
+
+# if __name__ == '__main__':
+#     print(DatabaseUtil().execute_sql('select * from uisql '))

+ 28 - 0
common/encrypt.py

@@ -0,0 +1,28 @@
+import base64
+import json
+import time
+import uuid
+from Crypto.Cipher import AES
+
+
+def encrypt_padding(string: str, block_size: int) -> str:
+    length = len(string.encode('utf-8'))
+    add_length = block_size - (length % block_size)
+    return string + (chr(add_length) * add_length)
+
+
+def aes_encrypt(string: str, aes_key: str) -> str:
+    cipher = AES.new(aes_key.encode('utf-8'), AES.MODE_ECB)
+    cipher_text = cipher.encrypt(encrypt_padding(string, AES.block_size).encode('utf-8'))
+    cipher_text = base64.b64encode(cipher_text)
+    return cipher_text.decode()
+
+
+def genRat(url: str,emp:int) -> str:
+    mi = {
+        't': int(time.time() * 1000),
+        'w': str(uuid.uuid4()),
+        'r': url,
+        'u': emp
+    }
+    return aes_encrypt(json.dumps(mi), '253140fdc9c44b0cb903c387b3e85d64')

+ 104 - 0
common/log_util.py

@@ -0,0 +1,104 @@
+import logging
+import colorlog
+import time
+
+from common.yaml_util import get_project_path, read_config_yaml
+
+
+class LoggerUtil:
+    def create_log(self, logger_name='log',level='INFO'):
+
+        level_relations = {
+            'debug': logging.DEBUG,
+            'info': logging.INFO,
+            'warning': logging.WARNING,
+            'error': logging.ERROR,
+            'crit': logging.CRITICAL
+        }
+
+
+        # 创建一个日志对象
+        self.logger = logging.getLogger(logger_name)
+        # 设置全局的日志级别(从低到高:debug调试<info信息<warning警告<error错误<critical严重)
+        self.logger.setLevel(logging.DEBUG)
+        if not self.logger.handlers:
+            # ------文件日志------
+            # 1创建文件日志路径
+            self.file_log_path = get_project_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
+                int(time.time())) + ".log"
+            # 2创建文件日志控制器
+            self.file_hander = logging.FileHandler(self.file_log_path, encoding='UTF-8')
+            # 3设置文件日志级别
+            # file_log_level = str(read_config_yaml("log", "log_level")).lower()
+            # 设置日志显示级别
+            self.logger.setLevel(level_relations.get(level))
+
+            # if file_log_level == "debug":
+            #     self.file_hander.setLevel(logging.DEBUG)
+            # elif file_log_level == "info":
+            #     self.file_hander.setLevel(logging.INFO)
+            # elif file_log_level == "warning":
+            #     self.file_hander.setLevel(logging.WARNING)
+            # elif file_log_level == "error":
+            #     self.file_hander.setLevel(logging.ERROR)
+            # elif file_log_level == "critical":
+            #     self.file_hander.setLevel(logging.CRITICAL)
+            # else:
+            #     self.file_hander.setLevel(logging.DEBUG)
+            # 4创建文件的格式
+            self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
+            # 将文件日志的控制器加入日志对象
+            self.logger.addHandler(self.file_hander)
+            # 创建控制台日志
+            # 1创建控制台日志控制器
+            self.console_hander = logging.StreamHandler()
+            # 2设置控制台日志级别
+            self.logger.setLevel(level_relations.get(level))
+
+            # console_log_level = str(read_config_yaml("log", "log_level")).lower()
+            # if console_log_level == "debug":
+            #     self.console_hander.setLevel(logging.DEBUG)
+            # elif console_log_level == "info":
+            #     self.console_hander.setLevel(logging.INFO)
+            # elif console_log_level == "warning":
+            #     self.console_hander.setLevel(logging.WARNING)
+            # elif console_log_level == "error":
+            #     self.console_hander.setLevel(logging.ERROR)
+            # elif console_log_level == "critical":
+            #     self.console_hander.setLevel(logging.CRITICAL)
+            # else:
+            #     self.console_hander.setLevel(logging.DEBUG)
+
+            # formatter = colorlog.ColoredFormatter(
+            #     '%(log_color)s[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s',
+            #     log_colors=log_colors_config)
+            formatter = colorlog.ColoredFormatter(
+                '%(log_color)s[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s',
+                log_colors=read_config_yaml("log","log_colors_config"))
+
+            # 3创建控制台的日志格式
+            self.console_hander.setFormatter(formatter)
+            # 将控制台日志加入日志对象
+            self.logger.addHandler(self.console_hander)
+
+        # 返回包含文件日志控制器和控制台日志控制器的日志对象
+        return self.logger
+
+
+# 错误日志的输出
+def error_log(message):
+    # LoggerUtil().create_log(level="error").error(message)
+    raise Exception(message)
+
+
+def warring_log(message):
+    LoggerUtil().create_log(level="warning").warning(message)
+    # raise Exception(message)
+
+
+
+# 信息日志的输出
+def logs(message):
+    LoggerUtil().create_log(level="info").info(message)
+    # raise Exception(message)
+

+ 57 - 0
common/parameterize.py

@@ -0,0 +1,57 @@
+import json
+import traceback
+import yaml
+
+from common.yaml_util import read_data_yaml, get_project_path
+from common.log_util import error_log
+
+
+# 读取测试用列
+def read_testcase(yaml_path):
+    try:
+        # with open(get_project_path() + yaml_path, mode='r', encoding='utf-8') as f:
+        with open(yaml_path, mode='r', encoding='utf-8') as f:
+            arg_names = yaml.load(stream=f, Loader=yaml.FullLoader)
+            if len(arg_names) >= 2:
+                return arg_names
+            else:
+                if "parameterize" in dict(*arg_names).keys():
+                    new_arg_names = ddt(*arg_names)
+                    return new_arg_names
+                else:
+                    return arg_names
+    except Exception as e:
+        error_log("读取测试用例方法read_testcase异常:%s" % str(traceback.format_exc()))
+
+
+def ddt(arg_names):
+    # 新方法
+    try:
+        arg_names_str = json.dumps(arg_names)
+        data_list = arg_names["parameterize"]
+        key_length = len(data_list[0])
+        # print(key_length)
+        length_success = True
+        # 循环数据
+        for param in arg_names["parameterize"]:
+            if len(param) != key_length:
+                length_success = False
+                error_log("此条数据有误:%s" % param)
+        # 替换值
+        new_art_names = []
+        if length_success:
+            for x in range(1, len(data_list)):
+                # print(data_list[x])
+                temp_arg_names = arg_names_str
+                for y in range(0, len(data_list[x])):
+                    if isinstance(data_list[x][y], int) or isinstance(data_list[x][y], float):
+                        temp_arg_names = temp_arg_names.replace('"$ddt{' + data_list[0][y] + '}"', str(data_list[x][y]))
+                    else:
+                        temp_arg_names = temp_arg_names.replace("$ddt{" + data_list[0][y] + "}", str(data_list[x][y]))
+                new_art_names.append(json.loads(temp_arg_names))
+            # print(new_art_names)
+        return new_art_names
+    except Exception as e:
+        error_log("数据驱动ddt异常:%s" % str())
+
+

+ 240 - 0
common/request_util.py

@@ -0,0 +1,240 @@
+import json
+import re
+import traceback
+import jsonpath
+import requests
+
+from common.database_util import DatabaseUtil
+from common.encrypt import genRat
+from common.log_util import error_log, logs
+from common.yaml_util import write_yaml, read_config_yaml, write_yaml_out_time_url
+from random1111 import DubugTalk
+
+
+class RequestUtil:
+    def __init__(self, two_node):
+        self.base_url = read_config_yaml("base", two_node)
+
+    # 定义一个全局变量,类变量,通过类名调用
+    sess = requests.session()
+
+    def send_request(self, name, method, url, **kwargs):
+        try:
+            # 请求方式处理
+            method = str(method).lower()
+            # 基础路径拼接以及替换
+            url_old = self.replace_value(url)
+            url = self.base_url + self.replace_value(url)
+            # 参数替换
+            for key, value in kwargs.items():
+                if key in ['params', 'data', 'json', 'headers']:
+                    kwargs[key] = self.replace_value(value)
+                    if key == 'headers':
+                        kwargs['headers']["Mi"]= DubugTalk().genRat(str(url_old))
+                elif key == 'files':
+                    for file_key, file_path in value.items():
+                        value[file_key] = open(file_path, 'rb')
+            # 输出日志
+
+            logs("请求名称:%s" % name)
+            logs("请求方式:%s" % method)
+            logs("请求路径:%s" % url)
+            if "headers" in kwargs.keys():
+                logs("请求头:%s" % kwargs["headers"])
+            if "params" in kwargs.keys():
+                logs("请求params参数:%s" % kwargs["params"])
+            elif "data" in kwargs.keys():
+                logs("请求data参数:%s" % kwargs["data"])
+            elif "json" in kwargs.keys():
+                logs("请求json参数:%s" % kwargs["json"])
+            if "files" in kwargs.keys():
+                logs("文件上传:%s" % kwargs["files"])
+            # 请求
+            res = RequestUtil.sess.request(method, url, **kwargs)
+            logs("实际结果: %s" % res)
+            return res
+
+
+        except Exception as e:
+            error_log("发送请求send_request异常:%s" % str(traceback.format_exc()))
+
+    # 替换值的方法
+    # 问题1:(替换url,params,data,json,headers)
+    # 问题2:(string,float,int,dict)
+    def replace_value(self, data):
+        if data:
+            # 保存数据类型
+            data_type = type(data)
+
+            # 判断数据类型
+            if isinstance(data, dict) or isinstance(data, list):
+                str_data = json.dumps(data)
+            else:
+                str_data = str(data)
+            # 替换
+            for cs in range(1, str_data.count('${') + 1):
+                if "${" in str_data and "}" in str_data:
+                    start_index = str_data.index("${")
+                    end_index = str_data.index("}", start_index)
+                    old_index = str_data[start_index:end_index + 1]
+                    # 反射,通过类的对象和方法,字符串调用方法
+                    function_name = old_index[2:old_index.index('(')]
+                    args_value = old_index[old_index.index("(") + 1:old_index.index(")")]
+                    new_index = ""
+                    if args_value != "":
+                        args_value2 = args_value.split(',')
+                        new_index = getattr(DubugTalk(), function_name)(*args_value2)
+                    else:
+                        new_index = getattr(DubugTalk(), function_name)()
+                    if isinstance(new_index, int) or isinstance(new_index, float):
+                        str_data = str_data.replace('"' + old_index + '"', str(new_index))
+                    else:
+                        str_data = str_data.replace(old_index, str(new_index))
+            # 还原数据类型
+            if isinstance(data, dict) or isinstance(data, list):
+                data = json.loads(str_data)
+            else:
+                data = data_type(str_data)
+        return data
+
+    # def replace_headers(self, data):
+
+    # 规范yaml文件
+    def standard_yaml(self, arg_names):
+        try:
+            logs("--------接口测试开始--------")
+            arg_names_keys = arg_names.keys()
+            # 判断一级关键字是否存在:name,request,validate
+            if "name" in arg_names_keys and "request" in arg_names_keys and "validate" in arg_names_keys:
+                # 判断requests下面是否包含method,url
+                request_keys = arg_names['request'].keys()
+                if "method" in request_keys and "url" in request_keys:
+                    logs("yaml框架检查基本通过")
+                    # pop返回该值,并删除
+                    name = arg_names.pop("name")
+                    method = arg_names['request'].pop("method")
+                    url = arg_names['request'].pop("url")
+                    # 发送请求
+                    res = self.send_request(name, method, url, **arg_names['request'])
+                    # 获取接口的请求时间
+                    res_time = round(res.elapsed.total_seconds() * 1000, 2)
+                    act_time = read_config_yaml("request", "request_out_time")
+                    print("act_time============", act_time)
+                    print("res_time============", res_time)
+
+                    # 判断响应时间
+                    if res_time >= act_time:
+                        # warring_log(
+                        #     "接口实际请求时间{res_time}毫秒,请求时间大于{act_time}毫秒,请关注".format(res_time=res_time,
+                        #                                                                          act_time=act_time))
+                        # # 超时发送钉钉推送
+                        # dingtalk_send_warning(res)
+                        write_yaml_out_time_url({name: res.url})
+
+                    # print(res.json())
+                    return_text = res.text
+                    # print(return_text)
+                    return_json = ""
+                    return_code = res.status_code
+                    # 提取值并写入extract.yaml文件里面
+                    try:
+                        return_json = res.json()
+                    except Exception as e:
+                        error_log("返回结果不是json格式,不能用这种方法")
+                    if "extract" in arg_names_keys:
+                        for key, value in arg_names['extract'].items():
+                            if "(.*?)" in value or "(.+?)" in value:  # 正则
+                                # print("-------"+ value)
+                                zz_value = re.search(value, return_text)
+                                if zz_value:
+                                    extract_value = {key: zz_value.group(1)}
+                                    write_yaml(extract_value)
+                            else:  # jsonpath
+                                # print("+++++"+value)
+                                js_value = jsonpath.jsonpath(return_json, value)
+                                if js_value:
+                                    extract_value = {key: js_value[0]}
+                                    write_yaml(extract_value)
+                    # 断言结果
+                    self.assert_result(arg_names['validate'], return_json, return_code)
+
+                else:
+                    error_log("request下面是否包含method,url")
+            else:
+                error_log("一级关键字必须包括:name, request, validate")
+        except Exception as e:
+            error_log("规范yaml文件standard_yaml异常:%s" % str(traceback.format_exc()))
+
+    # 断言方法
+    def assert_result(self, yq_result, sj_result, return_code):
+        try:
+            logs("预期结果:%s" % yq_result)
+            logs("实际结果:%s" % sj_result)
+            # logs("实际结果:%s" % json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
+            for yq in yq_result:
+                for key, value in yq.items():
+                    # print(key, value)
+                    if key == "equals":
+                        self.assert_equals(value, sj_result, return_code)
+
+                    elif key == "contains":
+                        # self.assert_contains(value, json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
+                        self.assert_contains(value, sj_result)
+
+
+                    elif key == 'db_equals':
+                        self.database_assert(value, sj_result)
+
+                    else:
+                        error_log("框架暂时不支持此框架断言")
+            logs("接口测试成功")
+
+            logs("------接口测试结束————————\n")
+        except Exception as e:
+            logs("接口测试失败!!!")
+            logs("------接口测试结束————————")
+            error_log("断言assert_result异常:%s" % str(traceback.format_exc()))
+
+    # 相等断言
+    def assert_equals(self, value, sj_result, return_code):
+        for assert_key, assert_value in value.items():
+            # print(assert_value)
+            if assert_key == "status_code":
+                if assert_value != return_code:
+                    error_log("断言失败:状态码不等于%s" % assert_value)
+            else:
+                lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key)
+                if lists:
+                    if assert_value not in lists:
+                        error_log("断言失败:" + assert_key + "不等于" + str(assert_value))
+                else:
+                    error_log("断言失败:返回中的结果不存在:" + assert_key)
+
+    # 包含断言
+    def assert_contains(self, value, sj_result):
+
+        if str(value).replace("{", "").replace("}", "") not in str(sj_result):
+            error_log("断言失败:返回结果中不包含" + str(value))
+
+    # 数据库断言
+    def database_assert(self, value, sj_result):
+        for sql, key in value.items():
+            if key not in sj_result:
+                error_log("数据库断言失败:返回结果中不包含" + str(key))
+            else:
+                res = None
+                try:
+                    res = DatabaseUtil().execute_sql(sql)
+                except:
+                    error_log("数据库断言失败:SQL查询异常:%s" % str(traceback.format_exc()))
+                if res is None:
+                    error_log("数据库断言失败:SQL语句有语法错误或者查询结果为空")
+                else:
+                    res_new = []
+                    for i in res:
+                        for k in i:
+                            res_new.append(k)
+                    if str(sj_result[key]) in res_new:
+                        logs("数据库断言成功:数据库查询结果" + str(res) + ",返回结果:" + str(sj_result[key]))
+                    else:
+                        error_log("数据库断言失败:SQL查询出的结果不等于接口返回结果")

+ 80 - 0
common/yaml_util.py

@@ -0,0 +1,80 @@
+import json
+import os
+
+import yaml
+
+
+# 获取项目根目录
+def get_project_path():
+    return os.path.abspath(os.getcwd().split("common")[0])
+
+
+# 读取
+def read_yaml(key):
+    with open(os.getcwd() + '/extract.yaml', mode='r', encoding='utf-8') as f:
+        value = yaml.load(stream=f, Loader=yaml.FullLoader)
+        return value[key]
+
+
+# 读取
+
+
+# 写入
+
+def write_yaml(data):
+    with open(os.getcwd() + '/extract.yaml', mode='a', encoding='utf-8') as f:
+        value = yaml.dump(data, stream=f, allow_unicode=True)
+        return value
+
+
+# 清空
+def clear_yaml():
+    with open(os.getcwd() + '/extract.yaml', mode='w', encoding='utf-8') as f:
+        f.truncate()
+
+
+def clear_out_yaml():
+    with open(os.getcwd() + '/out_time_url.yaml', mode='w', encoding='utf-8') as f:
+        f.truncate()
+
+# 读取测试用列
+def read_testcase(yaml_path):
+    with open(yaml_path, mode='r', encoding='utf-8') as f:
+        value = yaml.load(stream=f, Loader=yaml.FullLoader)
+        return value
+
+
+# 读取conftest,yaml
+def read_config_yaml(one_node, two_nede):
+    with open(get_project_path() + '/config.yaml', mode='r', encoding='utf-8') as f:
+        value = yaml.load(f, Loader=yaml.FullLoader)
+        return value[one_node][two_nede]
+
+
+def read_yaml_out_time_url():
+    with open(get_project_path() + '/out_time_url.yaml', mode='r', encoding='utf-8') as f:
+        value = yaml.load(f, Loader=yaml.FullLoader)
+        return value
+
+
+def write_yaml_out_time_url(data):
+    with open(os.getcwd() + '/out_time_url.yaml', mode='a', encoding='utf-8') as f:
+        value = yaml.dump(data, stream=f, allow_unicode=True)
+        return value
+
+
+# #读取数据得yaml
+def read_data_yaml(yaml_path):
+    with open(get_project_path() + yaml_path, mode='r', encoding='utf-8') as f:
+        value = yaml.load(stream=f, Loader=yaml.FullLoader)
+        return value
+
+
+def read_case(yaml_path):
+    with open(os.getcwd() + yaml_path, mode='r', encoding='utf-8') as f:
+        value = yaml.load(stream=f, Loader=yaml.FullLoader)
+        return value
+
+
+if __name__ == '__main__':
+    print(os.path.abspath(os.getcwd().split("common")[0]))

+ 26 - 0
config.yaml

@@ -0,0 +1,26 @@
+#基础路径
+base:
+  base_qccq_cs: http://221.229.99.73:63220
+  base_qccq_sc: https://ai.teleagi.cn
+
+#日志配置
+log:
+  log_name: logs_
+  log_format: '[%(asctime)s] %(filename)s->%(funcName)s line:%(lineno)d [%(levelname)s] %(message)s'
+  log_colors_config:
+    DEBUG: 'cyan'
+    INFO: 'green'
+    WARNING: 'yellow'
+    ERROR: 'red'
+    CRITICAL: 'red'
+
+
+request:
+  request_out_time: 2000
+
+
+dingding:
+  prod_webhook: 'https://oapi.dingtalk.com/robot/send?access_token=f8566d2ed0feb8dc3bf2856df161241a8cf5a4c76af3d903abcabfdebebdff0e'
+  test_yunwin_webhook: 'https://oapi.dingtalk.com/robot/send?access_token=a9842cfd9a486b4516b1f228df14b3f9c45826a85c05edbdf7457ee2d2f14b78'
+
+

+ 69 - 0
conftest.py

@@ -0,0 +1,69 @@
+import json
+import os
+from pathlib import Path
+
+import pytest
+import requests
+
+from common.yaml_util import clear_yaml, read_testcase, read_config_yaml, write_yaml, clear_out_yaml
+
+path = Path(__file__).parent.glob("**/test_001_登录接口.yaml")
+for yaml_path1 in path:
+    yaml_path = yaml_path1
+
+path = Path(__file__).parent.glob("**/test_001_登录获取key.yaml")
+for yaml_path1 in path:
+    yzm_path = yaml_path1
+
+
+@pytest.fixture(scope="session", autouse=True)
+def clear_yam():
+    clear_yaml()
+    clear_out_yaml()
+    login_new()
+    # key_login(yzm_path)
+    # login_system(yaml_path)
+
+
+#
+# def login_system(yaml_path: Path):
+#     global i
+#     data = read_testcase(yaml_path)
+#     for i in data:
+#         method = i['request'].pop("method")
+#         url = read_config_yaml("base", "base_qccq_sc") + i['request'].pop("url")
+#         # 发送请求
+#         res = requests.request(method, url, **i['request'])
+#         token = res.json()['data']["token"]
+#         write_yaml({"token": token})
+#
+#
+# def key_login(yzm_path):
+#     global i
+#     data = read_testcase(yzm_path)
+#     for i in data:
+#         method = i['request'].pop('method')
+#         url = read_config_yaml("base", "base_qccq_sc") + i['request'].pop("url")
+#         res = requests.request(method, url)
+#         write_yaml({"captchaKey": res.json()["data"]["captchaKey"]})
+#         write_yaml({"captcha": res.json()["data"]["length"]})
+def login_new():
+    # url = "https://app.youth.cq.cqyl.org.cn/api/service-sysmgr/LoginController/getCaptchaLength?accountId=18983179310"
+    # method = "get"
+    # res = requests.request(method, url)
+    # data = res.json()["data"]
+    #
+    # url1 = "https://app.youth.cq.cqyl.org.cn/api/service-sysmgr/LoginController/login"
+    # method1 = "post"
+    # request_data = {
+    #     "captchaKey": data["captchaKey"],
+    #     "captcha": data["length"],
+    #     "mobile": "18983179310",
+    #     'password': "4E797559645371504E71767A563733444C2F7250785176504C79696231427977737073446347774A4C4A5631486A5231554B7455744B692B56423446673473562B4755504E737064767665674753445672334A6A4836434B447A336B6F63346F6C554654786D5467516768684D7661564A4D6B49327A75377A72535A6745746A4E546A535058344B39312B794A6C57634B4E45334E4F723043704E505031653145686C337033767330354D3D",
+    #     "scope": "PHONE",
+    #     "username": '18983179310'
+    # }
+    # print(request_data)
+    # response = requests.request(url=url1, method=method1, json=request_data)
+    # token = response.json()["data"]["token"]
+    write_yaml({"token": 'eyJhbGciOiJIUzUxMiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAAAGWQTU7DMBSE7-J1ItmxYztZI6QK1E3LAZzkNRgSO_KP-Kl6FLgft8BWaVRgOd-8GY3eET0FjVpEZSdJN-ASwyBK1qimbCjrStofOOGV7CsqUYG0CqglglKCmWCiQD52Ke3ffIA5-94naRdwKmhrElFx-CEPHlwC8LqcKwRu6lwRwCgTNvmMSE4ZrpiQjDS4ZnVTpURMwX825rxuJEv22t0ekU5nJk5TgXoHKsBez3AhcRn-kAGm20mNF2nU1XEq3F7pRXn_Yt1aD7PS02o-WgPbOHd5RFopCM_TOT-vM5s1ltXORtevxQ5GnZ7nftODdj7c21Eb1AYXoUBdfL_UnNLP7DOY83b09fG5u7nbZ4JO3-WyAzrQAQAA.YNoA4j-Asy4GvhbB6atkkEfOno49lwMDLqgf5hZp_EhVGpILu7BNVsB7pvleDGrMU30asoXjxEmJqqnvzuCAbA'})

+ 9 - 0
pytest.ini

@@ -0,0 +1,9 @@
+[pytest]
+addopts = -vs --alluredir=./temps --clean-alluredir
+testpaths = ./test_case/
+;python_files = test_*.py
+;python_classes = Test*
+;python_functions = test
+;markers =
+;    smoke:冒烟
+;    usermanage:用户管理

+ 105 - 0
random1111.py

@@ -0,0 +1,105 @@
+import base64
+import hashlib
+import os
+import random
+import time
+
+# import rsa as rsa
+import yaml
+
+# from common.encrypt import genRat
+from common.request_util import *
+from common.yaml_util import read_yaml
+
+
+class DubugTalk:
+    # 获得随机数
+    def get_random_number(self, min, max):
+        return str(random.randint(int(min), int(max)))
+
+    def get_random_string(self):
+        return str(random.randint(1000000, 9999999)) + str(random.randint(1000000, 9999999)) + "30p9sifjve41"
+
+    def read_yaml(self, key):
+        return read_yaml(key)
+
+    # Md5加密
+    def md5(self, args):
+        # 以指定的编码格式编码字符串
+        utf8_str = str(args).encode("utf-8")
+        # MD5加密(哈稀算法)
+        md5_str = hashlib.md5(utf8_str).hexdigest()
+        return md5_str.upper()
+
+    # base64加密
+    def bs64(self, args):
+        # 以指定的编码格式编码字符串
+        utf8_str = str(args).encode("utf-8")
+        # base64加密
+        ba64_str = base64.b64encode(utf8_str).decode('utf-8')
+        return ba64_str.upper()
+
+    # RSA双钥加密
+    # # 生成公钥私钥到指定文件
+    # def create_key(self):
+    #     # 根据密钥长度生成公钥私钥
+    #     (public_key, private_key) = rsa.newkeys(1024)
+    #     # 报存公钥
+    #     with open("public.pem", "w+") as f:
+    #         f.write(public_key.save_pkcs1().decode())
+    #     # 报存私钥
+    #     with open("private.pem", "w+") as f:
+    #         f.write(private_key.save_pkcs1().decode())
+    # 通过公钥加密
+    # def public_key_jiami(self, args):
+    #     # 导入密钥
+    #     with open("public.pem") as f:
+    #         pubkey = rsa.PublicKey.load_pkcs1(f.read().encode())
+    #     # 加密
+    #     byte_str = rsa.encrypt(str(args).encode("utf-8"), pubkey)
+    #     # 把二进制转化成字符串格式
+    #     miwen = base64.b64encode(byte_str).decode("utf-8")
+    #     return miwen
+    # 通过私钥解密
+    # def private_key_jiemi(self,args):
+    #     with open("private.pem") as f:
+    #         prikey = rsa.PrivateKey.load_pkcs1(f.read().encode())
+    #     # 把字符串转换二进制
+    #     byte_key = base64.b64decode(args)
+    #     #解密
+    #     mingwen = rsa.decrypt(byte_key, prikey).decode()
+    #     return mingwen
+
+    # singe签名
+    def singes(self, yaml_path):
+        last_url = ""
+        last_data = {}
+        with open(os.getcwd() + yaml_path, encoding="utf-8") as f:
+            yaml_value = yaml.load(f, Loader=yaml.FullLoader)
+            for caseinfo in yaml_value:
+                # print(caseinfo)
+                caseinfo_keys = caseinfo.keys()
+                # 判断一级关键字是否存在:name,request,validate
+                if "request" in caseinfo_keys:
+                    # 判断url
+                    if "url" in caseinfo['request'].keys():
+                        last_url = caseinfo['request']['url']
+                        print(caseinfo['request']['url'])
+                    # 判断参数
+                    req = caseinfo['request']
+                    for key, value in req.items():
+                        if key in ["params", "data", "json"]:
+                            for p_key, p_value in req[key].items():
+                                last_data[p_key] = p_value
+        last_url = last_url[last_url.index("?") + 1:len(last_url)]
+        # 把url字典格式加入到last_data字典格式里面
+        lis = last_url.split("&")
+        print(lis)
+        for a in lis:
+            last_data[a[0:a.index('=')]] = a[a.index('=') + 1:len(a)]
+        from common.request_util import RequestUtil
+        # last_data = RequestUtil(self).replace_value(last_data)
+        print(last_data)
+
+    def genRat(self, url: str):
+        return genRat(url, 10231)

BIN
requirements.txt


+ 12 - 0
run.py

@@ -0,0 +1,12 @@
+import os
+import time
+
+import pytest
+
+
+if __name__ == '__main__':
+    pytest.main(['-vs'])
+    time.sleep(3)
+    os.system("allure generate ./temps -o ./report --clean")
+    # pytest.main()  # 启动框架
+    # os.system("allure open  report -p 0")  # 打开报告

+ 14 - 0
test_case/test_001_我的空间.yaml

@@ -0,0 +1,14 @@
+  -
+    name: 我的空间
+    request:
+      method: post
+      url: /aicloud-api/xcdmx/file/queryImage
+      headers:
+        opertoken: ${read_yaml(token)}
+        "Mi": ${genRat("/aicloud-api/xcdmx/file/queryImage")}
+      json:
+        pageNo: 1
+        pageSize: 10
+        name: ""
+    validate:
+      - contains: 200

+ 14 - 0
test_case/test_002_为你推荐.yaml

@@ -0,0 +1,14 @@
+  -
+    name: 为你推荐
+    request:
+      method: post
+      url: /aicloud-api/xcdmx/interface/draftDecisionController/fetchList
+      headers:
+        opertoken: ${read_yaml(token)}
+        "Mi": ${genRat("/aicloud-api/xcdmx/interface/draftDecisionController/fetchList")}
+      json:
+        categoryIds:
+          [106180212]
+        cursorId: ""
+    validate:
+      - contains: 200

+ 12 - 0
test_case/test_003_模板中心.yaml

@@ -0,0 +1,12 @@
+  -
+    name: 模板中心
+    request:
+      method: post
+      url: /aicloud-api/xcdmx/interface/draftDecisionController/fetchList
+      headers:
+        opertoken: ${read_yaml(token)}
+        "Mi": ${genRat("/aicloud-api/xcdmx/interface/draftDecisionController/fetchList")}
+      json: {}
+
+    validate:
+      - contains: 200

+ 14 - 0
test_case/test_004_AI海报.yaml

@@ -0,0 +1,14 @@
+  -
+    name: AI海报
+    request:
+      method: post
+      url: /aicloud-api/xcdmx/interface/posterGeneration/aSentence
+      headers:
+        opertoken: ${read_yaml(token)}
+        "Mi": ${genRat("/aicloud-api/xcdmx/interface/posterGeneration/aSentence")}
+      json:
+        seq_id: ${get_random_string()}
+        prompt: "生成一张冬至宣传海报,一家人一桌暖心聚会,幸福满溢这个寒冬。上面写着“冬至暖心"
+        content: "生成一张冬至宣传海报,一家人一桌暖心聚会,幸福满溢这个寒冬。上面写着“冬至暖心"
+    validate:
+      - contains: 200

+ 34 - 0
test_case/test_005_AI绘图.yaml

@@ -0,0 +1,34 @@
+  -
+    name: AI海报
+    request:
+      method: post
+      url: /aicloud-api/xcdmx/interface/api/AlterAIController/crateRegionalLayoutAI
+      headers:
+        opertoken: ${read_yaml(token)}
+        "Mi": ${genRat("/aicloud-api/xcdmx/interface/api/AlterAIController/crateRegionalLayoutAI")}
+      json:
+        batch_size: 1
+        seq_id: ${get_random_string()}
+        content: "操场上奔跑"
+        extra_message: [
+  {
+    "key": "instances",
+    "value": [
+      "操场上奔跑"
+    ]
+  },
+  {
+    "key": "bboxes",
+    "value": [
+      [
+        0.45117,
+        0.45117,
+        0.54883,
+        0.54883
+      ]
+    ]
+  }
+]
+        resolution_ratio: [1,1]
+    validate:
+      - contains: 200

+ 41 - 0
test_case/test_allAPI.py

@@ -0,0 +1,41 @@
+import json
+
+from pathlib import Path
+
+import allure
+import pytest
+
+from common.parameterize import ddt, read_testcase
+from common.request_util import RequestUtil
+from common.yaml_util import write_yaml_out_time_url
+
+path = Path(__file__).parent.glob("**/*.yaml")
+
+
+def creat_case(yaml_path):
+    @pytest.mark.parametrize("arg_names", read_testcase(rf"{yaml_path}"))
+    def test_fun(self, arg_names):
+        allure.dynamic.title(arg_names["name"])
+        RequestUtil("base_qccq_sc").standard_yaml(arg_names)
+
+    return test_fun
+
+
+@allure.epic("青春重庆")
+@allure.feature("用户管理模块")
+class TestAPI:
+    pass
+
+
+# for yaml_path in path:
+#     yaml_name = yaml_path.name[:-5]
+#     setattr(TestAPI, yaml_name, creat_case(yaml_path))
+
+
+data = []
+for yaml_path in path:
+    data.append(yaml_path)
+data.sort()
+for yaml_path in data:
+    yaml_name = yaml_path.name[:-5]
+    setattr(TestAPI, yaml_name, creat_case(yaml_path))