소스 검색

Initial commit

“蒋伟” 6 달 전
부모
커밋
df2be6edcb

+ 0 - 0
common/__init__.py


+ 0 - 47
common/database_util.py

@@ -1,47 +0,0 @@
-import traceback
-from sqlite3 import Connection, Cursor
-
-# import MySQLdb
-import pymysql
-
-from common.log_util import error_log
-
-
-class DatabaseUtil:
-    def __init__(self):
-        self.cs = None
-        self.conn = None
-
-    def create_conn(self):
-        self.conn: Connection = pymysql.connect(
-            host='10.12.43.2',
-            user='root',
-            password='Xx98$kdiesJS',
-            database='jwtest',
-            port=3409
-        )
-        return self.conn
-
-    # 执行sql语句
-    def execute_sql(self, sql):
-
-        # 创建游标
-        self.cs: Cursor = self.create_conn().cursor()
-        # 通过游标对象,执行SQL
-        try:
-            self.cs.execute(sql)
-        except:
-            error_log("SQL查询异常:%s" % str(traceback.format_exc()))
-
-            # 提取值
-        value = self.cs.fetchall()
-        self.close_resuoe()
-        print(value)
-        return value
-
-    def close_resuoe(self):
-        self.cs.close()
-        self.conn.close()
-
-# if __name__ == '__main__':
-#     print(DatabaseUtil().execute_sql('select * from uisql '))

+ 0 - 28
common/encrypt.py

@@ -1,28 +0,0 @@
-import base64
-import json
-import time
-import uuid
-from Crypto.Cipher import AES
-
-
-def encrypt_padding(string: str, block_size: int) -> str:
-    length = len(string.encode('utf-8'))
-    add_length = block_size - (length % block_size)
-    return string + (chr(add_length) * add_length)
-
-
-def aes_encrypt(string: str, aes_key: str) -> str:
-    cipher = AES.new(aes_key.encode('utf-8'), AES.MODE_ECB)
-    cipher_text = cipher.encrypt(encrypt_padding(string, AES.block_size).encode('utf-8'))
-    cipher_text = base64.b64encode(cipher_text)
-    return cipher_text.decode()
-
-
-def genRat(url: str,emp:int) -> str:
-    mi = {
-        't': int(time.time() * 1000),
-        'w': str(uuid.uuid4()),
-        'r': url,
-        'u': emp
-    }
-    return aes_encrypt(json.dumps(mi), '253140fdc9c44b0cb903c387b3e85d64')

+ 0 - 104
common/log_util.py

