GVKun编程网logo

[转]Python定时任务框架APScheduler

7

如果您想了解[转]Python定时任务框架APScheduler的知识,那么本篇文章将是您的不二之选。同时我们将深入剖析apscheduler定时任务框架、APScheduler定时任务框架、APSc

如果您想了解[转]Python定时任务框架APScheduler的知识,那么本篇文章将是您的不二之选。同时我们将深入剖析apscheduler 定时任务框架、APScheduler定时任务框架、APScheduler轻量级定时任务框架、APScheduler(定时任务—Python库)的各个方面,并给出实际的案例分析,希望能帮助到您!

本文目录一览:

[转]Python定时任务框架APScheduler

[转]Python定时任务框架APScheduler

  APScheduler是基于Quartz的 一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以 持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了。

1. 安装

        安装过程很简单,可以基于easy_install和源码。

  1. easy_install apscheduler  

        或者下载源码,运行命令:

  1. python setup.py install  

2. cron job例子

        APScheduler是进程内的调度器,可以定时触发具体的函数,并且可以访问应用的所有变量和函数。在web应用中通过APScheduler实现定时任务是很方便的。下面看例子:

  1. from apscheduler.scheduler import Scheduler  
  2.   
  3. schedudler = Scheduler(daemonic = False)  
  4.  
  5. @schedudler.cron_schedule(second=''*'', day_of_week=''0-4'', hour=''9-12,13-15'')  
  6. def quote_send_sh_job():  
  7.     print ''a simple cron job start at'', datetime.datetime.now()  
  8.   
  9. schedudler.start()  

        上面通过装饰器定义了cron job,可以通过函数scheduler.add_cron_job添加,用装饰器更方便。Scheduler构造函数中传入daemonic参数,表示执行线程是非守护的,在Schduler的文档中推荐使用非守护线程:

  1. Jobs are always executed in non-daemonic threads.  

        具体cron job的配置参看doc,基本上与Quartz一致。

        在添加job时还有一个比较重要的参数max_instances,指定一个job的并发实例数,默认值是1。默认情况下,如果一个job准备执行,但是该job的前一个实例尚未执行完,则后一个job会失败,可以通过这个参数来改变这种情况。

3. Store

        APScheduler提供了jobstore用于存储job的执行信息,默认使用的是RAMJobStore,还提供了 SQLAlchemyJobStore、ShelveJobStore和MongoDBJobStore。APScheduler允许同时使用多个 jobstore,通过别名(alias)区分,在添加job时需要指定具体的jobstore的别名,否则使用的是别名是default的 jobstore,即RAMJobStore。下面以MongoDBJobStore举例说明。

 
  1. import pymongo  
  2. from apscheduler.scheduler import Scheduler  
  3. from apscheduler.jobstores.mongodb_store import MongoDBJobStore  
  4. import time  
  5.   
  6. sched = Scheduler(daemonic = False)  
  7.   
  8. mongo = pymongo.Connection(host=''127.0.0.1'', port=27017)  
  9. store = MongoDBJobStore(connection=mongo)  
  10. sched.add_jobstore(store, ''mongo'')        # 别名是mongo  
  11.  
  12. @sched.cron_schedule(second=''*'', day_of_week=''0-4'', hour=''9-12,13-15'', jobstore=''mongo'')        # 向别名为mongo的jobstore添加job  
  13. def job():  
  14.         print ''a job''  
  15.         time.sleep(1)  
  16.   
  17. sched.start()  

        注意start必须在添加job动作之后调用,否则会抛错。默认会把job信息保存在apscheduler数据库下的jobs表:

 
  1. > db.jobs.findOne()  
  2. {  
  3.         "_id" : ObjectId("502202d1443c1557fa8b8d66"),  
  4.         "runs" : 20,  
  5.         "name" : "job",  
  6.         "misfire_grace_time" : 1,  
  7.         "coalesce" : true,  
  8.         "args" : BinData(0,"gAJdcQEu"),  
  9.         "next_run_time" : ISODate("2012-08-08T14:10:46Z"),  
  10.         "max_instances" : 1,  
  11.         "max_runs" : null,  
  12.         "trigger" : BinData(0,"gAJjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3JvbgpDcm9uVHJpZ2dlcgpxASmBcQJ9cQMoVQZmaWVsZHNxBF1xBShjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5maWVsZHMKQmFzZUZpZWxkCnEGKYFxB31xCChVCmlzX2RlZmF1bHRxCYhVC2V4cHJlc3Npb25zcQpdcQtjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5leHByZXNzaW9ucwpBbGxFeHByZXNzaW9uCnEMKYFxDX1xDlUEc3RlcHEPTnNiYVUEbmFtZXEQVQR5ZWFycRF1YmgGKYFxEn1xEyhoCYhoCl1xFGgMKYFxFX1xFmgPTnNiYWgQVQVtb250aHEXdWJjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5maWVsZHMKRGF5T2ZNb250aEZpZWxkCnEYKYFxGX1xGihoCYhoCl1xG2gMKYFxHH1xHWgPTnNiYWgQVQNkYXlxHnViY2Fwc2NoZWR1bGVyLnRyaWdnZXJzLmNyb24uZmllbGRzCldlZWtGaWVsZApxHymBcSB9cSEoaAmIaApdcSJoDCmBcSN9cSRoD05zYmFoEFUEd2Vla3EldWJjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5maWVsZHMKRGF5T2ZXZWVrRmllbGQKcSYpgXEnfXEoKGgJiWgKXXEpY2Fwc2NoZWR1bGVyLnRyaWdnZXJzLmNyb24uZXhwcmVzc2lvbnMKUmFuZ2VFeHByZXNzaW9uCnEqKYFxK31xLChoD05VBGxhc3RxLUsEVQVmaXJzdHEuSwB1YmFoEFULZGF5X29mX3dlZWtxL3ViaAYpgXEwfXExKGgJiWgKXXEyKGgqKYFxM31xNChoD05oLUsMaC5LCXViaCopgXE1fXE2KGgPTmgtSw9oLksNdWJlaBBVBGhvdXJxN3ViaAYpgXE4fXE5KGgJiGgKXXE6aAwpgXE7fXE8aA9Oc2JhaBBVBm1pbnV0ZXE9dWJoBimBcT59cT8oaAmJaApdcUBoDCmBcUF9cUJoD05zYmFoEFUGc2Vjb25kcUN1YmVVCnN0YXJ0X2RhdGVxRE51Yi4="),  
  13.         "func_ref" : "__main__:job",  
  14.         "kwargs" : BinData(0,"gAJ9cQEu")  
  15. }  

        上面就是存储的具体信息。

