Python定时任务调度——APScheduler
简介
APScheduler 框架可以让用户定时执行或者周期性执行 Python 任务。既可以添加任务也可以删除任务,还可以将任务存储在数据库中。当 APScheduler 重启之后,还会继续执行之前设置的任务。
APScheduler 是跨平台的,注意 APScheduler 既不是守护进程也不是服务,更不是命令行程序。APScheduler 是进程内的调度器,也就是说它的实现原理是在进程内产生内置的阻塞来创建定时服务,以便在预定的时间内执行某个任务。
安装
可以使用 pip 进行安装:
pip install apscheduler
基础概念
APScheduler 由四个组件构成:
- 触发器(triggers)
- 任务仓库(job stores)
- 执行器(executors)
- 调度器(schedulers)
触发器包含了所有定时任务逻辑,每个任务都有一个对应的触发器,触发器决定任何的何时执行,初始配置情况下,触发器是无状态的。
任务仓库保存要执行的任务,其中一个默认的任务仓库将任务保存在内存中,而另外几个任务仓库将任务保存在数据库中。在将任务保存到任务仓库前,会对任务执行序列化操作,当重新读取任务时,再执行反序列化操作。除了默认的任务仓库,其他任务仓库都不会在内存中保存任务,而是作为任务保存、加载、更新以及搜索的一个中间件。任务仓库在定时器之间不能共享。
执行器用来执行定时任务,它只是将要执行的任务放在新的线程或者线程池中运行。执行完毕之后,再通知定时器。
调度器将其它几个组件联系在一起,一般在应用中只有一个调度器,程序开发者不会直接操作触发器、任务仓库或执行器,相反,调度器提供了这个接口。任务仓库以及执行器的配置都是通过调度器来实现的。
选择合适的调度器、任务仓库、执行器和触发器
APScheduler 支持的存储方式有:
- MemoryStore
- SQLAlchemyJobStore,默认使用 SQLite。
- MongoDBJobStore
- ZooKeeperJobStore
- RedisJobStore
- RethinkDBJobStore
如果是非持久任务,使用默认配置的MemoryStore就可以了,如果是持久性任务,那么久需要根据编程环境进行选择了。
APScheduler 中一些常用调度器:
- BlockingScheduler:适合于只在进程中运行单个任务的情况
- BackgroundScheduler: 适合于要求任何在程序后台运行的情况
- AsyncIOScheduler:适合于使用asyncio框架的情况
- GeventScheduler: 适合于使用gevent框架的情况
- TornadoScheduler: 适合于使用Tornado框架的应用
- TwistedScheduler: 适合使用Twisted框架的应用
- QtScheduler: 适合使用QT的情况
大多数情况下,执行器选择ThreadPoolExecutor就可以了,但是如果涉及到比较耗CPU的任务,就可以选择ProcessPoolExecutor,以充分利用多核CPU。,当然也可以同时使用两个执行器。
当调度一个任务时,需要选择一个触发器。这个触发器决定何时执行任务。APScheduler 支持的触发器有三种:
- date:任务仅执行一次
- interval:任务循环执行
- cron:任务定时执行
trigger 对任务的控制
add_job的第二个参数是trigger,它管理着作业的调度方式。它可以为date, interval或者cron。对于不同的trigger,对应的参数也相同。
cron 定时调度
参数:
属性 | 类型 | 举例 |
---|---|---|
year | int、str | 4-digit year |
month | int、str | month (1-12) |
day | int、str | day of the (1-31) |
week | int、str | ISO week (1-53) |
day_of_week | int、str | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
hour | int、str | hour (0-23) |
minute | int、str | minute (0-59) |
second | int、str | second (0-59) |
start_date | datetime、str | earliest possible date/time to trigger on (inclusive) |
end_date | datetime、str | latest possible date/time to trigger on (inclusive) |
timezone | datetime.tzinfo、str | time zone to use for the date/time calculations (defaults to scheduler timezone) |
和Linux的Crontab一样,它的值格式为:
Expression | Field | Description |
---|---|---|
any | Fire on every value | |
/a | any | Fire every a values, starting from the minimum |
a-b | any | Fire on any value within the a-b range (a must be smaller than b) |
a-b/c | any | Fire every c values within the a-b range |
xth y | day | Fire on the x -th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
例如:
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
interval 间隔调度
参数:
属性 | 类型 | 举例 |
---|---|---|
weeks | int | number of weeks to wait |
days | int | number of days to wait |
hours | int | number of hours to wait |
minutes | int | number of minutes to wait |
seconds | int | number of seconds to wait |
start_date | datetime、str | starting point for the interval calculation |
end_date | datetime、str | latest possible date/time to trigger on |
timezone | datetime.tzinfo、str | time zone to use for the date/time calculations |
例如:
# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)
date 定时调度
最基本的一种调度,作业只会执行一次。它的参数如下:
属性 | 类型 | 举例 |
---|---|---|
run_date | datetime、str | the date/time to run the job at |
timezone | datetime.tzinfo、str | time zone for run_date if it doesn’t have one already |
例如:
# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text
使用MemoryStore
使用 MemoryStore、BlockingScheduler 观察 corn、interval、date 的不同。
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def alarm(type):
print '[%s Alarm] This alarm was scheduled at %s.' % (type, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 定时执行
def corn_trigger():
global SCHEDULER
SCHEDULER.add_job(func=alarm, args=['cron'], trigger='cron', second='*/5', id='corn_job')
# 循环执行
def interval_trigger():
global SCHEDULER
SCHEDULER.add_job(func=alarm, args=['interval'], trigger='interval', seconds=5, id='interval_job')
# 一次执行
def date_trigger():
global SCHEDULER
SCHEDULER.add_job(func=alarm, args=['date'], trigger='date', run_date=datetime.now(), id='date_job')
SCHEDULER = BlockingScheduler()
if __name__ == '__main__':
corn_trigger()
interval_trigger()
date_trigger()
try:
SCHEDULER.start()
except (KeyboardInterrupt, SystemExit):
SCHEDULER.shutdown()
执行结果:
[date Alarm] This alarm was scheduled at 2017-07-22 11:12:42.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:45.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:47.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:50.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:52.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:55.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:57.
任务持久化(SQLAlchemyJobStore)
APScheduler 可以把任务持久化,如持久化到 MySQL 中。当 APScheduler 把任务持久好到 MySQL 时,会默认自动创建一张 apscheduler_jobs:
字段名 | 说明 |
---|---|
id | 定义的 job_id |
next_run_time | 下次执行时间 |
job_state | job 的信息 |
定时从 MySQL 中查询数据
import MySQLdb
import time
import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log.txt',
filemode='a')
def query(host, port, user, password, db):
conn = MySQLdb.connect(host=host, port=port, user=user, passwd=password, db=db)
cr = conn.cursor()
cr.execute('select * from score')
conn.commit()
res = cr.fetchall()
print res
with open(r'rs.txt', 'a') as f:
f.write(str(res) + '\n')
if __name__ == '__main__':
url = 'mysql://root:123456@localhost:3306/work'
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults)
scheduler.add_jobstore('sqlalchemy', url=url)
start = datetime.strptime('2017-07-22 11:32:00', '%Y-%m-%d %H:%M:%S')
end = datetime.strptime('2017-07-22 11:34:00', '%Y-%m-%d %H:%M:%S')
scheduler.add_job(func=query, args=('127.0.0.1', 3306, 'root', '123456', 'test'),
trigger='cron', start_date=start, end_date=end, second='*/5', id='query')
try:
scheduler.start()
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
GitHub 上可查看源代码
参考资料
APScheduler User Guide
还没有评论,来说两句吧...