DingDing.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import os
  2. import socket
  3. from pathlib import Path
  4. from dingtalkchatbot.chatbot import DingtalkChatbot
  5. import datetime
  6. # 获取jenkins构建信息和本次报告地址
  7. import jenkins # 安装pip install python-jenkins
  8. from common.yaml_util import read_config_yaml, read_yaml_out_time_url, write_yaml_out_time_url
  9. # 获取本机IP
  10. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  11. # s.connect(("8.8.8.8", 80))
  12. # ip = s.getsockname()[0]
  13. # jenkins登录地址
  14. jenkins_url = "http://121.229.175.82:63222/"
  15. # 获取jenkins对象
  16. server = jenkins.Jenkins(jenkins_url, username='jiangwei', password='jW@123456') # Jenkins登录名 ,密码
  17. # job名称
  18. job_name = "job/XCHY/" # Jenkins运行任务名称
  19. # job的url地址
  20. job_url = jenkins_url + job_name
  21. # 获取最后一次构建
  22. job_last_build_url = server.get_info(job_name)['lastBuild']['url']
  23. # 报告地址
  24. report_url = job_last_build_url + 'allure' # 'allure'为我的Jenkins全局工具配置中allure别名
  25. if "localhost" in report_url:
  26. report_url = report_url.replace("localhost", ip)
  27. else:
  28. report_url = report_url
  29. '''
  30. 钉钉推送方法:
  31. 读取report文件中"prometheusData.txt",循环遍历获取需要的值。
  32. 使用钉钉机器人的接口,拼接后推送text
  33. '''
  34. # url = 'https://oapi.dingtalk.com/robot/send?access_token=f8566d2ed0feb8dc3bf2856df161241a8cf5a4c76af3d903abcabfdebebdff0e' # webhook
  35. def DingTalkSend(info):
  36. d = {}
  37. # 获取项目绝对路径
  38. path = Path(__file__).parent
  39. file = '/report/export/prometheusData.txt'
  40. print(rf"{path}" + rf"{file}")
  41. # 打开prometheusData 获取需要发送的信息
  42. # f = open(rf"{path}" + rf"{file}", 'r', encoding='UTF-8')
  43. f = open(os.getcwd() + file, 'r', encoding='UTF-8')
  44. for lines in f:
  45. for c in lines:
  46. launch_name = lines.strip('\n').split(' ')[0]
  47. num = lines.strip('\n').split(' ')[1]
  48. d.update({launch_name: num})
  49. print(d)
  50. f.close()
  51. # retries_run = d.get('launch_retries_run') # 运行总数
  52. # print('运行总数:{}'.format(retries_run))
  53. # status_defects = d.get('launch_problems_test_defects') # 未执行数
  54. # if status_defects is None:
  55. # status_defects = "0"
  56. # else:
  57. # status_defects = status_defects
  58. # print(status_defects)
  59. # status_passed = d.get('launch_status_passed') # 通过数量
  60. # print('通过数量:{}'.format(status_passed))
  61. status_failed = d.get('launch_status_broken') # 不通过数量
  62. print('不通过数量:{}'.format(status_failed))
  63. # 钉钉推送
  64. text1 = (
  65. f"<font color=\'#FFA500\'>[通知] </font>星辰绘影-{info}"
  66. "\n\n服务巡检情况: " + "正常" +
  67. "\n\n播报时间: " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  68. text2 = (
  69. f"<font color=\'#FFA500\'>[通知] </font>星辰绘影-{info}"
  70. '''\n\n<font>服务巡检情况: </font><font color=\'#FF0000\' size=2>异常</font> \n\n''' +
  71. "\n\n播报时间: " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  72. print(type(status_failed))
  73. try:
  74. if status_failed == "0":
  75. print("我要发text1")
  76. dd_robot = DingtalkChatbot(read_config_yaml("dingding", "test_yunwin_webhook"))
  77. ret = dd_robot.send_markdown(title='星辰绘影服务巡检', text=text1, is_at_all=False)
  78. else:
  79. print("我要发text2")
  80. dd_robot = DingtalkChatbot(read_config_yaml("dingding", "test_yunwin_webhook"))
  81. ret = dd_robot.send_markdown(title='星辰绘影服务巡检', text=text2, is_at_all=False)
  82. except Exception as e:
  83. pass
  84. if __name__ == '__main__':
  85. DingTalkSend("服务巡检")