Jusene's Blog

定时任务框架APScheduler

字数统计: 1.5k阅读时长: 6 min
2018/06/25 Share

APScheduler

最近想要建立一个crontab集中的任务管理中心,调用saltstack来集中管理IT系统,有两个方案,一个是apscheduler,一个celery,可是发现celery的crontab的功能存在bug,计划任务不能很好的执行,所以计划选择apscheduler来作为框架来完成。

基本概念

  • triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了初始化配置外,触发器完全是无状态的。
  • job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其他作业存储可以将任务作业保存到各种数据库中,支持MongoDB,Redis,SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。
  • executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据实际需求可以同时使用两种执行器。
  • schedulers(调度器):调度器是将其他部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如何添加、修改、移除任务作业。

APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:

  • BlockingScheduler: 适合于只在进程中运行单个任务的情况,通常在调度器是唯一要运行的东西时使用。
  • BackgroundScheduler: 适合于要求在任何程序后台运行的情况,当希望调度器在应用后台执行时使用。
  • AsyncIOScheduler:适合于使用asyncio框架的情况。
  • GeventScheduler: 适合于使用gevent框架的情况。
  • TornadoScheduler: 适合于使用Tornado框架的应用。
  • TwistedScheduler: 适合使用Twisted框架的应用。
  • QtScheduler: 适合使用QT的情况。

BlockingScheduler

1
import datetime
2
from apscheduler.schedulers.blocking import BlockingScheduler
3
4
sched = BlockingScheduler()
5
6
# cron定时调度,某一定时刻执行
7
@sched.scheduled_job('cron', hour='21', minute='50',args=('hello apscheduler',), id='test_job')
8
def test_job(s):
9
    print(datetime.datetime.now(),s,'cron')
10
11
# 间隔调度,每隔多久执行
12
@sched.scheduled_job('interval',seconds=5,args=('hello world',), id='test1_job')
13
def test1_job(s):
14
    print(datetime.datetime.now(),s,'interval')
15
16
# 定时调度,作业只会执行一次
17
@sched.scheduled_job('date',run_date=datetime.datetime(2018,6,24,22,0,0),args=('hello date',), id='test2_job')
18
def test2_job(s):
19
    print(datetime.datetime.now(),s,'date')
20
21
if __name__ == "__main__":
22
    sched.start()

triggers(触发器)

  • cron: crontab类型任务
    • year: 2018
    • month: 1-12month
    • day: 1-31day
    • week: iso week(1-53)
    • day_of_week: 0-6 or mon,tue,wed,thu,fri,sat,sun
    • hour: 0-23hour
    • second: 0-59
    • minute: 0-59
    • start_date: datetime 表示开始时间
    • end_date: datetime 表示结束时间
    • timezone: datetime.tzinfo 表示时区取值
1
# 2018-6-24 23:59:0 执行my_job函数内容
2
sched.add_job(my_job, 'cron', year=2018, month=6, day=24, hour=23, minute=59, second=0)
3
4
# 在6,7,8,11,12月份第三个星期五的0,1,2,3点执行程序
5
sched.add_job(my_job1, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
6
7
# 每5秒执行程序
8
sched.add_job(my_job2, 'cron', second='*/5')
  • interval: 固定时间间隔任务
    • weeks: 等待周数
    • days: 等待天数
    • hours: 等待的小时数
    • miniutes: 等待分数
    • seconds: 等待秒数
    • start_date: datetime 表示开始时间
    • end_date: datetime 表示结束时间
    • timezone: datetime.tzinfo 表示时区取值
1
# 每隔3小时执行
2
sched.add_job(my_job, 'interval', hours=3)
  • date: 给予时间日期的一次性任务
    • run_date: datetime 任务开始的时间
    • timezone: datetime.tzinfo
1
# 下一次运行时间
2
sched.add_job(my_job,'date', run_date=datetime.datetime(2018,10,1,0,0,0))

操作作业

  • 添加作业:
1
sched.add_job(myfunc, 'interval', minutes=2, id='my_job')
  • 移除作业
1
sched.remove_job('my_job')
  • 暂停和恢复作业
1
sched.pause('my_job')
2
sched.resume('my_job')
  • 获取job列表
1
sched.get_job(job_id='test_job')
  • 关闭调度器
1
sched.shutdown()

日志

1
import datetime
2
from apscheduler.schedulers.blocking import BlockingScheduler
3
import logging
4
5
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(processName)s]: %(message)s')
6
7
sched = BlockingScheduler()
8
9
@sched.scheduled_job('cron', second='*/5', args=('hello apscheduler',), id='test_job')
10
def test_job(s):
11
    print(datetime.datetime.now(),s,'cron')
12
13
@sched.scheduled_job('interval',seconds=5,args=('hello world',), id='test1_job')
14
def test1_job(s):
15
    print(datetime.datetime.now(),s,'interval')
16
17
@sched.scheduled_job('date',run_date=datetime.datetime(2018,6,24,22,44,0),args=('hello date',), id='test2_job')
18
def test2_job(s):
19
    print(datetime.datetime.now(),s,'date')
20
21
if __name__ == "__main__":
22
    sched._logger = logging
23
    print(sched.get_jobs())
24
    try:
25
        sched.start()
26
    except KeyboardInterrupt:
27
        sched.shutdown()

意外处理

1
from apscheduler.schedulers.blocking import BlockingScheduler
2
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
3
import datetime
4
import logging
5
6
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(processName)s]: %(message)s')
7
8
9
def date_test(x):
10
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)
11
12
def my_lister(event):
13
    if event.exception:
14
        print('任务出错了')
15
    else:
16
        print('任务调度正常')
17
18
scheduler = BlockingScheduler()
19
scheduler.add_job(func=date_test, args=('worker',),trigger='interval',seconds=3,id='interval_task')
20
scheduler.add_listener(my_lister, EVENT_JOB_EXECUTED|EVENT_JOB_ERROR)
21
scheduler.start()

在生产环境下,把异常处理换成一份邮件,这样定时任务可以立马知道。

配置scheduler

1
from pytz import timezone
2
from apscheduler.schedulers.background import BackgroundScheduler
3
from apscheduler.jobstores.mongodb import MongoDBJobStore
4
from apscheduler.jobstores.redis import RedisJobStore
5
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
6
from apscheduler.executors.pool import ThreadPoolExectuor, ProcessPoolExecutor
7
8
jobstores = {
9
    'mongo': MongoDBJobStore(),
10
    'default': RedisJobStore(db=0)
11
}
12
13
executors = {
14
    'default': ThreadPoolExecutor(20),
15
    'processpool': ProcessPoolExecutor(5)
16
}
17
18
job_defaults = {
19
    'coalesce': False,   #错过执行的任务与合并,最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次
20
    'max_instance': 3    #默认同个时刻只能运行一个实例,通过max_instances=3修改3个
21
}
22
23
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone('Asia/Shanghai'))
24
25
@scheduler.scheduled_job('cron', second='*/5', args=('hello apscheduler',), id='test_job')
26
def test_job(x):
27
    print(x)
28
29
if __name__ == "__main":
30
    try:
31
        scheduler.start()
32
    except KeyboardInterrupt:
33
        scheduler.shutdown()
34
35
    while True:
36
        pass
CATALOG
  1. 1. APScheduler
  2. 2. BlockingScheduler
    1. 2.1. triggers(触发器)
    2. 2.2. 操作作业
    3. 2.3. 日志
    4. 2.4. 意外处理
    5. 2.5. 配置scheduler