4.异常处理

        当job抛出异常时,APScheduler会默默的把他吞掉,不提供任何提示,这不是一种好的实践,我们必须知晓程序的任何差错。APScheduler提供注册listener,可以监听一些事件,包括:job抛出异常、job没有来得及执行等。

 

Constant Event class Triggered when... EVENT_SCHEDULER_START SchedulerEvent The scheduler is started EVENT_SCHEDULER_SHUTDOWN SchedulerEvent The scheduler is shut down EVENT_JOBSTORE_ADDED JobStoreEvent A job store is added to the scheduler EVENT_JOBSTORE_REMOVED JobStoreEvent A job store is removed from the scheduler EVENT_JOBSTORE_JOB_ADDED JobStoreEvent A job is added to a job store EVENT_JOBSTORE_JOB_REMOVED JobStoreEvent A job is removed from a job store EVENT_JOB_EXECUTED JobEvent A job is executed successfully EVENT_JOB_ERROR JobEvent A job raised an exception during execution EVENT_JOB_MISSED JobEvent A job’s execution time is missed


        看下面的例子,监听异常和miss事件,这里用logging模块打印日志,logger.exception()可以打印出异常堆栈信息。

 
  1. def err_listener(ev):  
  2.     err_logger = logging.getLogger(''schedErrJob'')  
  3.     if ev.exception:  
  4.         err_logger.exception(''%s error.'', str(ev.job))  
  5.     else:  
  6.         err_logger.info(''%s miss'', str(ev.job))  
  7.   
  8. schedudler.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)  

        事件的属性包括:

 

  • job – the job instance in question
  • scheduled_run_time – the time when the job was scheduled to be run
  • retval – the return value of the successfully executed job
  • exception – the exception raised by the job
  • traceback – the traceback object associated with the exception

        最后,需要注意一点当job不以daemon模式运行时,并且APScheduler也不是daemon的,那么在关闭脚本时,Ctrl + C是不奏效的,必须kill才可以。可以通过命令实现关闭脚本:

 
ps axu | grep {脚本名} | grep -v grep | awk ''{print $2;}'' | xargs kill

 


apscheduler 定时任务框架

apscheduler 定时任务框架

