成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

在Celery中使用Flask的上下文

Sourcelink / 1114人閱讀

摘要:所以這就現(xiàn)實了在中使用的應用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產(chǎn)生請求上下文。中有和可以產(chǎn)生請求上下文。具體的思路還是在中重載類,通過,在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。

其實我只是想把郵件發(fā)送這個動作移到Celery中執(zhí)行。
既然用到了Celery,那么每次發(fā)郵件都多帶帶開一個線程似乎有點多余,異步任務還是交給Celery吧。

在Flask應用中集成Celery

Celery和Flask一起使用并沒有什么不和諧的地方,都可以不用定制的Flask擴展,按照網(wǎng)上隨處可見的示例也很簡單:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0"

celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)

@celery.task
def send_email():
    ....

然而,稍微上點規(guī)模的Flask應用都會使用Factory模式(中文叫工廠函數(shù),我聽著特別扭),即只有在創(chuàng)建Flask實例時,才會初始化各種擴展,這樣可以動態(tài)的修改擴展程序的配置。比如你有一套線上部署的配置和一套本地開發(fā)測試的配置,希望通過不同的啟動入口,就使用不同的配置。
使用Factory模式的話,上面的代碼大概要修改成這個樣:

from flask import Flask
from celery import Celery

app = Flask(__name__)
celery = Celery()

def create_app(config_name):
    app.config.from_object(config[config_name])
    celery.conf.update(app.config)

通過config_name,來動態(tài)調(diào)整celery的配置。然而,這樣子是不行的!
Celery的__init__()函數(shù)會調(diào)用celery._state._register_app()直接就通過傳入的配置生成了Celery實例,上面的代碼中,celery = Celery()直接使用默認的amqp作為了broker,隨后通過celery.conf.update(app.config)是更改不了broker的。這也就是為什么網(wǎng)上的示例代碼中,在定義Celery實例時,就傳入了broker=app.config["CELERY_BROKER_URL"],而不是之后通過celery.conf.update(app.config)傳入。當你的多套配置文件中,broker設(shè)置的不同時,就悲劇了。

當然不用自己造輪子,F(xiàn)lask-Celery-Helper就是解決以上問題的FLask擴展。
看看它的__init__()函數(shù):

def __init__(self, app=None):
        """If app argument provided then initialize celery using application config values.
        If no app argument provided you should do initialization later with init_app method.
        :param app: Flask application instance.
        """
        self.original_register_app = _state._register_app  # Backup Celery app registration function.
        _state._register_app = lambda _: None  # Upon Celery app registration attempt, do nothing.
        super(Celery, self).__init__()
        if app is not None:
            self.init_app(app)

_state._register_app函數(shù)備份,再置為空。這樣__init__()就不會創(chuàng)建Celery實例了。但如果指定了app,那么進入init_app,嗯,大多數(shù)Flask擴展都有這個函數(shù),用來動態(tài)生成擴展實例。

def init_app(self, app):
        """Actual method to read celery settings from app configuration and initialize the celery instance.
        :param app: Flask application instance.
        """
        _state._register_app = self.original_register_app  # Restore Celery app registration function.
        if not hasattr(app, "extensions"):
            app.extensions = dict()
        if "celery" in app.extensions:
            raise ValueError("Already registered extension CELERY.")
        app.extensions["celery"] = _CeleryState(self, app)

        # Instantiate celery and read config.
        super(Celery, self).__init__(app.import_name, broker=app.config["CELERY_BROKER_URL"])
        ...

_state._register_app函數(shù)還原,再執(zhí)行Celery原本的__init__。這樣就達到動態(tài)生成實例的目的了。接著往下看:

task_base = self.Task

# Add Flask app context to celery instance.
class ContextTask(task_base):
    """Celery instance wrapped within the Flask app context."""
    def __call__(self, *_args, **_kwargs):
        with app.app_context():
            return task_base.__call__(self, *_args, **_kwargs)
setattr(ContextTask, "abstract", True)
setattr(self, "Task", ContextTask)

