import os import socket from pathlib import Path from dingtalkchatbot.chatbot import DingtalkChatbot import datetime # 获取jenkins构建信息和本次报告地址 import jenkins # 安装pip install python-jenkins from common.yaml_util import read_config_yaml, read_yaml_out_time_url, write_yaml_out_time_url # 获取本机IP s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # s.connect(("8.8.8.8", 80)) # ip = s.getsockname()[0] # jenkins登录地址 jenkins_url = "http://121.229.175.82:63222/" # 获取jenkins对象 server = jenkins.Jenkins(jenkins_url, username='jiangwei', password='jW@123456') # Jenkins登录名 ,密码 # job名称 job_name = "job/qccq/" # Jenkins运行任务名称 # job的url地址 job_url = jenkins_url + job_name # 获取最后一次构建 job_last_build_url = server.get_info(job_name)['lastBuild']['url'] # 报告地址 report_url = job_last_build_url + 'allure' # 'allure'为我的Jenkins全局工具配置中allure别名 if "localhost" in report_url: report_url = report_url.replace("localhost", ip) else: report_url = report_url ''' 钉钉推送方法: 读取report文件中"prometheusData.txt",循环遍历获取需要的值。 使用钉钉机器人的接口,拼接后推送text ''' # url = 'https://oapi.dingtalk.com/robot/send?access_token=f8566d2ed0feb8dc3bf2856df161241a8cf5a4c76af3d903abcabfdebebdff0e' # webhook def DingTalkSend(info): d = {} # 获取项目绝对路径 path = Path(__file__).parent file = '/report/export/prometheusData.txt' print(rf"{path}" + rf"{file}") # 打开prometheusData 获取需要发送的信息 # f = open(rf"{path}" + rf"{file}", 'r', encoding='UTF-8') f = open(os.getcwd() + file, 'r', encoding='UTF-8') for lines in f: for c in lines: launch_name = lines.strip('\n').split(' ')[0] num = lines.strip('\n').split(' ')[1] d.update({launch_name: num}) print(d) f.close() retries_run = d.get('launch_retries_run') # 运行总数 print('运行总数:{}'.format(retries_run)) status_defects = d.get('launch_problems_test_defects') # 未执行数 if status_defects is None: status_defects = "0" else: status_defects = status_defects print(status_defects) status_passed = d.get('launch_status_passed') # 通过数量 print('通过数量:{}'.format(status_passed)) status_failed = d.get('launch_status_failed') # 不通过数量 print('不通过数量:{}'.format(status_failed)) out_time_url = str(read_yaml_out_time_url()) print(out_time_url) # 钉钉推送 text = ( f"[通知] 青春重庆-{info}报告" "\n\n用例运行总数: " + retries_run + # "\n\n用例未执行数: " + status_defects + "\n\n用例通过数量: " + status_passed + '''\n\n用例失败数量: %s \n\n''' + "\n\n响应超过2s接口汇总:" + out_time_url + # "\n\n构建地址:\n" + job_url + "\n\n测试报告地址: \n" + report_url + "\n\n播报时间: " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) % (status_defects) dd_robot = DingtalkChatbot(read_config_yaml("dingding", "prod_webhook")) ret = dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False) print(ret) def dingtalk_send_warning(msg): """ 发送报警超时消息dingtalkchatbot :param msg: 请求返回res :return: none """ try: res_time = round(msg.elapsed.total_seconds() * 1000, 2) text = f"[通知] 青春重庆-报警" \ f"\n\n>请求接口路径: {msg.url} " \ f"\n\n>请求体: {str(msg.request.body).replace('%', '')} " \ f"\n\n>接口响应时间: {res_time}毫秒 " \ f"\n\n>播报时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} " dd_robot = DingtalkChatbot(read_config_yaml("dingding", "prod_webhook")) dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False) except Exception as e: raise e if __name__ == '__main__': # dingtalk_send_warning("测试") DingTalkSend("生产APP巡检")