一、APScheduler简介:

    Python的一个定时任务框架,满足用户定时执行或者周期性执行任务的需求,提供了基于日期date、固定的时间间隔interval、以及类似于Linux上的定时任务crontab类型的定时任务。并且该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化。

Python的第三方库,用来提供Python的后台程序。包含四个组件,分别是:

triggers:任务触发器组件,提供任务触发方式

job stores: 任务商店组件,提供任务保存方式

executors:任务调度组件,提供任务调度方式

schedulers: 任务调度组件,提供任务工作方式

 二、APScheduler安装

1)利用pip安装(推荐)

# pip install apscheduler
2 基于源码:https://pypi.python.org/pypi/APScheduler/

# python setup.py install

三、基本概念

1、 APScheduler有四种组件及相关说明

1) triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的。
2 )job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其他作业存储器可以将任务保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。

3)executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器就会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据实际需求可以同时使用两种执行器

4)schedulers(调度器):调度器是将其他部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器。相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加、移除、修改任务作业。

APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:

BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用

BackgroundScheduler:适合于要求任务在程序后台运行的情况,当希望调度器在应用后台执行时使用。

AsyncIOScheduler:适合于使用asyncio框架的情况

GeventScheduler:适合于使用gevent框架的情况

TornadoScheduler:适合于使用Tornado框架的应用

TwistedScheduler:适合使用Twisted框架的应用

QtScheduler:适合使用QT的情况

2、配置调度器

APScheduler提供了许多不同的方式来配置调度器,你可以使用一个配置字典或者作为参数关键字的方式传入。你也可以先创建调度器。在配置和添加作业,这样可以在不同的环境中得到更大的灵活性。

3、简单的实例

 

from apscheduler.schedulers.blocking import BlockingScheduler
import time
#实例化一个调度器
scheduler = BlockingScheduler()

def job1():
print "%s: 执行任务" % time.asctime()

# 添加任务并设置触发方式为3s一次

scheduler.add_job(job1, ''interval'', seconds=3)
#开始运行调度器
scheduler.start()

 

四、各组件功能

 1、trigger组件

trigger提供任务的触发方式,共三种方式:

      date:只在某个时间点执行一次run_date(datetime|str)