@@ -1,104 +0,0 @@
-import logging
-import colorlog
-import time
-
-from common.yaml_util import get_project_path, read_config_yaml
-
-
-class LoggerUtil:
-    def create_log(self, logger_name='log',level='INFO'):
-
-        level_relations = {
-            'debug': logging.DEBUG,
-            'info': logging.INFO,
-            'warning': logging.WARNING,
-            'error': logging.ERROR,
-            'crit': logging.CRITICAL
-        }
-
-
-        # 创建一个日志对象
-        self.logger = logging.getLogger(logger_name)
-        # 设置全局的日志级别(从低到高:debug调试<info信息<warning警告<error错误<critical严重)
-        self.logger.setLevel(logging.DEBUG)
-        if not self.logger.handlers:
-            # ------文件日志------
-            # 1创建文件日志路径
-            self.file_log_path = get_project_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
-                int(time.time())) + ".log"
-            # 2创建文件日志控制器
-            self.file_hander = logging.FileHandler(self.file_log_path, encoding='UTF-8')
-            # 3设置文件日志级别
-            # file_log_level = str(read_config_yaml("log", "log_level")).lower()
-            # 设置日志显示级别
-            self.logger.setLevel(level_relations.get(level))
-
-            # if file_log_level == "debug":
-            #     self.file_hander.setLevel(logging.DEBUG)
-            # elif file_log_level == "info":
-            #     self.file_hander.setLevel(logging.INFO)
-            # elif file_log_level == "warning":
-            #     self.file_hander.setLevel(logging.WARNING)
-            # elif file_log_level == "error":
-            #     self.file_hander.setLevel(logging.ERROR)
-            # elif file_log_level == "critical":
-            #     self.file_hander.setLevel(logging.CRITICAL)
-            # else:
-            #     self.file_hander.setLevel(logging.DEBUG)
-            # 4创建文件的格式
-            self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
-            # 将文件日志的控制器加入日志对象
-            self.logger.addHandler(self.file_hander)
-            # 创建控制台日志
-            # 1创建控制台日志控制器
-            self.console_hander = logging.StreamHandler()
-            # 2设置控制台日志级别
-            self.logger.setLevel(level_relations.get(level))
-
-            # console_log_level = str(read_config_yaml("log", "log_level")).lower()
-            # if console_log_level == "debug":
-            #     self.console_hander.setLevel(logging.DEBUG)
-            # elif console_log_level == "info":
-            #     self.console_hander.setLevel(logging.INFO)
-            # elif console_log_level == "warning":
-            #     self.console_hander.setLevel(logging.WARNING)
-            # elif console_log_level == "error":
-            #     self.console_hander.setLevel(logging.ERROR)
-            # elif console_log_level == "critical":
-            #     self.console_hander.setLevel(logging.CRITICAL)
-            # else:
-            #     self.console_hander.setLevel(logging.DEBUG)
-
-            # formatter = colorlog.ColoredFormatter(
-            #     '%(log_color)s[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s',
-            #     log_colors=log_colors_config)
-            formatter = colorlog.ColoredFormatter(
-                '%(log_color)s[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s',
-                log_colors=read_config_yaml("log","log_colors_config"))
-
-            # 3创建控制台的日志格式
-            self.console_hander.setFormatter(formatter)
-            # 将控制台日志加入日志对象
-            self.logger.addHandler(self.console_hander)
-
-        # 返回包含文件日志控制器和控制台日志控制器的日志对象
-        return self.logger
-
-
-# 错误日志的输出
-def error_log(message):
-    # LoggerUtil().create_log(level="error").error(message)
-    raise Exception(message)
-
-
-def warring_log(message):
-    LoggerUtil().create_log(level="warning").warning(message)
-    # raise Exception(message)
-
-
-
-# 信息日志的输出
-def logs(message):
-    LoggerUtil().create_log(level="info").info(message)
-    # raise Exception(message)
-

+ 0 - 57
common/parameterize.py

@@ -1,57 +0,0 @@
-import json
-import traceback
-import yaml
-
-from common.yaml_util import read_data_yaml, get_project_path
-from common.log_util import error_log
-
-
-# 读取测试用列
-def read_testcase(yaml_path):
-    try:
-        # with open(get_project_path() + yaml_path, mode='r', encoding='utf-8') as f:
-        with open(yaml_path, mode='r', encoding='utf-8') as f:
-            arg_names = yaml.load(stream=f, Loader=yaml.FullLoader)
-            if len(arg_names) >= 2:
-                return arg_names
-            else:
-                if "parameterize" in dict(*arg_names).keys():
-                    new_arg_names = ddt(*arg_names)
-                    return new_arg_names
-                else:
-                    return arg_names
-    except Exception as e:
-        error_log("读取测试用例方法read_testcase异常:%s" % str(traceback.format_exc()))
-
-
-def ddt(arg_names):
-    # 新方法
-    try:
-        arg_names_str = json.dumps(arg_names)
-        data_list = arg_names["parameterize"]
-        key_length = len(data_list[0])
-        # print(key_length)
-        length_success = True
-        # 循环数据
-        for param in arg_names["parameterize"]:
-            if len(param) != key_length:
-                length_success = False
-                error_log("此条数据有误:%s" % param)
-        # 替换值
-        new_art_names = []
-        if length_success:
-            for x in range(1, len(data_list)):
-                # print(data_list[x])
-                temp_arg_names = arg_names_str
-                for y in range(0, len(data_list[x])):
-                    if isinstance(data_list[x][y], int) or isinstance(data_list[x][y], float):
-                        temp_arg_names = temp_arg_names.replace('"$ddt{' + data_list[0][y] + '}"', str(data_list[x][y]))
-                    else:
-                        temp_arg_names = temp_arg_names.replace("$ddt{" + data_list[0][y] + "}", str(data_list[x][y]))
-                new_art_names.append(json.loads(temp_arg_names))
-            # print(new_art_names)
-        return new_art_names
-    except Exception as e:
-        error_log("数据驱动ddt异常:%s" % str())
-
-

