# xiaoshi_AI_userRecord_api_time.py (2.8 KB)

# Scheduled task
import json
import logging
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.blocking import BlockingScheduler
  7. logging.basicConfig()
  8. logging.getLogger('apscheduler').setLevel(logging.DEBUG)
  9. #时间格式转换
  10. def formateDate(time_str="Thu, 07 Mar 2024 18:36:18 GMT",time_format = "%a, %d %b %Y %H:%M:%S GMT",formate='%Y-%m-%d %H:%M:%S'):
  11. # 解析时间字符串为datetime对象,并设置时区为GMT
  12. dt = ''
  13. if isinstance(time_str, datetime):
  14. dt = time_str
  15. else:
  16. dt = datetime.strptime(time_str, time_format)
  17. # 转换为所需的日期格式 YYYY-mm-dd
  18. formatted_date = dt.strftime(formate)
  19. return formatted_date
  20. # 登录dify
  21. def dify_login():
  22. result = ''
  23. headers={
  24. 'Content-Type': 'application/json'
  25. }
  26. params = {
  27. 'email': "zhuliu@china-wispro.com",
  28. 'language': "zh-Hans",
  29. 'password': "xiaoshi221101",
  30. 'remember_me': True,
  31. }
  32. url = f"http://192.168.2.24/console/api/login"
  33. response = requests.post(url, headers=headers,data=json.dumps(params))
  34. response.raise_for_status()
  35. json_obj = response.json()
  36. if json_obj['result'] == 'success':
  37. result = json_obj['data']['access_token']
  38. return result
  39. # 定时任务
  40. def job():
  41. print('启动定时')
  42. # 获取token
  43. token = dify_login()
  44. if not token:
  45. print("获取登录token失败")
  46. return False
  47. difyIds = ['a30f1218-bc9d-4f2a-985a-2386671a91dc','7f7812ce-89f4-4981-a55c-efe2e016cb36','01f7efbf-d23f-49f9-8ee4-740892c4f6a5']
  48. yesterday = formateDate(datetime.now() - timedelta(days=1),"%a, %d %b %Y %H:%M:%S GMT",'%Y-%m-%d')
  49. startTime = f"{yesterday}T00:00"
  50. endTime = f"{yesterday}T23:59"
  51. headers={
  52. 'Content-Type': 'application/json'
  53. }
  54. for difyId in difyIds:
  55. params = {
  56. 'difyId':difyId,
  57. 'startTime':startTime,
  58. 'endTime':endTime,
  59. 'token':token
  60. }
  61. try:
  62. url = f"http://192.168.2.100:3500/api/getDifyRecord"
  63. response = requests.post(url, headers=headers,data=json.dumps(params))
  64. except Exception as e:
  65. pass
  66. def main():
  67. print("定时开始启动")
  68. scheduler = BlockingScheduler()
  69. # scheduler.add_job(job, 'interval', seconds=5)
  70. scheduler.add_job(job, 'cron', hour=1, minute=10)
  71. # scheduler.add_job(job, 'cron', second='*/5')
  72. try:
  73. print("定时启动")
  74. # 启动调度器
  75. scheduler.start()
  76. print("定时启动成功")
  77. except (KeyboardInterrupt, SystemExit):
  78. print("定时关闭")
  79. # 关闭调度器
  80. scheduler.shutdown()
  81. print("Scheduler shutdown complete")
  82. if __name__ == '__main__':
  83. main()