scheduler.add_job(my_job, ''date'', run_date=date(2017, 9, 8), args=[])
scheduler.add_job(my_job, ''date'', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
scheduler.add_job(my_job, ''date'', run_date=''2019-6-12 21:30:05'', args=[])
# The ''date'' trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=[[])

 interval:每隔一段时间执行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0,

start_date=None, end_date=None, timezone=None
scheduler.add_job(my_job, ''interval'', hours=2)
scheduler.add_job(my_job, ''interval'', hours=2, start_date=''2017-9-8 21:30:00'',
end_date=''2019-06-12 21:30:00)
@scheduler.scheduled_job(''interval'', id=''my_job_id'', hours=2)
def my_job():
print("Hello World")

 

cron:使用Linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

       sched.add_job(my_job, ''cron'', hour=3, minute=30)

    sched.add_job(my_job, ''cron'', day_of_week=''mon-fri'', hour=5, minute=30, end_date=''2017-10-
30'')
    @sched.scheduled_job(''cron'', id=''my_job_id'', day=''last sun'')
    def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

2、scheduler组件

scheduler组件提供执行的方式,在不同的运行环境中选择合适的方式

 BlockingScheduler:进程中只运行调度器时的方式

from apscheduler.schedulers.blocking import BlockingScheduler
import time
scheduler = BlockingScheduler()
def job1():

print "%s: 执行任务" % time.asctime()

scheduler.add_job(job1, ''interval'', seconds=3)
scheduler.start()

BackgroundScheduler:不想使用任何框架时的方式

from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():

print "%s:执行任务 " % time.asctime() 

scheduler.add_job(job1, ''interval'', seconds=3)

scheduler.start()
while True:
    pass

AsyncIOScheduler: asyncio module的方式( Python3)

from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
import asyncio
except ImportError:
import trollius as asyncio
...
...

# while True pass

try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass

GeventScheduler: gevent方式 

from apscheduler.schedulers.gevent import GeventScheduler
...

...

g = scheduler.start()
# while True:pass
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass

TornadoScheduler: Tornado方式

from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

... 

...

# while True:pass
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass

TwistedScheduler: Twisted方式

from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

... 

...

# while True:pass
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass

 

QtScheduler: Qt方式

3、executors组件

executors组件提供任务的调度方式

base

debug
gevent 

pool(max_workers=10) 

twisted 

4、jobstore组件

jobstore提供任务的各种持久化方式

base

memory
mongodb
    scheduler.add_jobstore(''mongodb'', collection=''example_jobs'') 

redis

    scheduler.add_jobstore(''redis'', jobs_key=''example.jobs'', run_times_key=''example.run_times'')
rethinkdb
    scheduler.add_jobstore(''rethinkdb'', database=''apscheduler_example'') 

sqlalchemy

  scheduler.add_jobstore(''sqlalchemy'', url=url)

zookeeper

  scheduler.add_jobstore(''zookeeper'', path=''/example_jobs'')

 

五、任务操作

1、添加任务add_job(如上)

如果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启时都会创建任务的副本,开启后任务不会马上启动,可修改triger参数

2、删除任务remove_job

#根据任务实例删除

job = scheduler.add_job(myfunc, ''interval'', minutes=2)
job.remove()

# 根据任务id删除

scheduler.add_job(myfunc, ''interval'', minutes=2, id=''my_job_id'')
scheduler.remove_job(''my_job_id'')

3、任务的暂停pause_job和继续resume_job

job = scheduler.add_job(myfunc, ''interval'', minutes=2)
#根据任务实例
job.pause()
job.resume()

# 根据任务id暂停

scheduler.add_job(myfunc, ''interval'', minutes=2, id=''my_job_id'')
scheduler.pause_job(''my_job_id'')
4、任务的修饰modify和重设reschedule_job

修饰:job.modify(max_instances=6, name=''Alternate name'') 

重设:scheduler.reschedule_job(''my_job_id'', trigger=''cron'', minute=''*/5'')

5、调度器操作

开启:scheduler.start()

关闭:scheduler.shotdown(wait=True | False)

暂停:scheduler.pause()

继续:scheduler.resume() 

监听:http://apscheduler.readthedocs.io/en/v3.3.0/modules/events.html#module-apscheduler.events

def my_listener(event):
    if event.exception:
print(''The job crashed :('')
    else:
print(''The job worked :)'')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
官方实例
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
    ''mongo'': MongoDBJobStore(),
    ''default'': SQLAlchemyJobStore(url=''sqlite:///jobs.sqlite'')
}
executors = {
''default'': ThreadPoolExecutor(20),
''processpool'': ProcessPoolExecutor(5)
}
job_defaults = {
    ''coalesce'': False,
    ''max_instances'': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
job_defaults=job_default

 

APScheduler定时任务框架

APScheduler定时任务框架

1.简介

APScheduler是一个Python**定时任务框架**,提供了**基于日期**、**固定时间间隔**以及**crontab**类型的任务,并且可以**持久化任务**。基于这些功能,我们可以很方便的实现一个python定时任务系统。

2.安装

pip install apscheduler

3.组成部分

触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。

作业存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。

执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。

4.操作作业

添加

? add_job的第二个参数是trigger,它管理着作业的调度方式。它可以为date,interval或者cron。对于不同的trigger,对应的参数也相同。

  • add_job() 添加定时任务
import time
 
from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():
    print time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))

sched = BlockingScheduler()
sched.add_job(my_job,'interval',seconds=5)
sched.start()
  • scheduled_job()修饰器 添加定时任务
import time
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
 
@sched.scheduled_job('interval',seconds=5)
def my_job():
    print time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))
sched.start()

移除

job = scheduler.add_job(myfunc,minutes=2)
job.remove()
#如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效

sched.add_job(myfunc,minutes=2,id='my_job_id')
sched.remove_job('my_job_id')

暂停/恢复

apsched.job.Job.pause()
apsched.schedulers.base.BaseScheduler.pause_job()

获得job列表

获得调度作业的列表,可以使用get_jobs()来完成,它会返回所有的job实例。或者使用print_jobs()来输出所有格式化的作业列表。也可以利用get_job(任务ID)获取指定任务的作业列表

job = sched.add_job(my_job,seconds=2,id='123')
print sched.get_job(job_id='123')
print sched.get_jobs()

关闭调度器

默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。

sched.shutdown()
sched.shutdown(wait=False)
@H_301_72@5.作业运行的控制

corn定时调度

(int|str) 表示参数既可以是int类型,也可以是str类型

(datetime | str) 表示参数既可以是datetime类型,也可以是str类型

 

year (int|str) – 4-digit year -(表示四位数的年份,如2008年)

month (int|str) – month (1-12) -(表示取值范围为1-12月)

day (int|str) – day of the (1-31) -(表示取值范围为1-31日)

week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))

day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)