+ 0 - 240
common/request_util.py

@@ -1,240 +0,0 @@
-import json
-import re
-import traceback
-import jsonpath
-import requests
-
-from common.database_util import DatabaseUtil
-from common.encrypt import genRat
-from common.log_util import error_log, logs
-from common.yaml_util import write_yaml, read_config_yaml, write_yaml_out_time_url
-from random1111 import DubugTalk
-
-
-class RequestUtil:
-    def __init__(self, two_node):
-        self.base_url = read_config_yaml("base", two_node)
-
-    # 定义一个全局变量,类变量,通过类名调用
-    sess = requests.session()
-
-    def send_request(self, name, method, url, **kwargs):
-        try:
-            # 请求方式处理
-            method = str(method).lower()
-            # 基础路径拼接以及替换
-            url_old = self.replace_value(url)
-            url = self.base_url + self.replace_value(url)
-            # 参数替换
-            for key, value in kwargs.items():
-                if key in ['params', 'data', 'json', 'headers']:
-                    kwargs[key] = self.replace_value(value)
-                    if key == 'headers':
-                        kwargs['headers']["Mi"]= DubugTalk().genRat(str(url_old))
-                elif key == 'files':
-                    for file_key, file_path in value.items():
-                        value[file_key] = open(file_path, 'rb')
-            # 输出日志
-
-            logs("请求名称:%s" % name)
-            logs("请求方式:%s" % method)
-            logs("请求路径:%s" % url)
-            if "headers" in kwargs.keys():
-                logs("请求头:%s" % kwargs["headers"])
-            if "params" in kwargs.keys():
-                logs("请求params参数:%s" % kwargs["params"])
-            elif "data" in kwargs.keys():
-                logs("请求data参数:%s" % kwargs["data"])
-            elif "json" in kwargs.keys():
-                logs("请求json参数:%s" % kwargs["json"])
-            if "files" in kwargs.keys():
-                logs("文件上传:%s" % kwargs["files"])
-            # 请求
-            res = RequestUtil.sess.request(method, url, **kwargs)
-            logs("实际结果: %s" % res)
-            return res
-
-
-        except Exception as e:
-            error_log("发送请求send_request异常:%s" % str(traceback.format_exc()))
-
-    # 替换值的方法
-    # 问题1:(替换url,params,data,json,headers)
-    # 问题2:(string,float,int,dict)
-    def replace_value(self, data):
-        if data:
-            # 保存数据类型
-            data_type = type(data)
-
-            # 判断数据类型
-            if isinstance(data, dict) or isinstance(data, list):
-                str_data = json.dumps(data)
-            else:
-                str_data = str(data)
-            # 替换
-            for cs in range(1, str_data.count('${') + 1):
-                if "${" in str_data and "}" in str_data:
-                    start_index = str_data.index("${")
-                    end_index = str_data.index("}", start_index)
-                    old_index = str_data[start_index:end_index + 1]
-                    # 反射,通过类的对象和方法,字符串调用方法
-                    function_name = old_index[2:old_index.index('(')]
-                    args_value = old_index[old_index.index("(") + 1:old_index.index(")")]
-                    new_index = ""
-                    if args_value != "":
-                        args_value2 = args_value.split(',')
-                        new_index = getattr(DubugTalk(), function_name)(*args_value2)
-                    else:
-                        new_index = getattr(DubugTalk(), function_name)()
-                    if isinstance(new_index, int) or isinstance(new_index, float):
-                        str_data = str_data.replace('"' + old_index + '"', str(new_index))
-                    else:
-                        str_data = str_data.replace(old_index, str(new_index))
-            # 还原数据类型
-            if isinstance(data, dict) or isinstance(data, list):
-                data = json.loads(str_data)
-            else:
-                data = data_type(str_data)
-        return data
-
-    # def replace_headers(self, data):
-
-    # 规范yaml文件
-    def standard_yaml(self, arg_names):
-        try:
-            logs("--------接口测试开始--------")
-            arg_names_keys = arg_names.keys()
-            # 判断一级关键字是否存在:name,request,validate
-            if "name" in arg_names_keys and "request" in arg_names_keys and "validate" in arg_names_keys:
-                # 判断requests下面是否包含method,url
-                request_keys = arg_names['request'].keys()
-                if "method" in request_keys and "url" in request_keys:
-                    logs("yaml框架检查基本通过")
-                    # pop返回该值,并删除
-                    name = arg_names.pop("name")
-                    method = arg_names['request'].pop("method")
-                    url = arg_names['request'].pop("url")
-                    # 发送请求
-                    res = self.send_request(name, method, url, **arg_names['request'])
-                    # 获取接口的请求时间
-                    res_time = round(res.elapsed.total_seconds() * 1000, 2)
-                    act_time = read_config_yaml("request", "request_out_time")
-                    print("act_time============", act_time)
-                    print("res_time============", res_time)
-
-                    # 判断响应时间
-                    if res_time >= act_time:
-                        # warring_log(
-                        #     "接口实际请求时间{res_time}毫秒,请求时间大于{act_time}毫秒,请关注".format(res_time=res_time,
-                        #                                                                          act_time=act_time))
-                        # # 超时发送钉钉推送
-                        # dingtalk_send_warning(res)
-                        write_yaml_out_time_url({name: res.url})
-
-                    # print(res.json())
-                    return_text = res.text
-                    # print(return_text)
-                    return_json = ""
-                    return_code = res.status_code
-                    # 提取值并写入extract.yaml文件里面
-                    try:
-                        return_json = res.json()
-                    except Exception as e:
-                        error_log("返回结果不是json格式,不能用这种方法")
-                    if "extract" in arg_names_keys:
-                        for key, value in arg_names['extract'].items():
-                            if "(.*?)" in value or "(.+?)" in value:  # 正则
-                                # print("-------"+ value)
-                                zz_value = re.search(value, return_text)
-                                if zz_value:
-                                    extract_value = {key: zz_value.group(1)}
-                                    write_yaml(extract_value)
-                            else:  # jsonpath
-                                # print("+++++"+value)
-                                js_value = jsonpath.jsonpath(return_json, value)
-                                if js_value:
-                                    extract_value = {key: js_value[0]}
-                                    write_yaml(extract_value)
-                    # 断言结果
-                    self.assert_result(arg_names['validate'], return_json, return_code)
-
-                else:
-                    error_log("request下面是否包含method,url")
-            else:
-                error_log("一级关键字必须包括:name, request, validate")
-        except Exception as e:
-            error_log("规范yaml文件standard_yaml异常:%s" % str(traceback.format_exc()))
-
-    # 断言方法
-    def assert_result(self, yq_result, sj_result, return_code):
-        try:
-            logs("预期结果:%s" % yq_result)
-            logs("实际结果:%s" % sj_result)
-            # logs("实际结果:%s" % json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
-            for yq in yq_result:
-                for key, value in yq.items():
-                    # print(key, value)
-                    if key == "equals":
-                        self.assert_equals(value, sj_result, return_code)
-
-                    elif key == "contains":
-                        # self.assert_contains(value, json.loads(json.dumps(sj_result).replace(r"\\", "\\")))
-                        self.assert_contains(value, sj_result)
-
-
-                    elif key == 'db_equals':
-                        self.database_assert(value, sj_result)
-
-                    else:
-                        error_log("框架暂时不支持此框架断言")
-            logs("接口测试成功")
-
-            logs("------接口测试结束————————\n")
-        except Exception as e:
-            logs("接口测试失败!!!")
-            logs("------接口测试结束————————")
-            error_log("断言assert_result异常:%s" % str(traceback.format_exc()))
-
-    # 相等断言
-    def assert_equals(self, value, sj_result, return_code):
-        for assert_key, assert_value in value.items():
-            # print(assert_value)
-            if assert_key == "status_code":
-                if assert_value != return_code:
-                    error_log("断言失败:状态码不等于%s" % assert_value)
-            else:
-                lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key)
-                if lists:
-                    if assert_value not in lists:
-                        error_log("断言失败:" + assert_key + "不等于" + str(assert_value))
-                else:
-                    error_log("断言失败:返回中的结果不存在:" + assert_key)
-
-    # 包含断言
-    def assert_contains(self, value, sj_result):
-
-        if str(value).replace("{", "").replace("}", "") not in str(sj_result):
-            error_log("断言失败:返回结果中不包含" + str(value))
-
-    # 数据库断言
-    def database_assert(self, value, sj_result):
-        for sql, key in value.items():
-            if key not in sj_result:
-                error_log("数据库断言失败:返回结果中不包含" + str(key))
-            else:
-                res = None
-                try:
-                    res = DatabaseUtil().execute_sql(sql)
-                except:
-                    error_log("数据库断言失败:SQL查询异常:%s" % str(traceback.format_exc()))
-                if res is None:
-                    error_log("数据库断言失败:SQL语句有语法错误或者查询结果为空")
-                else:
-                    res_new = []
-                    for i in res:
-                        for k in i:
-                            res_new.append(k)
-                    if str(sj_result[key]) in res_new:
-                        logs("数据库断言成功:数据库查询结果" + str(res) + ",返回结果:" + str(sj_result[key]))
-                    else:
-                        error_log("数据库断言失败:SQL查询出的结果不等于接口返回结果")

