摘要:所以這就現(xiàn)實了在中使用的應用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產(chǎn)生請求上下文。中有和可以產(chǎn)生請求上下文。具體的思路還是在中重載類,通過,在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。
其實我只是想把郵件發(fā)送這個動作移到Celery中執(zhí)行。
既然用到了Celery,那么每次發(fā)郵件都多帶帶開一個線程似乎有點多余,異步任務還是交給Celery吧。
Celery和Flask一起使用并沒有什么不和諧的地方,都可以不用定制的Flask擴展,按照網(wǎng)上隨處可見的示例也很簡單:
from flask import Flask from celery import Celery app = Flask(__name__) app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0" celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"]) celery.conf.update(app.config) @celery.task def send_email(): ....
然而,稍微上點規(guī)模的Flask應用都會使用Factory模式(中文叫工廠函數(shù),我聽著特別扭),即只有在創(chuàng)建Flask實例時,才會初始化各種擴展,這樣可以動態(tài)的修改擴展程序的配置。比如你有一套線上部署的配置和一套本地開發(fā)測試的配置,希望通過不同的啟動入口,就使用不同的配置。
使用Factory模式的話,上面的代碼大概要修改成這個樣:
from flask import Flask from celery import Celery app = Flask(__name__) celery = Celery() def create_app(config_name): app.config.from_object(config[config_name]) celery.conf.update(app.config)
通過config_name,來動態(tài)調(diào)整celery的配置。然而,這樣子是不行的!
Celery的__init__()函數(shù)會調(diào)用celery._state._register_app()直接就通過傳入的配置生成了Celery實例,上面的代碼中,celery = Celery()直接使用默認的amqp作為了broker,隨后通過celery.conf.update(app.config)是更改不了broker的。這也就是為什么網(wǎng)上的示例代碼中,在定義Celery實例時,就傳入了broker=app.config["CELERY_BROKER_URL"],而不是之后通過celery.conf.update(app.config)傳入。當你的多套配置文件中,broker設(shè)置的不同時,就悲劇了。
當然不用自己造輪子,F(xiàn)lask-Celery-Helper就是解決以上問題的FLask擴展。
看看它的__init__()函數(shù):
def __init__(self, app=None): """If app argument provided then initialize celery using application config values. If no app argument provided you should do initialization later with init_app method. :param app: Flask application instance. """ self.original_register_app = _state._register_app # Backup Celery app registration function. _state._register_app = lambda _: None # Upon Celery app registration attempt, do nothing. super(Celery, self).__init__() if app is not None: self.init_app(app)
將_state._register_app函數(shù)備份,再置為空。這樣__init__()就不會創(chuàng)建Celery實例了。但如果指定了app,那么進入init_app,嗯,大多數(shù)Flask擴展都有這個函數(shù),用來動態(tài)生成擴展實例。
def init_app(self, app): """Actual method to read celery settings from app configuration and initialize the celery instance. :param app: Flask application instance. """ _state._register_app = self.original_register_app # Restore Celery app registration function. if not hasattr(app, "extensions"): app.extensions = dict() if "celery" in app.extensions: raise ValueError("Already registered extension CELERY.") app.extensions["celery"] = _CeleryState(self, app) # Instantiate celery and read config. super(Celery, self).__init__(app.import_name, broker=app.config["CELERY_BROKER_URL"]) ...
將_state._register_app函數(shù)還原,再執(zhí)行Celery原本的__init__。這樣就達到動態(tài)生成實例的目的了。接著往下看:
task_base = self.Task # Add Flask app context to celery instance. class ContextTask(task_base): """Celery instance wrapped within the Flask app context.""" def __call__(self, *_args, **_kwargs): with app.app_context(): return task_base.__call__(self, *_args, **_kwargs) setattr(ContextTask, "abstract", True) setattr(self, "Task", ContextTask)
這里重載了celery.Task類,通過with app.app_context():,在app.app_context()的上下文環(huán)境下執(zhí)行Task。對于一個已生成的Flask實例,應用上下文不會隨便改變。所以這就現(xiàn)實了在Celery中使用Flask的應用上下文。
下面是官方的示例代碼:
# extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config["CELERY_IMPORTS"] = ("tasks.add_together", ) app.config["CELERY_BROKER_URL"] = "redis://localhost" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost" celery.init_app(app) return app # tasks.py from extensions import celery @celery.task() def add_together(a, b): return a + b # manage.py from application import create_app app = create_app() app.run()
跟普通的Flask擴展一樣了。
Celery中使用Flask上下文在Flask的view函數(shù)中調(diào)用task.delay()時,這個task相當于一個離線的異步任務,它對Flask的應用上下文和請求上下文一無所知。但是這都可能是異步任務需要用到的。比如發(fā)送郵件要用到的render_template和url_for就分別要用到應用上下文和請求上下文。不在celery中引入它們的話,就是Running code outside of a request。
引入應用上下文的工作Flask-Celery-Helper已經(jīng)幫我們做好了,在Flask的文檔中也有相關(guān)介紹。實現(xiàn)方法和上面Flask-Celery-Helper的一樣。然而,不管是Flask-Celery-Helper還是Flask文檔,都沒有提及如何在Celery中使用請求上下文。
要引入請求上下文,需要考慮這兩個問題:
如何在Celery中產(chǎn)生請求上下文。Flask中有request_context和test_request_context可以產(chǎn)生請求上下文。區(qū)別是request_context需要WSGI環(huán)境變量environ,而test_request_context根據(jù)傳入的參數(shù)生成請求上下文。我沒有找到如何在Celery中獲取到WSGI環(huán)境變量的方法,所以只能自己傳入相關(guān)參數(shù)生成請求上下文了。
請求上下文是隨HTTP請求產(chǎn)生的,要獲取請求上下文,就必須在view函數(shù)中處理,view函數(shù)通過task.delay()發(fā)送Celery任務。所以需要重載task.delay(),以獲取請求上下文。
具體的思路還是在init_app中重載celery.Task類,通過with app.test_request_context():,在app.test_request_context()的上下文環(huán)境下執(zhí)行Task。
首先獲取request,從中整理出test_request_context()需要的參數(shù)。根據(jù)test_request_context的函數(shù)注釋,它需要的參數(shù)和werkzeug.test.EnvironBuilder類的參數(shù)一樣。
CONTEXT_ARG_NAME = "_flask_request_context" def _include_request_context(self, kwargs): """Includes all the information about current Flask request context as an additional argument to the task. """ if not has_request_context(): return # keys correspond to arguments of :meth:`Flask.test_request_context` context = { "path": request.path, "base_url": request.url_root, "method": request.method, "headers": dict(request.headers), "data": request.form } if "?" in request.url: context["query_string"] = request.url[(request.url.find("?") + 1):] kwargs[self.CONTEXT_ARG_NAME] = context
_include_request_context函數(shù)從request中提取path,base_url,method,headers,data,query_string。將他們傳入test_request_context,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。
Celery通過apply_async,apply,retry調(diào)用異步任務(delay是apply_async的簡化方法)。這里需要重載它們,讓這些函數(shù)獲取request:
def apply_async(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply_async(args, kwargs, **rest) def apply(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply(args, kwargs, **rest) def retry(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).retry(args, kwargs, **rest)
最后重載celery.Task的__call__方法:
def __call__(self, *args, **kwargs): """Execute task code with given arguments.""" call = lambda: super(ContextTask, self).__call__(*args, **kwargs) context = kwargs.pop(self.CONTEXT_ARG_NAME, None) if context is None or has_request_context(): return call() with app.test_request_context(**context): result = call() # process a fake "Response" so that # ``@after_request`` hooks are executed app.process_response(make_response(result or "")) return result
context是我們從request中獲取的參數(shù),將它傳給test_request_context,偽造請求上下文,并在這個上下文環(huán)境中執(zhí)行task。既然偽造了請求,那也得為這個假請求生成響應,萬一你定義了after_request這個在響應后執(zhí)行的鉤子呢?通過process_response就可以激活after_request。
注意這里并沒有傳入應用上下文,因為Flask在創(chuàng)建請求上下文時,會判斷應用上下文是否為空,為空就先創(chuàng)建應用上下文,再創(chuàng)建請求上下文。
完整代碼在這里。
celery = CeleryWithContext()創(chuàng)建的Celery實例就可以給各種task使用了。
另外創(chuàng)建一個celery_worker.py文件,生成一個Flask實例,供Celery的worker使用。
# celery_worker.py #!/usr/bin/env python from app import create_app from app.extensions import celery app = create_app()
啟動worker:celery -A celery_worker.celery worker -l info
這下就可以使用Celery發(fā)郵件了。唉,還真是麻煩。
http://xion.io/post/code/celery-include-flask-request-context.html
博客地址
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38572.html
摘要:目的曾經(jīng)想向前臺實時返回任務的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個小的,實現(xiàn)前臺實時獲取后臺傳輸?shù)娜蝿諣顟B(tài)。實現(xiàn)仿照其他例子實現(xiàn)了一個簡單的后臺任務監(jiān)控。 1. 目的曾經(jīng)想向前臺實時返回Celery任務的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個小的demo,實現(xiàn)前臺實時獲取后...
摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調(diào)度。不存儲任務狀態(tài)。標識要使用的默認序列化方法的字符串。指定該任務的結(jié)果存儲后端用于此任務。 概述: ????????我們考慮一個場景,公司有一個需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...
摘要:基于網(wǎng),分享項目的組網(wǎng)架構(gòu)和部署。項目組網(wǎng)架構(gòu)架構(gòu)說明流項目訪問分為兩個流,通過分兩個端口暴露給外部使用數(shù)據(jù)流用戶訪問網(wǎng)站。通過進行配置,使用作為異步隊列來存儲任務,并將處理結(jié)果存儲在中。 基于Raindrop網(wǎng),分享項目的組網(wǎng)架構(gòu)和部署。 項目組網(wǎng)架構(gòu) showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
摘要:解決辦法如下測試表格我們從引入,首先對文件名進行編碼,然后中作為的參數(shù),這時候能成功下載文件,但是文件名是編碼后的名字,要解碼的話,我們需要在里面聲明編碼格式,即這樣的話,對文件名進行解碼,我們的文件名就是中文了。 在寫 flask 后端的時候,特別是在做數(shù)據(jù)相關(guān)的操作的時候,產(chǎn)品往往需要我們做一個導出數(shù)據(jù)的需求,一般都是導出 excel 格式的文件。 那在 flask 上,如何實現(xiàn)請...
摘要:本文將介紹如何使用和抓取主流的技術(shù)博客文章,然后用搭建一個小型的技術(shù)文章聚合平臺。是谷歌開源的基于和的自動化測試工具,可以很方便的讓程序模擬用戶的操作,對瀏覽器進行程序化控制。相對于,是新的開源項目,而且是谷歌開發(fā),可以使用很多新的特性。 背景 說到爬蟲,大多數(shù)程序員想到的是scrapy這樣受人歡迎的框架。scrapy的確不錯,而且有很強大的生態(tài)圈,有g(shù)erapy等優(yōu)秀的可視化界面。但...
閱讀 3760·2021-10-15 09:42
閱讀 2630·2021-09-03 10:50
閱讀 1678·2021-09-03 10:28
閱讀 1808·2019-08-30 15:54
閱讀 2546·2019-08-30 12:46
閱讀 436·2019-08-30 11:06
閱讀 2846·2019-08-30 10:54
閱讀 557·2019-08-29 12:59