hour (int|str) – hour (0-23) - (表示取值范围为0-23时)

minute (int|str) – minute (0-59) - (表示取值范围为0-59分)

second (int|str) – second (0-59) - (表示取值范围为0-59秒)

start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)

end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)

timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
  • 某一定时时刻执行
#表示2017年3月22日17时19分07秒执行该程序
sched.add_job(my_job,'cron',year=2017,month = 03,day = 22,hour = 17,minute = 19,second = 07)

#表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
sched.add_job(my_job,month='6-8,11-12',day='3rd fri',hour='0-3')

#表示从星期一到星期五5:30(AM)直到2014-05-30 00:00:00
sched.add_job(my_job(),day_of_week='mon-fri',hour=5,minute=30,end_date='2014-05-30')

#表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
sched.add_job(my_job,second = '*/5')

1571301599276

interval 间隔调度

  • 参数介绍

    weeks (int) – number of weeks to wait
    days (int) – number of days to wait
    hours (int) – number of hours to wait
    minutes (int) – number of minutes to wait
    seconds (int) – number of seconds to wait
    start_date (datetime|str) – starting point for the interval calculation
    end_date (datetime|str) – latest possible date/time to trigger on
    timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
  • 示例

    #表示每隔3天17时19分07秒执行一次任务
    sched.add_job(my_job,days  = 03,hours = 17,minutes = 19,seconds = 07)

date 定时调度

  • 作业只会执行一次

    run_date (datetime|str) – the date/time to run the job at  -(任务开始的时间)
    
    timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
  • 例子

    # The job will be executed on November 6th,2009
    
    sched.add_job(my_job,'date',run_date=date(2009,6),args=['text'])
    
    # The job will be executed on November 6th,2009 at 16:30:05
    
    sched.add_job(my_job,run_date=datetime(2009,6,16,30,5),args=['text'])

APScheduler轻量级定时任务框架

APScheduler轻量级定时任务框架

目录
  • 一、APScheduler简介
    • 支持的后端存储作业
    • 集成的Python框架
  • 二、APScheduler下载安装
  • 三、APScheduler组件
    • 各组件简介
      • 调度器
      • 作业存储器
      • 执行器
      • 触发器
  • 四、使用
    • 添加任务
    • 指定时间执行任务,只执行一次
    • 间隔时间执行任务

一、APScheduler简介

APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。
APScheduler有三个内置的调度系统,其中包括:

  • cron式调度(可选开始/结束时间)
  • 基于间隔的执行(以偶数间隔运行作业,也可以选择开始/结束时间)
  • 一次性延迟执行任务(在指定的日期/时间内运行作业一次)

支持的后端存储作业

APScheduler可以任意混合和匹配调度系统和作业存储的后端,其中支持后端存储作业包括:

  • Memory
  • SQLAlchemy
  • MongoDB
  • Redis
  • RethinkDB
  • ZooKeeper

集成的Python框架

APScheduler内继承了几个常见的Python框架:

  • asyncio
  • gevent
  • tornado
  • qt

二、APScheduler下载安装

使用pip安装:

pip install apscheduler
pip install apscheduler==3.6.3

如果超时或者出现别的情况,可以选择:

# 法1使用豆瓣源下载
pip install -i https://pypi.doubanio.com/simple/ apscheduler
# 法2使用清华源下载
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple apscheduler

要是再不行,点击该链接或者pypi官网下载了。下载并解压缩,进入跟setup.py文件同级的目录,打开cmd,使用命令进行下载:

python setup.py install

三、APScheduler组件

APScheduler共有4种组件,分别是:

  • 触发器(trigger),触发器中包含调度逻辑,每个作业都有自己的触发器来决定下次运行时间。除了它们自己初始配置以外,触发器完全是无状态的。
  • 作业存储器(job store),存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中,当作业被保存在一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化,需要说明的是,作业存储器不能共享调度器。
  • 执行器(executor),处理作业的运行,通常通过在作业中提交指定的可调用对象到一个线程或者进程池来进行,当作业完成时,执行器会将通知调度器。
  • 调度器(scheduler),配置作业存储器和执行器可以在调度器中完成。例如添加、修改、移除作业,根据不同的应用场景,可以选择不同的调度器,可选的将在下一小节展示。

各组件简介

调度器

  • BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。
  • BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。
  • AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
  • GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
  • TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
  • TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
  • QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

作业存储器

如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)

执行器

对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。

触发器

