123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import os
- import socket
- from pathlib import Path
- from dingtalkchatbot.chatbot import DingtalkChatbot
- import datetime
- # 获取jenkins构建信息和本次报告地址
- import jenkins # 安装pip install python-jenkins
- from common.yaml_util import read_config_yaml
- # 获取本机IP
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # s.connect(("8.8.8.8", 80))
- # ip = s.getsockname()[0]
- # jenkins登录地址
- jenkins_url = "http://121.229.175.82:63222/"
- # 获取jenkins对象
- server = jenkins.Jenkins(jenkins_url, username='jiangwei', password='jW@123456') # Jenkins登录名 ,密码
- # job名称
- job_name = "job/qccq/" # Jenkins运行任务名称
- # job的url地址
- job_url = jenkins_url + job_name
- # 获取最后一次构建
- job_last_build_url = server.get_info(job_name)['lastBuild']['url']
- # 报告地址
- report_url = job_last_build_url + 'allure' # 'allure'为我的Jenkins全局工具配置中allure别名
- if "localhost" in report_url:
- report_url = report_url.replace("localhost", ip)
- else:
- report_url = report_url
- '''
- 钉钉推送方法:
- 读取report文件中"prometheusData.txt",循环遍历获取需要的值。
- 使用钉钉机器人的接口,拼接后推送text
- '''
- # url = 'https://oapi.dingtalk.com/robot/send?access_token=f8566d2ed0feb8dc3bf2856df161241a8cf5a4c76af3d903abcabfdebebdff0e' # webhook
- def DingTalkSend(info):
- d = {}
- # 获取项目绝对路径
- path = Path(__file__).parent
- file = '/report/export/prometheusData.txt'
- print(rf"{path}" + rf"{file}")
- # 打开prometheusData 获取需要发送的信息
- # f = open(rf"{path}" + rf"{file}", 'r', encoding='UTF-8')
- f = open(os.getcwd() + file, 'r', encoding='UTF-8')
- for lines in f:
- for c in lines:
- launch_name = lines.strip('\n').split(' ')[0]
- num = lines.strip('\n').split(' ')[1]
- d.update({launch_name: num})
- print(d)
- f.close()
- retries_run = d.get('launch_retries_run') # 运行总数
- print('运行总数:{}'.format(retries_run))
- status_defects = d.get('launch_problems_test_defects') # 未执行数
- if status_defects is None:
- status_defects = "0"
- else:
- status_defects = status_defects
- print(status_defects)
- status_passed = d.get('launch_status_passed') # 通过数量
- print('通过数量:{}'.format(status_passed))
- status_failed = d.get('launch_status_failed') # 不通过数量
- print('不通过数量:{}'.format(status_failed))
- # 钉钉推送
- text = (
- f"<font color=\'#FFA500\'>[通知] </font>青春重庆-{info}报告"
- "\n\n用例运行总数: " + retries_run +
- # "\n\n用例未执行数: " + status_defects +
- "\n\n用例通过数量: " + status_passed +
- '''\n\n<font>用例失败数量: </font><font color=\'#FF0000\' size=2>%s</font> \n\n''' +
- # "\n\n构建地址:\n" + job_url +
- "\n\n测试报告地址: \n" + report_url +
- "\n\n播报时间: " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) % (status_defects)
- dd_robot = DingtalkChatbot(read_config_yaml("dingding", "prod_webhook"))
- ret = dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False)
- print(ret)
- def dingtalk_send_warning(msg):
- """
- 发送报警超时消息dingtalkchatbot
- :param msg: 请求返回res
- :return: none
- """
- try:
- res_time = round(msg.elapsed.total_seconds() * 1000, 2)
- text = f"<font color=\'#FFA500\'>[通知] </font>青春重庆-报警"\
- f"\n\n>请求接口路径: {msg.url}% " \
- f"\n\n>请求体: {str(msg.request.body).replace('%','')}% "\
- f"\n\n>接口响应时间: {res_time}毫秒 " \
- f"\n\n>播报时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
- dd_robot = DingtalkChatbot('')
- dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False)
- except Exception as e:
- raise e
- if __name__ == '__main__':
- # dingtalk_send_warning("测试")
- DingTalkSend("生产APP巡检")
|