+ 0 - 80
common/yaml_util.py

@@ -1,80 +0,0 @@
-import json
-import os
-
-import yaml
-
-
-# 获取项目根目录
-def get_project_path():
-    return os.path.abspath(os.getcwd().split("common")[0])
-
-
-# 读取
-def read_yaml(key):
-    with open(os.getcwd() + '/extract.yaml', mode='r', encoding='utf-8') as f:
-        value = yaml.load(stream=f, Loader=yaml.FullLoader)
-        return value[key]
-
-
-# 读取
-
-
-# 写入
-
-def write_yaml(data):
-    with open(os.getcwd() + '/extract.yaml', mode='a', encoding='utf-8') as f:
-        value = yaml.dump(data, stream=f, allow_unicode=True)
-        return value
-
-
-# 清空
-def clear_yaml():
-    with open(os.getcwd() + '/extract.yaml', mode='w', encoding='utf-8') as f:
-        f.truncate()
-
-
-def clear_out_yaml():
-    with open(os.getcwd() + '/out_time_url.yaml', mode='w', encoding='utf-8') as f:
-        f.truncate()
-
-# 读取测试用列
-def read_testcase(yaml_path):
-    with open(yaml_path, mode='r', encoding='utf-8') as f:
-        value = yaml.load(stream=f, Loader=yaml.FullLoader)
-        return value
-
-
-# 读取conftest,yaml
-def read_config_yaml(one_node, two_nede):
-    with open(get_project_path() + '/config.yaml', mode='r', encoding='utf-8') as f:
-        value = yaml.load(f, Loader=yaml.FullLoader)
-        return value[one_node][two_nede]
-
-
-def read_yaml_out_time_url():
-    with open(get_project_path() + '/out_time_url.yaml', mode='r', encoding='utf-8') as f:
-        value = yaml.load(f, Loader=yaml.FullLoader)
-        return value
-
-
-def write_yaml_out_time_url(data):
-    with open(os.getcwd() + '/out_time_url.yaml', mode='a', encoding='utf-8') as f:
-        value = yaml.dump(data, stream=f, allow_unicode=True)
-        return value
-
-
-# #读取数据得yaml
-def read_data_yaml(yaml_path):
-    with open(get_project_path() + yaml_path, mode='r', encoding='utf-8') as f:
-        value = yaml.load(stream=f, Loader=yaml.FullLoader)
-        return value
-
-
-def read_case(yaml_path):
-    with open(os.getcwd() + yaml_path, mode='r', encoding='utf-8') as f:
-        value = yaml.load(stream=f, Loader=yaml.FullLoader)
-        return value
-
-
-if __name__ == '__main__':
-    print(os.path.abspath(os.getcwd().split("common")[0]))