当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:

  • date 一次性指定日期
  • interval 在某个时间范围内间隔多长时间执行一次
  • cron 和Linux crontab格式兼容,最为强大

四、使用

当你需要调度作业的时候,你需要为这个作业选择一个触发器,用来描述该作业将在何时被触发,APScheduler有3中内置的触发器类型:

  • 新建一个调度器(scheduler)
  • 添加一个调度任务(job store)
  • 运行调度任务

添加任务

有两种方式可以添加一个新的作业:

  • add_job来添加作业
  • 装饰器模式添加作业

指定时间执行任务,只执行一次

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def job2(text):
    print(''job2'', datetime.datetime.now(), text)
scheduler = BlockingScheduler()
scheduler.add_job(job2, ''date'', run_date=datetime.datetime(2020, 2, 25, 19, 5, 6), args=[''text''], id=''job2'')
scheduler.start()

上例中,只在2010-2-25 19:05:06执行一次,args传递一个text参数。

间隔时间执行任务

下面来个简单的例子,作业每个5秒执行一次:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1():
    print(''job1'', datetime.datetime.now())
scheduler = BlockingScheduler()
scheduler.add_job(job1, ''interval'', seconds=5, id=''job1'')  # 每隔5秒执行一次
scheduler.start()

每天凌晨1点30分50秒执行一次

# 装饰器的方式

from apscheduler.schedulers.blocking import BlockingScheduler  # 后台运行
sc = BlockingScheduler()
f = open(''t1.text'', ''a'', encoding=''utf8'')


@sc.scheduled_job(''cron'', day_of_week=''*'', hour=1, minute=''30'', second=''50'')
def check_db():
    print(111111111111)
if __name__ == ''__main__'':
    try:
        sc.start()
        f.write(''定时任务成功执行'')
    except Exception as e:
        sc.shutdown()
        f.write(''定时任务执行失败'')
    finally:
        f.close()

每几分钟执行一次:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1():
    print(''job1'', datetime.datetime.now())
scheduler = BlockingScheduler()
# 每隔2分钟执行一次, */1:每隔1分钟执行一次
scheduler.add_job(job1, ''cron'', minute="*/2", id=''job1'') 
scheduler.start()

每小时执行一次:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1():
    print(''job1'', datetime.datetime.now())
scheduler = BlockingScheduler()
# 每小时执行一次
scheduler.add_job(job1, ''interval'', hours=1, id=''job1'')
# 每小时执行一次,上下浮动120秒区间内
# scheduler.add_job(job1, ''interval'', hours=1, id=''job1'', jitter=120)
scheduler.start()

APScheduler(定时任务—Python库)

APScheduler(定时任务—Python库)

