摘要:文檔中文文檔官方文檔定時(shí)服務(wù)與結(jié)合使用簡(jiǎn)介是一個(gè)自帶電池的的任務(wù)隊(duì)列。追蹤任務(wù)在不同狀態(tài)間的遷移,并檢視返回值。
文檔
中文文檔
官方文檔
celery定時(shí)服務(wù)、celery與django結(jié)合使用
簡(jiǎn)介Celery 是一個(gè)“自帶電池”的的任務(wù)隊(duì)列。它易于使用,所以你可以無視其所解決問題的復(fù)雜程度而輕松入門。它遵照最佳實(shí)踐設(shè)計(jì),所以你的產(chǎn)品可以擴(kuò)展,或與其他語言集成,并且它自帶了在生產(chǎn)環(huán)境中運(yùn)行這樣一個(gè)系統(tǒng)所需的工具和支持。
Celery 的最基礎(chǔ)部分。包括:
選擇和安裝消息傳輸方式(中間人)----broker,如RabbitMQ,redis等。
RabbitMQ的安裝:sudo apt-get install rabbitmq-server
本文使用redis
官方推薦RabbitMQ
當(dāng)然部分nosql也可以
安裝 Celery 并創(chuàng)建第一個(gè)任務(wù)
運(yùn)行職程并調(diào)用任務(wù)。
追蹤任務(wù)在不同狀態(tài)間的遷移,并檢視返回值。
安裝pip install celery簡(jiǎn)單使用 定義任務(wù)
tasks.py
from celery import Celery #第一個(gè)參數(shù)是你的celery名稱 #backen 用于存儲(chǔ)結(jié)果 #broker 用于存儲(chǔ)消息隊(duì)列 app = Celery("tasks",backend="redis://:password@host:port/db", broker="redis://:password@host:port/db") @app.task def add(x, y): return x + y
Celery 的第一個(gè)參數(shù)是當(dāng)前模塊的名稱,這個(gè)參數(shù)是必須的,這樣的話名稱可以自動(dòng)生成。第二個(gè)參數(shù)是中間人關(guān)鍵字參數(shù),指定你所使用的消息中間人的 URL,此處使用了 RabbitMQ,也是默認(rèn)的選項(xiàng)。更多可選的中間人見上面的 選擇中間人 一節(jié)。例如,對(duì)于 RabbitMQ 你可以寫 amqp://localhost ,而對(duì)于 Redis 你可以寫 redis://localhost .
你定義了一個(gè)單一任務(wù),稱為 add ,返回兩個(gè)數(shù)字的和。
啟動(dòng)celery服務(wù)步驟:
啟動(dòng)任務(wù)工作者worker
講任務(wù)放入celery隊(duì)列
worker讀取隊(duì)列,并執(zhí)行任務(wù)
啟動(dòng)一個(gè)工作者,創(chuàng)建一個(gè)任務(wù)隊(duì)列
// -A 指定celery名稱,loglevel制定log級(jí)別,只有大于或等于該級(jí)別才會(huì)輸出到日志文件 celery -A tasks worker --loglevel=info
如果你沒有安裝redis庫,請(qǐng)先pip install redis
使用celery現(xiàn)在我們已經(jīng)有一個(gè)celery隊(duì)列了,我門只需要將工作所需的參數(shù)放入隊(duì)列即可
from tasks import add #調(diào)用任務(wù)會(huì)返回一個(gè) AsyncResult 實(shí)例,可用于檢查任務(wù)的狀態(tài),等待任務(wù)完成或獲取返回值(如果任務(wù)失敗,則為異常和回溯)。 #但這個(gè)功能默認(rèn)是不開啟的,你需要設(shè)置一個(gè) Celery 的結(jié)果后端(即backen,我們?cè)趖asks.py中已經(jīng)設(shè)置了,backen就是用來存儲(chǔ)我們的計(jì)算結(jié)果) result=add.delay(4, 4) #如果任務(wù)已經(jīng)完成 if(result.ready()): #獲取任務(wù)執(zhí)行結(jié)果 print(result.get(timeout=1))
常用接口
tasks.add(4,6) ---> 本地執(zhí)行
tasks.add.delay(3,4) --> worker執(zhí)行
t=tasks.add.delay(3,4) --> t.get() 獲取結(jié)果,或卡住,阻塞
t.ready()---> False:未執(zhí)行完,True:已執(zhí)行完
t.get(propagate=False) 拋出簡(jiǎn)單異常,但程序不會(huì)停止
t.traceback 追蹤完整異常
使用配置使用配置來運(yùn)行,對(duì)于正式項(xiàng)目來說可維護(hù)性更好。配置可以使用app.config.XXXXX_XXX="XXX"的形式如app.conf.CELERY_TASK_SERIALIZER = "json"來進(jìn)行配置
配置資料
配置文件config.py
#broker BROKER_URL = "redis://:password@host:port/db" #backen CELERY_RESULT_BACKEND = "redis://:password@host:port/db" #導(dǎo)入任務(wù),如tasks.py CELERY_IMPORTS = ("tasks", ) #列化任務(wù)載荷的默認(rèn)的序列化方式 CELERY_TASK_SERIALIZER = "json" #結(jié)果序列化方式 CELERY_RESULT_SERIALIZER = "json" CELERY_ACCEPT_CONTENT=["json"] #時(shí)間地區(qū)與形式 CELERY_TIMEZONE = "Europe/Oslo" #時(shí)間是否使用utc形式 CELERY_ENABLE_UTC = True #設(shè)置任務(wù)的優(yōu)先級(jí)或任務(wù)每分鐘最多執(zhí)行次數(shù) CELERY_ROUTES = { # 如果設(shè)置了低優(yōu)先級(jí),則可能很久都沒結(jié)果 #"tasks.add": "low-priority", #"tasks.add": {"rate_limit": "10/m"}, #"tasks.add": {"rate_limit": "10/s"}, #"*": {"rate_limit": "10/s"} } #borker池,默認(rèn)是10 BROKER_POOL_LIMIT = 10 #任務(wù)過期時(shí)間,單位為s,默認(rèn)為一天 CELERY_TASK_RESULT_EXPIRES = 3600 #backen緩存結(jié)果的數(shù)目,默認(rèn)5000 CELERY_MAX_CACHED_RESULTS = 10000開啟服務(wù)
celery.py
from celery import Celery #指定名稱 app = Celery("mycelery") #加載配置模塊 app.config_from_object("config") if __name__=="__main__": app.start()任務(wù)定義
tasks.py
from .celery import app @app.task def add(a, b): return a + b啟動(dòng)
// -l 是 --loglevel的簡(jiǎn)寫 celery -A mycelery worker -l info執(zhí)行/調(diào)用服務(wù)
from tasks import add #調(diào)用任務(wù)會(huì)返回一個(gè) AsyncResult 實(shí)例,可用于檢查任務(wù)的狀態(tài),等待任務(wù)完成或獲取返回值(如果任務(wù)失敗,則為異常和回溯)。 #但這個(gè)功能默認(rèn)是不開啟的,你需要設(shè)置一個(gè) Celery 的結(jié)果后端(即backen,我們?cè)趖asks.py中已經(jīng)設(shè)置了,backen就是用來存儲(chǔ)我們的計(jì)算結(jié)果) result=add.delay(4, 4) #如果任務(wù)已經(jīng)完成 if(result.ready()): #獲取任務(wù)執(zhí)行結(jié)果 print(result.get(timeout = 1))分布式
啟動(dòng)多個(gè)celery worker,這樣即使一個(gè)worker掛掉了其他worker也能繼續(xù)提供服務(wù)
方法一
// 啟動(dòng)三個(gè)worker:w1,w2,w3 celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info // 立即停止w1,w2,即便現(xiàn)在有正在處理的任務(wù) celery multi stop w1 w2 // 重啟w1 celery multi restart w1 -A project -l info // celery multi stopwait w1 w2 w3 # 待任務(wù)執(zhí)行完,停止
方法二
// 啟動(dòng)多個(gè)worker,但是不指定worker名字 // 你可以在同一臺(tái)機(jī)器上運(yùn)行多個(gè)worker,但要為每個(gè)worker指定一個(gè)節(jié)點(diǎn)名字,使用--hostname或-n選項(xiàng) // concurrency指定處理進(jìn)程數(shù),默認(rèn)與cpu數(shù)量相同,因此一般無需指定 $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h錯(cuò)誤處理
celery可以指定在發(fā)生錯(cuò)誤的情況下進(jìn)行自定義的處理
config.py
def my_on_failure(self, exc, task_id, args, kwargs, einfo): print("Oh no! Task failed: {0!r}".format(exc)) // 對(duì)所有類型的任務(wù),當(dāng)發(fā)生執(zhí)行失敗的時(shí)候所執(zhí)行的操作 CELERY_ANNOTATIONS = {"*": {"on_failure": my_on_failure}}
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/44540.html
摘要:是什么是一個(gè)由編寫的簡(jiǎn)單靈活可靠的用來處理大量信息的分布式系統(tǒng)它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具。專注于實(shí)時(shí)任務(wù)處理,支持任務(wù)調(diào)度。說白了,它是一個(gè)分布式隊(duì)列的管理工具,我們可以用提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列。 Celery 是什么? Celery 是一個(gè)由 Python 編寫的簡(jiǎn)單、靈活、可靠的用來處理大量信息的分布式系統(tǒng),它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工...
摘要:我們將窗口切換到的啟動(dòng)窗口,會(huì)看到多了兩條日志這說明任務(wù)已經(jīng)被調(diào)度并執(zhí)行成功。本文標(biāo)題為異步任務(wù)神器簡(jiǎn)明筆記本文鏈接為參考資料使用之美分布式任務(wù)隊(duì)列的介紹思誠之道異步任務(wù)神器簡(jiǎn)明筆記 Celery 在程序的運(yùn)行過程中,我們經(jīng)常會(huì)碰到一些耗時(shí)耗資源的操作,為了避免它們阻塞主程序的運(yùn)行,我們經(jīng)常會(huì)采用多線程或異步任務(wù)。比如,在 Web 開發(fā)中,對(duì)新用戶的注冊(cè),我們通常會(huì)給他發(fā)一封激活郵件,...
摘要:介紹應(yīng)用舉例是一個(gè)基于開發(fā)的分布式異步消息任務(wù)隊(duì)列,通過它可以輕松的實(shí)現(xiàn)任務(wù)的異步處理,如果你的業(yè)務(wù)場(chǎng)景中需要用到異步任務(wù),就可以考慮使用你想對(duì)臺(tái)機(jī)器執(zhí)行一條批量命令,可能會(huì)花很長(zhǎng)時(shí)間,但你不想讓你的程序等著結(jié)果返回,? celery 1.celery介紹 1.1 celery應(yīng)用舉例 Celery 是一個(gè) 基于python開發(fā)的分布式異步消息任務(wù)隊(duì)列,通過...
摘要:今天介紹一下如何在項(xiàng)目中使用搭建一個(gè)有兩個(gè)節(jié)點(diǎn)的任務(wù)隊(duì)列一個(gè)主節(jié)點(diǎn)一個(gè)子節(jié)點(diǎn)主節(jié)點(diǎn)發(fā)布任務(wù),子節(jié)點(diǎn)收到任務(wù)并執(zhí)行。 今天介紹一下如何在django項(xiàng)目中使用celery搭建一個(gè)有兩個(gè)節(jié)點(diǎn)的任務(wù)隊(duì)列(一個(gè)主節(jié)點(diǎn)一個(gè)子節(jié)點(diǎn);主節(jié)點(diǎn)發(fā)布任務(wù),子節(jié)點(diǎn)收到任務(wù)并執(zhí)行。搭建3個(gè)或者以上的節(jié)點(diǎn)就類似了),使用到了celery,rabbitmq。這里不會(huì)單獨(dú)介紹celery和rabbitmq中的知識(shí)了...
閱讀 3897·2021-11-24 11:14
閱讀 3340·2021-11-22 13:53
閱讀 3902·2021-11-11 16:54
閱讀 1593·2021-10-13 09:49
閱讀 1237·2021-10-08 10:05
閱讀 3412·2021-09-22 15:57
閱讀 1767·2021-08-16 11:01
閱讀 983·2019-08-30 15:55