這里重載了celery.Task類,通過with app.app_context():,在app.app_context()的上下文環(huán)境下執(zhí)行Task。對于一個已生成的Flask實例,應用上下文不會隨便改變。所以這就現(xiàn)實了在Celery中使用Flask的應用上下文。
下面是官方的示例代碼:

# extensions.py
from flask_celery import Celery
celery = Celery()

# application.py
from flask import Flask
from extensions import celery

def create_app():
    app = Flask(__name__)
    app.config["CELERY_IMPORTS"] = ("tasks.add_together", )
    app.config["CELERY_BROKER_URL"] = "redis://localhost"
    app.config["CELERY_RESULT_BACKEND"] = "redis://localhost"
    celery.init_app(app)
    return app
    
# tasks.py
from extensions import celery

@celery.task()
def add_together(a, b):
    return a + b

# manage.py
from application import create_app
app = create_app()
app.run()

跟普通的Flask擴展一樣了。

Celery中使用Flask上下文

在Flask的view函數(shù)中調(diào)用task.delay()時,這個task相當于一個離線的異步任務,它對Flask的應用上下文和請求上下文一無所知。但是這都可能是異步任務需要用到的。比如發(fā)送郵件要用到的render_templateurl_for就分別要用到應用上下文和請求上下文。不在celery中引入它們的話,就是Running code outside of a request。
引入應用上下文的工作Flask-Celery-Helper已經(jīng)幫我們做好了,在Flask的文檔中也有相關(guān)介紹。實現(xiàn)方法和上面Flask-Celery-Helper的一樣。然而,不管是Flask-Celery-Helper還是Flask文檔,都沒有提及如何在Celery中使用請求上下文。

要引入請求上下文,需要考慮這兩個問題:

如何在Celery中產(chǎn)生請求上下文。Flask中有request_contexttest_request_context可以產(chǎn)生請求上下文。區(qū)別是request_context需要WSGI環(huán)境變量environ,而test_request_context根據(jù)傳入的參數(shù)生成請求上下文。我沒有找到如何在Celery中獲取到WSGI環(huán)境變量的方法,所以只能自己傳入相關(guān)參數(shù)生成請求上下文了。

請求上下文是隨HTTP請求產(chǎn)生的,要獲取請求上下文,就必須在view函數(shù)中處理,view函數(shù)通過task.delay()發(fā)送Celery任務。所以需要重載task.delay(),以獲取請求上下文。

具體的思路還是在init_app中重載celery.Task類,通過with app.test_request_context():,在app.test_request_context()的上下文環(huán)境下執(zhí)行Task。
首先獲取request,從中整理出test_request_context()需要的參數(shù)。根據(jù)test_request_context的函數(shù)注釋,它需要的參數(shù)和werkzeug.test.EnvironBuilder類的參數(shù)一樣。

CONTEXT_ARG_NAME = "_flask_request_context"
def _include_request_context(self, kwargs):
    """Includes all the information about current Flask request context
    as an additional argument to the task.
    """
    if not has_request_context():
        return

    # keys correspond to arguments of :meth:`Flask.test_request_context`
    context = {
        "path": request.path,
        "base_url": request.url_root,
        "method": request.method,
        "headers": dict(request.headers),
        "data": request.form
    }
    if "?" in request.url:
        context["query_string"] = request.url[(request.url.find("?") + 1):]

    kwargs[self.CONTEXT_ARG_NAME] = context

_include_request_context函數(shù)從request中提取path,base_url,method,headers,data,query_string。將他們傳入test_request_context,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。
Celery通過apply_async,apply,retry調(diào)用異步任務(delayapply_async的簡化方法)。這里需要重載它們,讓這些函數(shù)獲取request:

def apply_async(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).apply_async(args, kwargs, **rest)

def apply(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).apply(args, kwargs, **rest)

def retry(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).retry(args, kwargs, **rest)

最后重載celery.Task__call__方法:

def __call__(self, *args, **kwargs):
    """Execute task code with given arguments."""
    call = lambda: super(ContextTask, self).__call__(*args, **kwargs)

    context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
    if context is None or has_request_context():
        return call()

    with app.test_request_context(**context):
        result = call()

        # process a fake "Response" so that
        # ``@after_request`` hooks are executed
        app.process_response(make_response(result or ""))

    return result

context是我們從request中獲取的參數(shù),將它傳給test_request_context,偽造請求上下文,并在這個上下文環(huán)境中執(zhí)行task。既然偽造了請求,那也得為這個假請求生成響應,萬一你定義了after_request這個在響應后執(zhí)行的鉤子呢?通過process_response就可以激活after_request。
注意這里并沒有傳入應用上下文,因為Flask在創(chuàng)建請求上下文時,會判斷應用上下文是否為空,為空就先創(chuàng)建應用上下文,再創(chuàng)建請求上下文。

完整代碼在這里。
celery = CeleryWithContext()創(chuàng)建的Celery實例就可以給各種task使用了。
另外創(chuàng)建一個celery_worker.py文件,生成一個Flask實例,供Celery的worker使用。

# celery_worker.py

#!/usr/bin/env python
from app import create_app
from app.extensions import celery

app = create_app()

啟動worker:celery -A celery_worker.celery worker -l info
這下就可以使用Celery發(fā)郵件了。唉,還真是麻煩。

reference

http://xion.io/post/code/celery-include-flask-request-context.html

博客地址

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38572.html

相關(guān)文章

  • 基于websocketcelery任務狀態(tài)監(jiān)控

    摘要:目的曾經(jīng)想向前臺實時返回任務的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個小的,實現(xiàn)前臺實時獲取后臺傳輸?shù)娜蝿諣顟B(tài)。實現(xiàn)仿照其他例子實現(xiàn)了一個簡單的后臺任務監(jiān)控。 1. 目的曾經(jīng)想向前臺實時返回Celery任務的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個小的demo,實現(xiàn)前臺實時獲取后...

    microelec 評論0 收藏0
  • Flask+Celery+Redis實現(xiàn)隊列化異步任務

    摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調(diào)度。不存儲任務狀態(tài)。標識要使用的默認序列化方法的字符串。指定該任務的結(jié)果存儲后端用于此任務。 概述: ????????我們考慮一個場景,公司有一個需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...

    Ali_ 評論0 收藏0
  • 基于Flask-Angular項目組網(wǎng)架構(gòu)與部署

    摘要:基于網(wǎng),分享項目的組網(wǎng)架構(gòu)和部署。項目組網(wǎng)架構(gòu)架構(gòu)說明流項目訪問分為兩個流,通過分兩個端口暴露給外部使用數(shù)據(jù)流用戶訪問網(wǎng)站。通過進行配置,使用作為異步隊列來存儲任務,并將處理結(jié)果存儲在中。 基于Raindrop網(wǎng),分享項目的組網(wǎng)架構(gòu)和部署。 項目組網(wǎng)架構(gòu) showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...

    kelvinlee 評論0 收藏0
  • Flask 下載文名文件

    摘要:解決辦法如下測試表格我們從引入,首先對文件名進行編碼,然后中作為的參數(shù),這時候能成功下載文件,但是文件名是編碼后的名字,要解碼的話,我們需要在里面聲明編碼格式,即這樣的話,對文件名進行解碼,我們的文件名就是中文了。 在寫 flask 后端的時候,特別是在做數(shù)據(jù)相關(guān)的操作的時候,產(chǎn)品往往需要我們做一個導出數(shù)據(jù)的需求,一般都是導出 excel 格式的文件。 那在 flask 上,如何實現(xiàn)請...

    harriszh 評論0 收藏0
  • 手把手教你如何用Crawlab構(gòu)建技術(shù)文章聚合平臺(一)

    摘要:本文將介紹如何使用和抓取主流的技術(shù)博客文章,然后用搭建一個小型的技術(shù)文章聚合平臺。是谷歌開源的基于和的自動化測試工具,可以很方便的讓程序模擬用戶的操作,對瀏覽器進行程序化控制。相對于,是新的開源項目,而且是谷歌開發(fā),可以使用很多新的特性。 背景 說到爬蟲,大多數(shù)程序員想到的是scrapy這樣受人歡迎的框架。scrapy的確不錯,而且有很強大的生態(tài)圈,有g(shù)erapy等優(yōu)秀的可視化界面。但...

    LinkedME2016 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<