<style> a:link { color: #000000; text-decoration: none; } a:visited { color: #000000; text-decoration: none; } a:hover { color: #999999; text-decoration: none; } </style>

<div id="cnblogs_post_body"> <div> <p>目录</p> <div> <ul> <li><a href="https://www.cnblogs.com/qq834761298/p/12164717.html#apscheduler%E7%AE%80%E4%BB%8B">APScheduler简介</a> <ul> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E6%94%AF%E6%8C%81%E7%9A%84%E5%90%8E%E7%AB%AF%E5%AD%98%E5%82%A8%E4%BD%9C%E4%B8%9A">支持的后端存储作业</a> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E9%9B%86%E6%88%90%E7%9A%84python%E6%A1%86%E6%9E%B6">集成的Python框架</a> </li> </ul> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#apscheduler%E4%B8%8B%E8%BD%BD%E5%AE%89%E8%A3%85">APScheduler下载安装</a> </li> <li><a href="https://www.cnblogs.com/qq834761298/p/12164717.html#apscheduler%E7%BB%84%E4%BB%B6">APScheduler组件</a> <ul> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E5%90%84%E7%BB%84%E4%BB%B6%E7%AE%80%E4%BB%8B">各组件简介</a> <ul> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E8%B0%83%E5%BA%A6%E5%99%A8">调度器</a> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E4%BD%9C%E4%B8%9A%E5%AD%98%E5%82%A8%E5%99%A8">作业存储器</a> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E6%89%A7%E8%A1%8C%E5%99%A8">执行器</a> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E8%A7%A6%E5%8F%91%E5%99%A8">触发器</a> </li> </ul> </li> </ul> </li> <li><a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E4%BD%BF%E7%94%A8">使用</a> <ul> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E6%B7%BB%E5%8A%A0%E4%BD%9C%E4%B8%9A">添加作业</a> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E5%8F%AA%E6%89%A7%E8%A1%8C%E4%B8%80%E6%AC%A1">只执行一次</a> </li> <li> <a href="https://www.cnblogs.com/qq834761298/p/12164717.html#%E9%97%B4%E9%9A%94%E6%89%A7%E8%A1%8C">间隔执行</a> </li> </ul> </li> </ul> </div> </div> <h1 id="apscheduler简介">APScheduler简介 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h1> <p><a href="https://apscheduler.readthedocs.io/en/latest/index.html">APScheduler</a>(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。<br> APScheduler有三个内置的调度系统,其中包括:</p> <ul> <li>cron式调度(可选开始/结束时间)</li> <li>基于间隔的执行(以偶数间隔运行作业,也可以选择开始/结束时间)</li> <li>一次性延迟执行任务(在指定的日期/时间内运行作业一次)</li> </ul> <h2 id="支持的后端存储作业">支持的后端存储作业 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h2> <p>APScheduler可以任意混合和匹配调度系统和作业存储的后端,其中支持后端存储作业包括:</p> <ul> <li>Memory</li> <li>SQLAlchemy</li> <li>MongoDB</li> <li>Redis</li> <li>RethinkDB</li> <li>ZooKeeper</li> </ul> <h2 id="集成的python框架">集成的Python框架 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h2> <p>APScheduler内继承了几个常见的Python框架:</p> <ul> <li>asyncio</li> <li>gevent</li> <li>tornado</li> <li>qt</li> </ul> <h1 id="apscheduler下载安装">APScheduler下载安装 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h1> <p>使用pip安装:</p> <pre><code>pip <span>install</span> apscheduler pip <span>install</span> apscheduler==<span>3.6</span>.<span>3</span></code></pre> <p>如果超时或者出现别的情况,可以选择:</p> <pre><code><span># 法1使用豆瓣源下载</span> <span>pip</span> install -i https://pypi.doubanio.com/simple/ apscheduler <span># 法2使用清华源下载</span> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple apscheduler</code></pre> <p>要是再不行,点击<a href="https://files.pythonhosted.org/packages/42/a7/c9569e03058430cef420d3543e8f63bc7b32d77ae03278becae977cf32a7/APScheduler-3.5.3.tar.gz">该链接</a>或者<a href="https://pypi.org/project/APScheduler/#files">pypi官网</a>下载了。下载并解压缩,进入跟setup.py文件同级的目录,打开cmd,使用命令进行下载: </p> <pre><code>python setup.py <span>install</span></code></pre> <h1 id="apscheduler组件">APScheduler组件 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h1> <p>APScheduler共有4种组件,分别是:</p> <ul> <li>触发器(trigger),触发器中包含调度逻辑,每个作业都有自己的触发器来决定下次运行时间。除了它们自己初始配置以外,触发器完全是无状态的。</li> <li>作业存储器(job store),存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中,当作业被保存在一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化,需要说明的是,作业存储器不能共享调度器。 </li> <li>执行器(executor),处理作业的运行,通常通过在作业中提交指定的可调用对象到一个线程或者进程池来进行,当作业完成时,执行器会将通知调度器。</li> <li>调度器(scheduler),配置作业存储器和执行器可以在调度器中完成。例如添加、修改、移除作业,根据不同的应用场景,可以选择不同的调度器,可选的将在下一小节展示。</li> </ul> <h2 id="各组件简介">各组件简介 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h2> <h3 id="调度器">调度器 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h3> <ul> <li>BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。</li> <li>BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。</li> <li>AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。</li> <li>GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。</li> <li>TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。</li> <li>TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用</li> <li>QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。</li> </ul> <h3 id="作业存储器">作业存储器 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h3> <p> 如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)</p> <h3 id="执行器">执行器 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h3> <p> 对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。</p> <h3 id="触发器">触发器 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h3> <p>当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:</p> <ul> <li>date 一次性指定日期</li> <li>interval 在某个时间范围内间隔多长时间执行一次</li> <li>cron 和Linux crontab格式兼容,最为强大</li> </ul> <h1 id="使用">使用 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h1> <p>当你需要调度作业的时候,你需要为这个作业选择一个触发器,用来描述该作业将在何时被触发,APScheduler有3中内置的触发器类型:</p> <ul> <li>新建一个调度器(scheduler)</li> <li>添加一个调度任务(job store)</li> <li>运行调度任务</li> </ul> <h2 id="添加作业">添加作业 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h2> <p>有两种方式可以添加一个新的作业:</p> <ul> <li>add_job来添加作业</li> <li>装饰器模式添加作业</li> </ul> <h2 id="只执行一次">只执行一次 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h2> <pre><code><span>import</span> datetime <span>from</span> apscheduler.schedulers.blocking <span>import</span> BlockingScheduler <span><span>def</span> <span>job2</span><span>(text)</span>:</span> print(<span>''job2''</span>, datetime.datetime.now(), text) scheduler = BlockingScheduler() scheduler.add_job(job2, <span>''date''</span>, run_date=datetime.datetime(<span>2019</span>, <span>2</span>, <span>25</span>, <span>19</span>, <span>5</span>, <span>6</span>), args=[<span>''text''</span>], id=<span>''job2''</span>) scheduler.start()</code></pre> <p>上例中,只在2010-2-25 19:05:06执行一次,args传递一个text参数。</p> <h2 id="间隔执行">间隔执行 <div><a href="#cnblogs_post_body" target="_self">返回顶部</a></div> </h2> <p>下面来个简单的例子,作业每个5秒执行一次:</p> <pre><code><span>import</span> datetime <span>from</span> apscheduler.schedulers.blocking <span>import</span> BlockingScheduler

<span><span>def</span> <span>job1</span><span>()</span>:</span> print(<span>''job1''</span>, datetime.datetime.now()) scheduler = BlockingScheduler() scheduler.add_job(job1, <span>''interval''</span>, seconds=<span>5</span>, id=<span>''job1''</span>) <span># 每隔5秒执行一次</span> scheduler.start()</code></pre> <p>每天凌晨1点30分50秒执行一次</p> <pre><code><span>from</span> apscheduler.schedulers.blocking <span>import</span> BlockingScheduler <span># 后台运行</span> sc = BlockingScheduler() f = open(<span>''t1.text''</span>, <span>''a''</span>, encoding=<span>''utf8''</span>) <span>@sc.scheduled_job(''cron'', day_of_week=''*'', hour=1, minute=''30'', second=''50'')</span> <span><span>def</span> <span>check_db</span><span>()</span>:</span> print(<span>111111111111</span>) <span>if</span> name == <span>''main''</span>: <span>try</span>: sc.start() f.write(<span>''定时任务成功执行''</span>) <span>except</span> Exception <span>as</span> e: sc.shutdown() f.write(<span>''定时任务执行失败''</span>) <span>finally</span>: f.close()</code></pre> <p>每几分钟执行一次:</p> <pre><code><span>import</span> datetime <span>from</span> apscheduler.schedulers.blocking <span>import</span> BlockingScheduler

<span><span>def</span> <span>job1</span><span>()</span>:</span> print(<span>''job1''</span>, datetime.datetime.now()) scheduler = BlockingScheduler() <span># 每隔2分钟执行一次, /1:每隔1分钟执行一次</span> scheduler.add_job(job1, <span>''cron''</span>, minute=<span>"/2"</span>, id=<span>''job1''</span>) scheduler.start()</code></pre> <p>每小时执行一次:</p> <pre><code><span>import</span> datetime <span>from</span> apscheduler.schedulers.blocking <span>import</span> BlockingScheduler

<span><span>def</span> <span>job1</span><span>()</span>:</span> print(<span>''job1''</span>, datetime.datetime.now()) scheduler = BlockingScheduler() <span># 每小时执行一次</span> scheduler.add_job(job1, <span>''interval''</span>, hours=<span>1</span>, id=<span>''job1''</span>) <span># 每小时执行一次,上下浮动120秒区间内</span> <span># scheduler.add_job(job1, ''interval'', hours=1, id=''job1'', jitter=120)</span> scheduler.start()</code></pre> <hr> <p>see also: <a href="https://www.cnblogs.com/zhaoyingjie/p/9664081.html">APScheduler(Python化的Cron)使用总结 定时任务</a> | <a href="https://www.cnblogs.com/ohyb/p/9084011.html">APScheduler</a> | <a href="https://www.jianshu.com/p/b1d498e370a8">python apscheduler 每两小时执行一次</a> | <a href="https://www.jianshu.com/p/998ca0ee13a3">python apscheduler 每分钟执行</a></p>

</div>

今天关于[转]Python定时任务框架APScheduler的分享就到这里,希望大家有所收获,若想了解更多关于apscheduler 定时任务框架、APScheduler定时任务框架、APScheduler轻量级定时任务框架、APScheduler(定时任务—Python库)等相关知识,可以在本站进行查询。

本文标签: