摘要:是一個基于的分布式調(diào)度系統(tǒng),文檔在這最近有個需求想要動態(tài)的添加任務(wù)而不用重啟服務(wù)找了一圈沒找到什么好辦法也有可能是文檔沒看仔細(xì),所以只能自己實(shí)現(xiàn)囉為動態(tài)添加任務(wù),首先我想到的是傳遞一個函數(shù)進(jìn)去,讓某個特定任務(wù)去執(zhí)行這個傳遞過去的函數(shù),就像這
celery是一個基于Python的分布式調(diào)度系統(tǒng),文檔在這 ,最近有個需求,想要動態(tài)的添加任務(wù)而不用重啟celery服務(wù),找了一圈沒找到什么好辦法(也有可能是文檔沒看仔細(xì)),所以只能自己實(shí)現(xiàn)囉
為celery動態(tài)添加任務(wù),首先我想到的是傳遞一個函數(shù)進(jìn)去,讓某個特定任務(wù)去執(zhí)行這個傳遞過去的函數(shù),就像這樣
@app.task def execute(func, *args, **kwargs): return func(*args, **kwargs)
很可惜,會出現(xiàn)這樣的錯誤
kombu.exceptions.EncodeError: Object of type "function" is not JSON serializable
換一種序列化方式
@app.task(serializer="pickle") def execute(func, *args, **kwargs): return func(*args, **kwargs)
結(jié)果又出現(xiàn)一大串錯誤信息
ERROR/MainProcess] Pool callback raised exception: ContentDisallowed("Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)",) Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: "chord" During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: "_payload"
換一種思路
func = import_string(func)
不知道這樣是否可以,結(jié)果測試: No
哎,流年不利.
最后一直測試,一直測試,終于找到了一種辦法,直接上代碼
from importlib import import_module, reload app.conf.CELERY_IMPORTS = ["task", "task.all_task"] def import_string(import_name): import_name = str(import_name).replace(":", ".") modules = import_name.split(".") mod = import_module(modules[0]) for comp in modules[1:]: if not hasattr(mod, comp): reload(mod) mod = getattr(mod, comp) return mod @app.task def execute(func, *args, **kwargs): func = import_string(func) return func(*args, **kwargs)
項(xiàng)目結(jié)構(gòu)是這樣的
├── celery_app.py
├── config.py
├── task
│?? ├── all_task.py
│?? ├── __init__.py
注意: 任務(wù)必須大于等于兩層目錄
以后每次添加任務(wù)都可以先添加到all_task.py里,調(diào)用時不用再重啟celery服務(wù)
# task/all_task.py def ee(c, d): return c, d, "你好" # example from celery_app import execute execute.delay("task.all_task.ee", 2, 444)
ok,另外發(fā)現(xiàn)celery也支持任務(wù)定時調(diào)用,就像這樣
execute.apply_async(args=["task.all_task.aa"], eta=datetime(2017, 7, 9, 8, 12, 0))
簡單實(shí)現(xiàn)一個任務(wù)重復(fù)調(diào)用的功能
@app.task def interval(func, seconds, args=(), task_id=None): next_run_time = current_time() + timedelta(seconds=seconds) kwargs = dict(args=(func, seconds, args), eta=next_run_time) if task_id is not None: kwargs.update(task_id=task_id) interval.apply_async(**kwargs) func = import_string(func) return func(*args)
大概意思就是先計算下次運(yùn)行的時間,然后把任務(wù)添加到celery隊(duì)列里,這里有個task_id有些問題,因?yàn)榧僭O(shè)添加了每隔3s執(zhí)行一個任務(wù),
它的task_id默認(rèn)會使用uuid生成,如果想要再移除這個任務(wù)就不太方便,自定task_id可能會好一些,另外也許需要判斷task_id是否存在
AsyncResult(task_id).state
ok,再獻(xiàn)上一個好用的函數(shù)
from inspect import getmembers, isfunction def get_tasks(module="task"): return [{ "name": "task:{}".format(f[1].__name__), "doc": f[1].__doc__, } for f in getmembers(import_module(module), isfunction)]
就這樣.
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38697.html
摘要:所以這就現(xiàn)實(shí)了在中使用的應(yīng)用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產(chǎn)生請求上下文。中有和可以產(chǎn)生請求上下文。具體的思路還是在中重載類,通過,在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。 其實(shí)我只是想把郵件發(fā)送這個動作移到Celery中執(zhí)行。既然用到了Celery,那么每次發(fā)郵件都單獨(dú)開一個線程似乎有點(diǎn)多余,異步任務(wù)還是交給Celery吧...
摘要:使用異步框架,例如等等,裝飾異步任務(wù)。它是一個專注于實(shí)時處理的任務(wù)隊(duì)列,同時也支持任務(wù)調(diào)度。不存儲任務(wù)狀態(tài)。標(biāo)識要使用的默認(rèn)序列化方法的字符串。指定該任務(wù)的結(jié)果存儲后端用于此任務(wù)。 概述: ????????我們考慮一個場景,公司有一個需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...
摘要:的簡介是一個基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專注于實(shí)時處理,同時也支持任務(wù)調(diào)度。目前支持等作為消息代理,但適用于生產(chǎn)環(huán)境的只有和官方推薦。任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢。 celery的簡介 ??celery是一個基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專注于實(shí)時處理,同時也支持任務(wù)調(diào)度。它的執(zhí)行單元為任務(wù)(task),利用多線程,如Eventlet,gevent等,它們能被...
閱讀 2800·2021-09-01 10:30
閱讀 1690·2019-08-30 15:52
閱讀 979·2019-08-29 18:40
閱讀 1134·2019-08-28 18:30
閱讀 2405·2019-08-23 17:19
閱讀 1333·2019-08-23 16:25
閱讀 2711·2019-08-23 16:18
閱讀 2988·2019-08-23 13:53