DingDing.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import os
  2. import socket
  3. from pathlib import Path
  4. from dingtalkchatbot.chatbot import DingtalkChatbot
  5. import datetime
  6. # 获取jenkins构建信息和本次报告地址
  7. import jenkins # 安装pip install python-jenkins
  8. # 获取本机IP
  9. # s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  10. # s.connect(("8.8.8.8", 80))
  11. # ip = s.getsockname()[0]
  12. # jenkins登录地址
  13. jenkins_url = "http://121.229.175.82:63222/"
  14. # 获取jenkins对象
  15. server = jenkins.Jenkins(jenkins_url, username='jiangwei', password='jW@123456') # Jenkins登录名 ,密码
  16. # job名称
  17. job_name = "job/qccq/" # Jenkins运行任务名称
  18. # job的url地址
  19. job_url = jenkins_url + job_name
  20. # 获取最后一次构建
  21. job_last_build_url = server.get_info(job_name)['lastBuild']['url']
  22. # 报告地址
  23. report_url = job_last_build_url + 'allure' # 'allure'为我的Jenkins全局工具配置中allure别名
  24. if "localhost" in report_url:
  25. report_url = report_url.replace("localhost", ip)
  26. else:
  27. report_url = report_url
  28. '''
  29. 钉钉推送方法:
  30. 读取report文件中"prometheusData.txt",循环遍历获取需要的值。
  31. 使用钉钉机器人的接口,拼接后推送text
  32. '''
  33. def DingTalkSend(info):
  34. d = {}
  35. # 获取项目绝对路径
  36. path = Path(__file__).parent
  37. file = '/report/export/prometheusData.txt'
  38. print(rf"{path}" + rf"{file}")
  39. # 打开prometheusData 获取需要发送的信息
  40. # f = open(rf"{path}" + rf"{file}", 'r', encoding='UTF-8')
  41. f =open(os.getcwd()+file,'r', encoding='UTF-8')
  42. for lines in f:
  43. for c in lines:
  44. launch_name = lines.strip('\n').split(' ')[0]
  45. num = lines.strip('\n').split(' ')[1]
  46. d.update({launch_name: num})
  47. print(d)
  48. f.close()
  49. retries_run = d.get('launch_retries_run') # 运行总数
  50. print('运行总数:{}'.format(retries_run))
  51. status_defects = d.get('launch_problems_test_defects') # 未执行数
  52. if status_defects is None:
  53. status_defects = "0"
  54. else:
  55. status_defects = status_defects
  56. print(status_defects)
  57. status_passed = d.get('launch_status_passed') # 通过数量
  58. print('通过数量:{}'.format(status_passed))
  59. status_failed = d.get('launch_status_failed') # 不通过数量
  60. print('不通过数量:{}'.format(status_failed))
  61. # 钉钉推送
  62. url = 'https://oapi.dingtalk.com/robot/send?access_token=00fd89864b0527c2fd401ac470d0bd89e43de2ce5523fd3aeab44c85671e5d8e' # webhook
  63. text = (
  64. f"<font color=\'#FFA500\'>[通知] </font>青春重庆-{info}测试报告"
  65. "\n\n用例运行总数: " + retries_run +
  66. # "\n\n用例未执行数: " + status_defects +
  67. "\n\n用例通过数量: " + status_passed +
  68. '''\n\n<font>用例失败数量: </font><font color=\'#FF0000\' size=2>%s</font> \n\n''' +
  69. # "\n\n构建地址:\n" + job_url +
  70. "\n\n测试报告地址: \n" + report_url +
  71. "\n\n播报时间: " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) % (status_defects)
  72. dd_robot = DingtalkChatbot(url)
  73. ret = dd_robot.send_markdown(title='青春重庆', text=text, is_at_all=False)
  74. print(ret)
  75. if __name__ == '__main__':
  76. DingTalkSend("生产")