from apscheduler.scheduler import Scheduler
schedudler = Scheduler(daemonic = False)
@schedudler.cron_schedule(second='*', day_of_week='0-4', hour='9-12,13-15')
def quote_send_sh_job():
print 'a simple cron job start at', datetime.datetime.now()
schedudler.start()
对于定时任务我想大家都不陌生,简单的讲就是预先定义好在约定的时间执行约定的任务。这在程序开发中是一个很常用的功能在java 中可以使用Quartz,在.Net中也有对应的实现,而在Python中对应的实现就是APScheduler。
利用APScheduler,可以实现给定的日期时间,固定间隔时间,以及crontab类型的任务。这里解释一下cron,cron就是在Linux下使用的定时任务器,可以通过定义特定格式的表达式来对触发时间进行描述。在APScheduler中可以添加对应cron表达式的任务。
APScheduler可以选择任务的存储方式,如以内存存储方式的非持久化存储,或者以mongodb,redis,shelves,qlalchemy等的持久化方式存储。当然你也可以写自己的持久化存储方式只要继承JobStore类实现相关的方法就可以了。还是建议采用持久化的方式来存储,来避免出现当机等突发现象引起的不必要的损失。
APScheduler使用起来非常的简单。一种是通过装饰器的方式来调用。
一种是直接在类中调用。
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
__MYSQL_url = 'mysql://root:123456@localhost/w'
uler(standalone=True)
if __name__ == '__main__':
scheduler = Sche
dscheduler.add_jobstore(SQLAlchemyJobStore(url=__MYSQL_url), 'default')
d_date_job(alarm, alarm_time,, args=[date
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.a
dtime.now()])
print 'alarms added: ', alarm_time
alarm_time = datetime.now() + timedelta(seconds=15)
, alarm_time
alarm_time = datetime.now() + timedelta(seconds=20)
scheduler
scheduler.add_date_job(alarm, alarm_time,, args=[datetime.now()])
print 'alarms added:
'.add_date_job(alarm, alarm_time,, args=[datetime.now()])
print 'alarms added: ', alarm_time
scheduler.add_cron_job(alarm, day_of_week='mon-fri', hour=5, minute=30)
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
注意代码一种将daemonic = False,daemonic属性是用来设置是否以后台线程的方式运行,默认值为true就是他会随主程序的结束一起结束,若设成false则只能手动结束(实质是调用python内部类threading中的setDaemon方法)。APScheduler是以线程池的多线程的方式并发执行任务的(参见threadpool.py)。
代码二演示了一个利用SQLAlchemy来实现持久化的方式,APScheduler会将定义好的将要执行的任务放入数据库,在任务执行完成后删除对应数据。记住在定义的多次任务中,数据库中存的只是下一个时间要执行的单条任务信息,而不是所有将会执行的任务信息。
再讲一下misfire_grace_time属性的作用。 因为某种特定的原因导致定时任务服务挂掉,在重启后如果任务的时间和实际的时间的差值小于定义的misfire_grace_time,任务还会执行,这就为任务提供了一定容错机制。
另外APScheduler还可以监听各种运行中的事件,来调用自己定义的方法如
def my_listener(event):
if event.exception:
print 'The job crashed :('
else:
print 'The job worked :)'
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)