介紹
官網(wǎng)文檔:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...
APScheduler是一個(gè)python的第三方庫(kù),用來(lái)提供python的后臺(tái)程序。包含四個(gè)組件,分別是:
triggers: 任務(wù)觸發(fā)器組件,提供任務(wù)觸發(fā)方式
job stores: 任務(wù)商店組件,提供任務(wù)保存方式
executors: 任務(wù)調(diào)度組件,提供任務(wù)調(diào)度方式
schedulers: 任務(wù)調(diào)度組件,提供任務(wù)工作方式
安裝pip 安裝
$ pip install apscheduler
源碼安裝
$ python setup.py install簡(jiǎn)單的實(shí)例
from apscheduler.schedulers.blocking import BlockingScheduler import time # 實(shí)例化一個(gè)調(diào)度器 scheduler = BlockingScheduler() def job1(): print "%s: 執(zhí)行任務(wù)" % time.asctime() # 添加任務(wù)并設(shè)置觸發(fā)方式為3s一次 scheduler.add_job(job1, "interval", seconds=3) # 開(kāi)始運(yùn)行調(diào)度器 scheduler.start()
輸出:
$ python first.py Fri Sep 8 20:41:55 2017: 執(zhí)行任務(wù) Fri Sep 8 20:41:58 2017: 執(zhí)行任務(wù) ...各組件功能 trigger組件
trigger提供任務(wù)的觸發(fā)方式,共三種方式:
date:只在某個(gè)時(shí)間點(diǎn)執(zhí)行一次run_date(datetime|str)
scheduler.add_job(my_job, "date", run_date=date(2017, 9, 8), args=[]) scheduler.add_job(my_job, "date", run_date=datetime(2017, 9, 8, 21, 30, 5), args=[]) scheduler.add_job(my_job, "date", run_date="2017-9-08 21:30:05", args=[]) # The "date" trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=[[])
interval: 每隔一段時(shí)間執(zhí)行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None
scheduler.add_job(my_job, "interval", hours=2) scheduler.add_job(my_job, "interval", hours=2, start_date="2017-9-8 21:30:00", end_date="2018-06-15 21:30:00) @scheduler.scheduled_job("interval", id="my_job_id", hours=2) def my_job(): print("Hello World")
cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
sched.add_job(my_job, "cron", hour=3, minute=30) sched.add_job(my_job, "cron", day_of_week="mon-fri", hour=5, minute=30, end_date="2017-10-30") @sched.scheduled_job("cron", id="my_job_id", day="last sun") def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!")scheduler組件
scheduler組件提供執(zhí)行的方式,在不同的運(yùn)用環(huán)境中選擇合適的方式
BlockingScheduler: 進(jìn)程中只運(yùn)行調(diào)度器時(shí)的方式
from apscheduler.schedulers.blocking import BlockingScheduler import time scheduler = BlockingScheduler() def job1(): print "%s: 執(zhí)行任務(wù)" % time.asctime() scheduler.add_job(job1, "interval", seconds=3) scheduler.start()
BackgroundScheduler: 不想使用任何框架時(shí)的方式
from apscheduler.schedulers.background import BackgroundScheduler import time scheduler = BackgroundScheduler() def job1(): print "%s: 執(zhí)行任務(wù)" % time.asctime() scheduler.add_job(job1, "interval", seconds=3) scheduler.start() while True: pass
AsyncIOScheduler: asyncio module的方式(Python3)
from apscheduler.schedulers.asyncio import AsyncIOScheduler try: import asyncio except ImportError: import trollius as asyncio ... ... # while True:pass try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass
GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler ... ... g = scheduler.start() # while True:pass try: g.join() except (KeyboardInterrupt, SystemExit): pass
TornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop from apscheduler.schedulers.tornado import TornadoScheduler ... ... # while True:pass try: IOLoop.instance().start() except (KeyboardInterrupt, SystemExit): pass
TwistedScheduler: Twisted方式
from twisted.internet import reactor from apscheduler.schedulers.twisted import TwistedScheduler ... ... # while True:pass try: reactor.run() except (KeyboardInterrupt, SystemExit): pass
QtScheduler: Qt方式
executors組件executors組件提供任務(wù)的調(diào)度方式
base
debug
gevent
pool(max_workers=10)
twisted
jobstore組件jobstore提供任務(wù)的各種持久化方式
base
memory
mongodb
scheduler.add_jobstore("mongodb", collection="example_jobs")
redis
scheduler.add_jobstore("redis", jobs_key="example.jobs", run_times_key="example.run_times")
rethinkdb
scheduler.add_jobstore("rethinkdb", database="apscheduler_example")
sqlalchemy
scheduler.add_jobstore("sqlalchemy", url=url)
zookeeper
scheduler.add_jobstore("zookeeper", path="/example_jobs")
刪除任務(wù)remove_job如果使用了任務(wù)的存儲(chǔ),開(kāi)啟時(shí)最好添加replace_existing=True,否則每次開(kāi)啟都會(huì)創(chuàng)建任務(wù)的副本
開(kāi)啟后任務(wù)不會(huì)馬上啟動(dòng),可修改trigger參數(shù)
# 根據(jù)任務(wù)實(shí)例刪除 job = scheduler.add_job(myfunc, "interval", minutes=2) job.remove() # 根據(jù)任務(wù)id刪除 scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id") scheduler.remove_job("my_job_id")任務(wù)的暫停pause_job和繼續(xù)resume_job
job = scheduler.add_job(myfunc, "interval", minutes=2) # 根據(jù)任務(wù)實(shí)例 job.pause() job.resume() # 根據(jù)任務(wù)id暫停 scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id") scheduler.pause_job("my_job_id") scheduler.resume_job("my_job_id")任務(wù)的修飾modify和重設(shè)reschedule_job
修飾:job.modify(max_instances=6, name="Alternate name")
重設(shè):scheduler.reschedule_job("my_job_id", trigger="cron", minute="*/5")
開(kāi)啟 scheduler.start()
關(guān)閉 scheduler.shotdown(wait=True | False)
暫停 scheduler.pause()
繼續(xù) scheduler.resume()
監(jiān)聽(tīng) http://apscheduler.readthedoc...
def my_listener(event): if event.exception: print("The job crashed :(") else: print("The job worked :)") scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)官方實(shí)例
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { "mongo": MongoDBJobStore(), "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite") } executors = { "default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5) } job_defaults = { "coalesce": False, "max_instances": 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/40820.html
摘要:安裝利用進(jìn)行安裝源碼安裝有四種組成部分觸發(fā)器包含調(diào)度邏輯,每一個(gè)作業(yè)有它自己的觸發(fā)器,用于決定接下來(lái)哪一個(gè)作業(yè)會(huì)運(yùn)行。除了他們自己初始配置意外,觸發(fā)器完全是無(wú)狀態(tài)的。 APScheduler簡(jiǎn)介 在平常的工作中幾乎有一半的功能模塊都需要定時(shí)任務(wù)來(lái)推動(dòng),例如項(xiàng)目中有一個(gè)定時(shí)統(tǒng)計(jì)程序,定時(shí)爬出網(wǎng)站的URL程序,定時(shí)檢測(cè)釣魚網(wǎng)站的程序等等,都涉及到了關(guān)于定時(shí)任務(wù)的問(wèn)題,第一時(shí)間想到的是利用t...
摘要:中任務(wù)調(diào)度一般用中的任務(wù)調(diào)度工具也有不少等。調(diào)度器配置示例方式一方式二三略。移除調(diào)用放到,參數(shù)為調(diào)用實(shí)例的方法注意如果任務(wù)已經(jīng)調(diào)度完畢,并且之后也不會(huì)再被執(zhí)行的情況下,會(huì)被自動(dòng)移除??梢员O(jiān)聽(tīng)調(diào)度任務(wù)執(zhí)行情況相關(guān)的事件。 Java中任務(wù)調(diào)度一般用Quartz,Python中的任務(wù)調(diào)度工具也有不少:Celery,RQ,APScheduler等。Celery:非常強(qiáng)大的分布式任務(wù)調(diào)度框架RQ...
摘要:最近公司有項(xiàng)目需要使用到定時(shí)任務(wù),其定時(shí)邏輯類似于的,就使用了這個(gè)類庫(kù)。在一次循環(huán)結(jié)束之前會(huì)計(jì)算任務(wù)下次執(zhí)行事件與當(dāng)前時(shí)間之差,然后讓調(diào)度線程掛起直到那個(gè)時(shí)間到來(lái)。 最近公司有項(xiàng)目需要使用到定時(shí)任務(wù),其定時(shí)邏輯類似于linux的Cron,就使用了Apscheduler這個(gè)類庫(kù)?;诠镜臉I(yè)務(wù),需要修改Apshceduler,故而研究了一下Apscheduler的代碼。 Apschedu...
摘要:項(xiàng)目中需要用到定時(shí)器和循環(huán)執(zhí)行。運(yùn)用線程執(zhí)行輪詢操作,也有運(yùn)用系統(tǒng)的的文章最多,但是太麻煩。和中間人的消息傳輸支持所有特性,但也提供大量其他實(shí)驗(yàn)性方案的支持,包括用進(jìn)行本地開(kāi)發(fā)。同時(shí)也包含了對(duì)任務(wù)的控制。后續(xù)有需求在繼續(xù)。 項(xiàng)目中需要用到定時(shí)器和循環(huán)執(zhí)行。去網(wǎng)上搜了一下,比較常見(jiàn)的有一下集中。運(yùn)用Python線程執(zhí)行輪詢操作,也有運(yùn)用Linux系統(tǒng)的Cron,Celery的文章最多,但...
摘要:今天介紹在中使用定時(shí)任務(wù)的兩種方式。添加并啟動(dòng)定時(shí)任務(wù)其它命令顯示當(dāng)前的定時(shí)任務(wù)刪除所有定時(shí)任務(wù)今天的定時(shí)任務(wù)就說(shuō)到這里,有錯(cuò)誤之處,歡迎交流指正 今天介紹在django中使用定時(shí)任務(wù)的兩種方式。 方式一: APScheduler1)安裝: pip install apscheduler 2)使用: from apscheduler.scheduler import Scheduler...
閱讀 2918·2021-10-19 10:09
閱讀 3136·2021-10-09 09:41
閱讀 3384·2021-09-26 09:47
閱讀 2697·2019-08-30 15:56
閱讀 602·2019-08-29 17:04
閱讀 992·2019-08-26 11:58
閱讀 2511·2019-08-26 11:51
閱讀 3362·2019-08-26 11:29