123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # 定时
- from apscheduler.schedulers.blocking import BlockingScheduler
- import json
- import requests
- from datetime import datetime, timedelta
- import logging
# Install the default handler on the root logger, then lower the threshold of
# the 'apscheduler' logger to DEBUG so its debug messages are emitted too.
logging.basicConfig()
_apscheduler_logger = logging.getLogger('apscheduler')
_apscheduler_logger.setLevel(logging.DEBUG)
def formateDate(time_str="Thu, 07 Mar 2024 18:36:18 GMT",
                time_format="%a, %d %b %Y %H:%M:%S GMT",
                formate='%Y-%m-%d %H:%M:%S'):
    """Convert a timestamp into text laid out according to ``formate``.

    Args:
        time_str: Either a ``datetime`` instance, which is used as-is, or a
            string that is parsed with ``time_format``.
        time_format: ``strptime`` pattern applied only when ``time_str`` is
            not a ``datetime``. The trailing ``GMT`` in the default pattern is
            matched as literal text; the parsed value carries no timezone.
        formate: ``strftime`` pattern used for the returned string.

    Returns:
        The formatted date/time string.

    Raises:
        ValueError: If a string ``time_str`` does not match ``time_format``.
    """
    moment = (
        time_str
        if isinstance(time_str, datetime)
        else datetime.strptime(time_str, time_format)
    )
    return moment.strftime(formate)
def dify_login(timeout=10):
    """Log in to the Dify console API and return the access token.

    Sends the login payload as a JSON body to the console login endpoint.

    Args:
        timeout: Seconds passed to ``requests.post`` as its ``timeout``, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        The ``access_token`` string from the response when the reply reports
        ``result == 'success'``; otherwise an empty string.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status (via ``raise_for_status``).
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    # NOTE(review): credentials are hard-coded in source control; they should
    # be moved to configuration or environment variables.
    params = {
        'email': "zhuliu@china-wispro.com",
        'language': "zh-Hans",
        'password': "xiaoshi221101",
        'remember_me': True,
    }
    url = "http://192.168.2.24/console/api/login"
    response = requests.post(
        url, headers=headers, data=json.dumps(params), timeout=timeout
    )
    response.raise_for_status()
    json_obj = response.json()

    # Treat any unexpected payload shape as a failed login instead of raising
    # KeyError/TypeError; callers already handle an empty token.
    if not isinstance(json_obj, dict) or json_obj.get('result') != 'success':
        return ''
    data = json_obj.get('data')
    if not isinstance(data, dict):
        return ''
    return data.get('access_token') or ''
def job(timeout=(10, 300)):
    """Scheduled task: request yesterday's records for each Dify app.

    Obtains a console token with ``dify_login`` and then POSTs one request per
    hard-coded Dify ID to the ``getDifyRecord`` endpoint, covering yesterday
    from 00:00 to 23:59 (local time of this process).

    Args:
        timeout: ``(connect, read)`` timeout in seconds for each export
            request. NOTE(review): the read timeout is a guess; tune it to how
            long the export endpoint really takes.

    Returns:
        ``False`` when no login token could be obtained; otherwise ``None``.
        A failed export request is logged and does not stop the remaining
        IDs from being processed.

    Raises:
        Whatever ``dify_login`` raises (for example a network error); the
        login call is intentionally not wrapped here.
    """
    print('启动定时')
    # Fetch the login token first; nothing else can run without it.
    token = dify_login()
    if not token:
        print("获取登录token失败")
        return False

    difyIds = [
        'a30f1218-bc9d-4f2a-985a-2386671a91dc',
        '7f7812ce-89f4-4981-a55c-efe2e016cb36',
        '01f7efbf-d23f-49f9-8ee4-740892c4f6a5',
    ]
    # The value is already a datetime, so only the output format matters.
    yesterday = formateDate(datetime.now() - timedelta(days=1), formate='%Y-%m-%d')
    startTime = f"{yesterday}T00:00"
    endTime = f"{yesterday}T23:59"

    # Loop-invariant request settings.
    url = "http://192.168.2.100:3500/api/getDifyRecord"
    headers = {
        'Content-Type': 'application/json'
    }
    log = logging.getLogger(__name__)

    for difyId in difyIds:
        params = {
            'difyId': difyId,
            'startTime': startTime,
            'endTime': endTime,
            'token': token
        }
        try:
            response = requests.post(
                url, headers=headers, data=json.dumps(params), timeout=timeout
            )
            response.raise_for_status()
        except requests.RequestException:
            # Best effort per ID: record the failure with its traceback and
            # continue with the next one instead of silently dropping it.
            log.exception("getDifyRecord request failed for difyId=%s", difyId)
def main():
    """Create a blocking scheduler, register ``job`` on a cron trigger, run it.

    The job is registered with ``hour=1, minute=10``. On ``KeyboardInterrupt``
    or ``SystemExit`` the scheduler is shut down before returning.
    """
    print("定时开始启动")
    sched = BlockingScheduler()
    # For quick local testing an interval trigger (e.g. every 5 seconds) can
    # be substituted for the cron trigger below.
    sched.add_job(job, trigger='cron', hour=1, minute=10)

    try:
        print("定时启动")
        # Start the scheduler.
        sched.start()
        # NOTE(review): BlockingScheduler.start() presumably blocks until the
        # scheduler stops, so this message would only appear afterwards --
        # confirm whether that is the intended meaning.
        print("定时启动成功")
    except (KeyboardInterrupt, SystemExit):
        print("定时关闭")
        # Shut the scheduler down.
        sched.shutdown()
        print("Scheduler shutdown complete")


if __name__ == '__main__':
    main()
|