+ 0 - 14
test_case/test_001_我的空间.yaml

@@ -1,14 +0,0 @@
-  -
-    name: 我的空间
-    request:
-      method: post
-      url: /aicloud-api/xcdmx/file/queryImage
-      headers:
-        opertoken: ${read_yaml(token)}
-        "Mi": ${genRat("/aicloud-api/xcdmx/file/queryImage")}
-      json:
-        pageNo: 1
-        pageSize: 10
-        name: ""
-    validate:
-      - contains: 200

+ 0 - 14
test_case/test_002_为你推荐.yaml

@@ -1,14 +0,0 @@
-  -
-    name: 为你推荐
-    request:
-      method: post
-      url: /aicloud-api/xcdmx/interface/draftDecisionController/fetchList
-      headers:
-        opertoken: ${read_yaml(token)}
-        "Mi": ${genRat("/aicloud-api/xcdmx/interface/draftDecisionController/fetchList")}
-      json:
-        categoryIds:
-          [106180212]
-        cursorId: ""
-    validate:
-      - contains: 200

+ 0 - 12
test_case/test_003_模板中心.yaml

@@ -1,12 +0,0 @@
-  -
-    name: 模板中心
-    request:
-      method: post
-      url: /aicloud-api/xcdmx/interface/draftDecisionController/fetchList
-      headers:
-        opertoken: ${read_yaml(token)}
-        "Mi": ${genRat("/aicloud-api/xcdmx/interface/draftDecisionController/fetchList")}
-      json: {}
-
-    validate:
-      - contains: 200

