# Scheduled task: once a day, log in to Dify and ask the record service to
# process the previous day's records for a fixed list of Dify ids.
import json
import logging
import os
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.blocking import BlockingScheduler

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)

DIFY_LOGIN_URL = "http://192.168.2.24/console/api/login"
DIFY_RECORD_URL = "http://192.168.2.100:3500/api/getDifyRecord"

# NOTE(review): these credentials were hard-coded in source. They are kept as
# defaults so existing deployments keep working, but they should be supplied
# through the DIFY_EMAIL / DIFY_PASSWORD environment variables and the
# password rotated.
DIFY_EMAIL = os.environ.get("DIFY_EMAIL", "zhuliu@china-wispro.com")
DIFY_PASSWORD = os.environ.get("DIFY_PASSWORD", "xiaoshi221101")

# (connect, read) timeouts in seconds. Without a timeout a stalled server
# would block the scheduler job forever. The read timeout is generous because
# the record endpoint's processing time is unknown -- TODO tune.
REQUEST_TIMEOUT = (10, 300)

JSON_HEADERS = {'Content-Type': 'application/json'}


def formateDate(time_str="Thu, 07 Mar 2024 18:36:18 GMT",
                time_format="%a, %d %b %Y %H:%M:%S GMT",
                formate='%Y-%m-%d %H:%M:%S'):
    """Format a timestamp as a string.

    Args:
        time_str: A ``datetime`` instance, or a string that is parsed with
            ``time_format``.
        time_format: ``strptime`` pattern used when ``time_str`` is a string.
        formate: ``strftime`` pattern of the returned string.

    Returns:
        The timestamp rendered with ``formate``.

    Raises:
        ValueError: If ``time_str`` is a string that does not match
            ``time_format``.
    """
    if isinstance(time_str, datetime):
        dt = time_str
    else:
        dt = datetime.strptime(time_str, time_format)
    return dt.strftime(formate)


def dify_login():
    """Log in to the Dify console and return the access token.

    Returns:
        The access token string, or ``''`` when the response does not report
        ``result == 'success'`` or carries no token.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: If the response body is not valid JSON.
    """
    params = {
        'email': DIFY_EMAIL,
        'language': "zh-Hans",
        'password': DIFY_PASSWORD,
        'remember_me': True,
    }
    response = requests.post(
        DIFY_LOGIN_URL,
        headers=JSON_HEADERS,
        data=json.dumps(params),
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    json_obj = response.json()
    if json_obj.get('result') != 'success':
        return ''
    # Tolerate a missing or null "data" member instead of raising KeyError;
    # callers already treat an empty token as a failed login.
    return (json_obj.get('data') or {}).get('access_token') or ''


def job():
    """Post one record request per Dify id, covering all of yesterday.

    Returns:
        ``False`` when no login token could be obtained, otherwise ``None``.
        A failed request for one id is logged and does not stop the others.
    """
    print('启动定时')
    # Obtain the token; a login failure must not escape into the scheduler.
    try:
        token = dify_login()
    except (requests.RequestException, ValueError):
        logger.exception("Dify login request failed")
        token = ''
    if not token:
        print("获取登录token失败")
        return False

    difyIds = [
        'a30f1218-bc9d-4f2a-985a-2386671a91dc',
        '7f7812ce-89f4-4981-a55c-efe2e016cb36',
        '01f7efbf-d23f-49f9-8ee4-740892c4f6a5',
    ]
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    startTime = f"{yesterday}T00:00"
    endTime = f"{yesterday}T23:59"

    for difyId in difyIds:
        params = {
            'difyId': difyId,
            'startTime': startTime,
            'endTime': endTime,
            'token': token,
        }
        try:
            response = requests.post(
                DIFY_RECORD_URL,
                headers=JSON_HEADERS,
                data=json.dumps(params),
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
        except requests.RequestException:
            # Best effort per id: record the failure and carry on with the
            # remaining ids rather than silently dropping the error.
            logger.exception(
                "getDifyRecord request failed for difyId=%s", difyId)


def main():
    """Run ``job`` every day at 01:10 on a blocking scheduler."""
    print("定时开始启动")
    scheduler = BlockingScheduler()
    scheduler.add_job(job, 'cron', hour=1, minute=10)
    try:
        print("定时启动")
        # Start the scheduler. This call blocks, so the line after it is
        # only reached once start() returns.
        scheduler.start()
        print("定时启动成功")
    except (KeyboardInterrupt, SystemExit):
        print("定时关闭")
        # Shut the scheduler down.
        scheduler.shutdown()
        print("Scheduler shutdown complete")


if __name__ == '__main__':
    main()