APScheduler
最近想要建立一个crontab集中的任务管理中心,调用saltstack来集中管理IT系统,有两个方案,一个是apscheduler,一个celery,可是发现celery的crontab的功能存在bug,计划任务不能很好的执行,所以计划选择apscheduler来作为框架来完成。
基本概念
- triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了初始化配置外,触发器完全是无状态的。
- job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其他作业存储可以将任务作业保存到各种数据库中,支持MongoDB,Redis,SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。
- executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据实际需求可以同时使用两种执行器。
- schedulers(调度器):调度器是将其他部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如何添加、修改、移除任务作业。
APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:
- BlockingScheduler: 适合于只在进程中运行单个任务的情况,通常在调度器是唯一要运行的东西时使用。
- BackgroundScheduler: 适合于要求在任何程序后台运行的情况,当希望调度器在应用后台执行时使用。
- AsyncIOScheduler:适合于使用asyncio框架的情况。
- GeventScheduler: 适合于使用gevent框架的情况。
- TornadoScheduler: 适合于使用Tornado框架的应用。
- TwistedScheduler: 适合使用Twisted框架的应用。
- QtScheduler: 适合使用QT的情况。
BlockingScheduler
1 | import datetime |
2 | from apscheduler.schedulers.blocking import BlockingScheduler |
3 |
|
4 | sched = BlockingScheduler() |
5 |
|
6 |
|
7 | @sched.scheduled_job('cron', hour='21', minute='50',args=('hello apscheduler',), id='test_job') |
8 | def test_job(s): |
9 | print(datetime.datetime.now(),s,'cron') |
10 |
|
11 |
|
12 | @sched.scheduled_job('interval',seconds=5,args=('hello world',), id='test1_job') |
13 | def test1_job(s): |
14 | print(datetime.datetime.now(),s,'interval') |
15 |
|
16 |
|
17 | @sched.scheduled_job('date',run_date=datetime.datetime(2018,6,24,22,0,0),args=('hello date',), id='test2_job') |
18 | def test2_job(s): |
19 | print(datetime.datetime.now(),s,'date') |
20 |
|
21 | if __name__ == "__main__": |
22 | sched.start() |
triggers(触发器)
- cron: crontab类型任务
-
-
-
-
- day_of_week: 0-6 or mon,tue,wed,thu,fri,sat,sun
-
-
-
- start_date: datetime 表示开始时间
- end_date: datetime 表示结束时间
- timezone: datetime.tzinfo 表示时区取值
1 |
|
2 | sched.add_job(my_job, 'cron', year=2018, month=6, day=24, hour=23, minute=59, second=0) |
3 |
|
4 |
|
5 | sched.add_job(my_job1, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') |
6 |
|
7 |
|
8 | sched.add_job(my_job2, 'cron', second='*/5') |
- interval: 固定时间间隔任务
-
-
-
-
-
- start_date: datetime 表示开始时间
- end_date: datetime 表示结束时间
- timezone: datetime.tzinfo 表示时区取值
1 |
|
2 | sched.add_job(my_job, 'interval', hours=3) |
- date: 给予时间日期的一次性任务
- run_date: datetime 任务开始的时间
- timezone: datetime.tzinfo
1 |
|
2 | sched.add_job(my_job,'date', run_date=datetime.datetime(2018,10,1,0,0,0)) |
操作作业
1 | sched.add_job(myfunc, 'interval', minutes=2, id='my_job') |
1 | sched.remove_job('my_job') |
1 | sched.pause('my_job') |
2 | sched.resume('my_job') |
1 | sched.get_job(job_id='test_job') |
日志
1 | import datetime |
2 | from apscheduler.schedulers.blocking import BlockingScheduler |
3 | import logging |
4 |
|
5 | logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(processName)s]: %(message)s') |
6 |
|
7 | sched = BlockingScheduler() |
8 |
|
9 | @sched.scheduled_job('cron', second='*/5', args=('hello apscheduler',), id='test_job') |
10 | def test_job(s): |
11 | print(datetime.datetime.now(),s,'cron') |
12 |
|
13 | @sched.scheduled_job('interval',seconds=5,args=('hello world',), id='test1_job') |
14 | def test1_job(s): |
15 | print(datetime.datetime.now(),s,'interval') |
16 |
|
17 | @sched.scheduled_job('date',run_date=datetime.datetime(2018,6,24,22,44,0),args=('hello date',), id='test2_job') |
18 | def test2_job(s): |
19 | print(datetime.datetime.now(),s,'date') |
20 |
|
21 | if __name__ == "__main__": |
22 | sched._logger = logging |
23 | print(sched.get_jobs()) |
24 | try: |
25 | sched.start() |
26 | except KeyboardInterrupt: |
27 | sched.shutdown() |
意外处理
1 | from apscheduler.schedulers.blocking import BlockingScheduler |
2 | from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR |
3 | import datetime |
4 | import logging |
5 |
|
6 | logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(processName)s]: %(message)s') |
7 |
|
8 |
|
9 | def date_test(x): |
10 | print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x) |
11 |
|
12 | def my_lister(event): |
13 | if event.exception: |
14 | print('任务出错了') |
15 | else: |
16 | print('任务调度正常') |
17 |
|
18 | scheduler = BlockingScheduler() |
19 | scheduler.add_job(func=date_test, args=('worker',),trigger='interval',seconds=3,id='interval_task') |
20 | scheduler.add_listener(my_lister, EVENT_JOB_EXECUTED|EVENT_JOB_ERROR) |
21 | scheduler.start() |
在生产环境下,把异常处理换成一份邮件,这样定时任务可以立马知道。
配置scheduler
1 | from pytz import timezone |
2 | from apscheduler.schedulers.background import BackgroundScheduler |
3 | from apscheduler.jobstores.mongodb import MongoDBJobStore |
4 | from apscheduler.jobstores.redis import RedisJobStore |
5 | from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore |
6 | from apscheduler.executors.pool import ThreadPoolExectuor, ProcessPoolExecutor |
7 |
|
8 | jobstores = { |
9 | 'mongo': MongoDBJobStore(), |
10 | 'default': RedisJobStore(db=0) |
11 | } |
12 |
|
13 | executors = { |
14 | 'default': ThreadPoolExecutor(20), |
15 | 'processpool': ProcessPoolExecutor(5) |
16 | } |
17 |
|
18 | job_defaults = { |
19 | 'coalesce': False, |
20 | 'max_instance': 3 |
21 | } |
22 |
|
23 | scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone('Asia/Shanghai')) |
24 |
|
25 | @scheduler.scheduled_job('cron', second='*/5', args=('hello apscheduler',), id='test_job') |
26 | def test_job(x): |
27 | print(x) |
28 |
|
29 | if __name__ == "__main": |
30 | try: |
31 | scheduler.start() |
32 | except KeyboardInterrupt: |
33 | scheduler.shutdown() |
34 |
|
35 | while True: |
36 | pass |