+ 0 - 14
test_case/test_004_AI海报.yaml

@@ -1,14 +0,0 @@
-  -
-    name: AI海报
-    request:
-      method: post
-      url: /aicloud-api/xcdmx/interface/posterGeneration/aSentence
-      headers:
-        opertoken: ${read_yaml(token)}
-        "Mi": ${genRat("/aicloud-api/xcdmx/interface/posterGeneration/aSentence")}
-      json:
-        seq_id: ${get_random_string()}
-        prompt: "生成一张冬至宣传海报,一家人一桌暖心聚会,幸福满溢这个寒冬。上面写着“冬至暖心"
-        content: "生成一张冬至宣传海报,一家人一桌暖心聚会,幸福满溢这个寒冬。上面写着“冬至暖心"
-    validate:
-      - contains: 200

+ 0 - 34
test_case/test_005_AI绘图.yaml

@@ -1,34 +0,0 @@
-  -
-    name: AI海报
-    request:
-      method: post
-      url: /aicloud-api/xcdmx/interface/api/AlterAIController/crateRegionalLayoutAI
-      headers:
-        opertoken: ${read_yaml(token)}
-        "Mi": ${genRat("/aicloud-api/xcdmx/interface/api/AlterAIController/crateRegionalLayoutAI")}
-      json:
-        batch_size: 1
-        seq_id: ${get_random_string()}
-        content: "操场上奔跑"
-        extra_message: [
-  {
-    "key": "instances",
-    "value": [
-      "操场上奔跑"
-    ]
-  },
-  {
-    "key": "bboxes",
-    "value": [
-      [
-        0.45117,
-        0.45117,
-        0.54883,
-        0.54883
-      ]
-    ]
-  }
-]
-        resolution_ratio: [1,1]
-    validate:
-      - contains: 200

+ 0 - 41
test_case/test_allAPI.py

@@ -1,41 +0,0 @@
-import json
-
-from pathlib import Path
-
-import allure
-import pytest
-
-from common.parameterize import ddt, read_testcase
-from common.request_util import RequestUtil
-from common.yaml_util import write_yaml_out_time_url
-
-path = Path(__file__).parent.glob("**/*.yaml")
-
-
-def creat_case(yaml_path):
-    @pytest.mark.parametrize("arg_names", read_testcase(rf"{yaml_path}"))
-    def test_fun(self, arg_names):
-        allure.dynamic.title(arg_names["name"])
-        RequestUtil("base_qccq_sc").standard_yaml(arg_names)
-
-    return test_fun
-
-
-@allure.epic("青春重庆")
-@allure.feature("用户管理模块")
-class TestAPI:
-    pass
-
-
-# for yaml_path in path:
-#     yaml_name = yaml_path.name[:-5]
-#     setattr(TestAPI, yaml_name, creat_case(yaml_path))
-
-
-data = []
-for yaml_path in path:
-    data.append(yaml_path)
-data.sort()
-for yaml_path in data:
-    yaml_name = yaml_path.name[:-5]
-    setattr(TestAPI, yaml_name, creat_case(yaml_path))