成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

celery動態(tài)添加任務(wù)

yanbingyun1990 / 3475人閱讀

摘要:是一個基于的分布式調(diào)度系統(tǒng),文檔在這最近有個需求想要動態(tài)的添加任務(wù)而不用重啟服務(wù)找了一圈沒找到什么好辦法也有可能是文檔沒看仔細(xì),所以只能自己實(shí)現(xiàn)囉為動態(tài)添加任務(wù),首先我想到的是傳遞一個函數(shù)進(jìn)去,讓某個特定任務(wù)去執(zhí)行這個傳遞過去的函數(shù),就像這

celery是一個基于Python的分布式調(diào)度系統(tǒng),文檔在這 ,最近有個需求,想要動態(tài)的添加任務(wù)而不用重啟celery服務(wù),找了一圈沒找到什么好辦法(也有可能是文檔沒看仔細(xì)),所以只能自己實(shí)現(xiàn)囉

為celery動態(tài)添加任務(wù),首先我想到的是傳遞一個函數(shù)進(jìn)去,讓某個特定任務(wù)去執(zhí)行這個傳遞過去的函數(shù),就像這樣

@app.task
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

很可惜,會出現(xiàn)這樣的錯誤

kombu.exceptions.EncodeError: Object of type "function" is not JSON serializable

換一種序列化方式

@app.task(serializer="pickle")
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

結(jié)果又出現(xiàn)一大串錯誤信息

ERROR/MainProcess] Pool callback raised exception: ContentDisallowed("Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)",)
Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: "chord"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: "_payload"

換一種思路

func = import_string(func)

不知道這樣是否可以,結(jié)果測試: No

哎,流年不利.

最后一直測試,一直測試,終于找到了一種辦法,直接上代碼

from importlib import import_module, reload

app.conf.CELERY_IMPORTS = ["task", "task.all_task"]

def import_string(import_name):
    import_name = str(import_name).replace(":", ".")
    modules = import_name.split(".")
    mod = import_module(modules[0])
    for comp in modules[1:]:
        if not hasattr(mod, comp):
            reload(mod)
        mod = getattr(mod, comp)
    return mod

@app.task
def execute(func, *args, **kwargs):
    func = import_string(func)
    return func(*args, **kwargs)

項(xiàng)目結(jié)構(gòu)是這樣的

├── celery_app.py
├── config.py
├── task
│?? ├── all_task.py
│?? ├── __init__.py

注意: 任務(wù)必須大于等于兩層目錄

以后每次添加任務(wù)都可以先添加到all_task.py里,調(diào)用時不用再重啟celery服務(wù)

# task/all_task.py

def ee(c, d):
    return c, d, "你好"

# example
from celery_app import execute

execute.delay("task.all_task.ee", 2, 444)

ok,另外發(fā)現(xiàn)celery也支持任務(wù)定時調(diào)用,就像這樣

execute.apply_async(args=["task.all_task.aa"], eta=datetime(2017, 7, 9, 8, 12, 0))

簡單實(shí)現(xiàn)一個任務(wù)重復(fù)調(diào)用的功能

@app.task
def interval(func, seconds, args=(), task_id=None):
    next_run_time = current_time() + timedelta(seconds=seconds)
    kwargs = dict(args=(func, seconds, args), eta=next_run_time)
    if task_id is not None:
        kwargs.update(task_id=task_id)
    interval.apply_async(**kwargs)
    func = import_string(func)
    return func(*args)

大概意思就是先計算下次運(yùn)行的時間,然后把任務(wù)添加到celery隊(duì)列里,這里有個task_id有些問題,因?yàn)榧僭O(shè)添加了每隔3s執(zhí)行一個任務(wù),
它的task_id默認(rèn)會使用uuid生成,如果想要再移除這個任務(wù)就不太方便,自定task_id可能會好一些,另外也許需要判斷task_id是否存在

AsyncResult(task_id).state

ok,再獻(xiàn)上一個好用的函數(shù)

from inspect import getmembers, isfunction

def get_tasks(module="task"):
    return [{
        "name": "task:{}".format(f[1].__name__),
        "doc": f[1].__doc__,
    } for f in getmembers(import_module(module), isfunction)]

就這樣.

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38697.html

相關(guān)文章

  • Celery中使用Flask的上下文

    摘要:所以這就現(xiàn)實(shí)了在中使用的應(yīng)用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產(chǎn)生請求上下文。中有和可以產(chǎn)生請求上下文。具體的思路還是在中重載類,通過,在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。 其實(shí)我只是想把郵件發(fā)送這個動作移到Celery中執(zhí)行。既然用到了Celery,那么每次發(fā)郵件都單獨(dú)開一個線程似乎有點(diǎn)多余,異步任務(wù)還是交給Celery吧...

    Sourcelink 評論0 收藏0
  • Flask+Celery+Redis實(shí)現(xiàn)隊(duì)列化異步任務(wù)

    摘要:使用異步框架,例如等等,裝飾異步任務(wù)。它是一個專注于實(shí)時處理的任務(wù)隊(duì)列,同時也支持任務(wù)調(diào)度。不存儲任務(wù)狀態(tài)。標(biāo)識要使用的默認(rèn)序列化方法的字符串。指定該任務(wù)的結(jié)果存儲后端用于此任務(wù)。 概述: ????????我們考慮一個場景,公司有一個需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...

    Ali_ 評論0 收藏0
  • Python之celery的簡介與使用

    摘要:的簡介是一個基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專注于實(shí)時處理,同時也支持任務(wù)調(diào)度。目前支持等作為消息代理,但適用于生產(chǎn)環(huán)境的只有和官方推薦。任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢。 celery的簡介 ??celery是一個基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專注于實(shí)時處理,同時也支持任務(wù)調(diào)度。它的執(zhí)行單元為任務(wù)(task),利用多線程,如Eventlet,gevent等,它們能被...

    LeexMuller 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<