成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

django開發(fā)-使用celery搭建分布式(多節(jié)點(diǎn))任務(wù)隊(duì)列

ConardLi / 3688人閱讀

摘要:今天介紹一下如何在項(xiàng)目中使用搭建一個(gè)有兩個(gè)節(jié)點(diǎn)的任務(wù)隊(duì)列一個(gè)主節(jié)點(diǎn)一個(gè)子節(jié)點(diǎn)主節(jié)點(diǎn)發(fā)布任務(wù),子節(jié)點(diǎn)收到任務(wù)并執(zhí)行。

今天介紹一下如何在django項(xiàng)目中使用celery搭建一個(gè)有兩個(gè)節(jié)點(diǎn)的任務(wù)隊(duì)列(一個(gè)主節(jié)點(diǎn)一個(gè)子節(jié)點(diǎn);主節(jié)點(diǎn)發(fā)布任務(wù),子節(jié)點(diǎn)收到任務(wù)并執(zhí)行。搭建3個(gè)或者以上的節(jié)點(diǎn)就類似了),使用到了celery,rabbitmq。這里不會(huì)多帶帶介紹celery和rabbitmq中的知識(shí)了。

1.項(xiàng)目基礎(chǔ)環(huán)境:
兩個(gè)ubuntu18.04虛擬機(jī)、python3.6.5、django2.0.4、celery3.1.26post2

2.主節(jié)點(diǎn)django項(xiàng)目結(jié)構(gòu):

3.settings.py中關(guān)于celery的配置:

import djcelery
# 此處的Queue和Exchange都涉及到RabbitMQ中的概念,這里不做介紹
from kombu import Queue, Exchange
djcelery.setup_loader()
BROKER_URL = "amqp://test:[email protected]:5672/testhost"
CELERY_RESULT_BACKEND = "amqp://test:[email protected]:5672/testhost"

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER="json"
CELERY_RESULT_SERIALIZER="json"
# CELERY_ACCEPT_CONTENT = ["json", "pickle", "msgpack", "yaml"]

CELERY_DEFAULT_EXCHANGE = "train"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"

CELERY_IMPORTS = ("proj.celery1.tasks", )

CELERY_QUEUES = (
  Queue("train", routing_key="train"),
  Queue("predict", routing_key="predict"),
)

4.celery.py中的配置:

# coding:utf8
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the "celery" program.

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")

app = Celery("proj")

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings")
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks(settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))

5.proj/init.py中的配置:

from __future__ import absolute_import
from .celery import app as celery_app

6.celery1/tasks.py:(主節(jié)點(diǎn)中的任務(wù)不會(huì)執(zhí)行,只執(zhí)行子節(jié)點(diǎn)中的任務(wù))

from __future__ import absolute_import
from celery import task


@task
def do_train(x, y):
    return x + y

7.celery1/views.py:

from .tasks import do_train
class Test1View(APIView):
    def get(self, request):
        try:
            # 這里的queue和routing_key也涉及到RabiitMQ中的知識(shí)
            # 關(guān)鍵,在這里控制向哪個(gè)queue中發(fā)送任務(wù),子節(jié)點(diǎn)通過(guò)這個(gè)執(zhí)行對(duì)應(yīng)queue中的任務(wù)
            ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train")
            # 獲取結(jié)果
            data = ret.get()
        except Exception as e:
            return Response(dict(msg=str(e), code=10001))
        return Response(dict(msg="OK", code=10000, data=data))

8.子節(jié)點(diǎn)目錄結(jié)構(gòu):

9.子節(jié)點(diǎn)中celery1/celery.py:

from __future__ import absolute_import
from celery import Celery
CELERY_IMPORTS = ("celery1.tasks", )
app = Celery("myapp",
             # 此處涉及到RabbitMQ的知識(shí),RabbitMQ是對(duì)應(yīng)主節(jié)點(diǎn)上的
             broker="amqp://test:[email protected]:5672/testhost",
             backend="amqp://test:[email protected]:5672/testhost",
             include=["celery1.tasks"])

app.config_from_object("celery1.config")

if __name__ == "__main__":
  app.start()

10.子節(jié)點(diǎn)中celery1/config.py:

from __future__ import absolute_import
from kombu import Queue,Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER="json"
CELERY_RESULT_SERIALIZER="json"
CELERY_ACCEPT_CONTENT = ["json","pickle","msgpack","yaml"]

CELERY_DEFAULT_EXCHANGE = "train"
# exchange type可以看RabbitMQ中的相關(guān)內(nèi)容
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"

CELERT_QUEUES =  (
  Queue("train",exchange="train",routing_key="train"),
)

11.子節(jié)點(diǎn)celery1/tasks.py:(這個(gè)是要真正執(zhí)行的task,每個(gè)節(jié)點(diǎn)可以不同)

from __future__ import absolute_import
from celery1.celery import app


import time
from celery import task


@task
def do_train(x, y):
    """
    訓(xùn)練
    :param data:
    :return:
    """
    time.sleep(3)
    return dict(data=str(x+y),msg="train")

12.啟動(dòng)子節(jié)點(diǎn)中的celery:
celery1是項(xiàng)目,-Q train表示從train這個(gè)queue中接收任務(wù)

celery -A celery1 worker -l info -Q train

13.啟動(dòng)主節(jié)點(diǎn)中的django項(xiàng)目:

python manage.py runserver

14.使用Postman請(qǐng)求對(duì)應(yīng)的view

請(qǐng)求url:http://127.0.0.1:8000/api/v1/celery1/test/
返回的結(jié)果是:
{
    "msg": "OK",
    "code": 10000,
    "data": {
        "data": "6",
        "msg": "train"
    }
}

15.遇到的問(wèn)題:
1)celery隊(duì)列報(bào)錯(cuò): AttributeError: ‘str’ object has no attribute ‘items’
解決:將redis庫(kù)從3.0回退到了2.10,pip install redis==2.10
解決方法參考鏈接:https://stackoverflow.com/que...

今天就說(shuō)到這里,如有疑問(wèn),歡迎交流。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/42714.html

相關(guān)文章

  • Django使用celery 異步發(fā)送短信驗(yàn)證碼

    摘要:介紹應(yīng)用舉例是一個(gè)基于開發(fā)的分布式異步消息任務(wù)隊(duì)列,通過(guò)它可以輕松的實(shí)現(xiàn)任務(wù)的異步處理,如果你的業(yè)務(wù)場(chǎng)景中需要用到異步任務(wù),就可以考慮使用你想對(duì)臺(tái)機(jī)器執(zhí)行一條批量命令,可能會(huì)花很長(zhǎng)時(shí)間,但你不想讓你的程序等著結(jié)果返回,? celery 1.celery介紹 1.1 celery應(yīng)用舉例 Celery 是一個(gè) 基于python開發(fā)的分布式異步消息任務(wù)隊(duì)列,通過(guò)...

    everfly 評(píng)論0 收藏0
  • celery 使用

    摘要:使用消息來(lái)通信,流程為客戶端添加消息到隊(duì)列來(lái)初始化一個(gè)任務(wù),然后消息隊(duì)列系統(tǒng)把消息分發(fā)給工作進(jìn)程??梢园鄠€(gè)工作進(jìn)程和消息系統(tǒng),來(lái)保證高可用性和進(jìn)行水平擴(kuò)展。保存結(jié)果可以使用很多例如的,,。 celery是一個(gè)簡(jiǎn)單的、靈活的、可靠的分布式系統(tǒng),提供了工具來(lái)維護(hù)這樣一個(gè)系統(tǒng),用于處理大量的信息(實(shí)時(shí)信息、定時(shí)任務(wù)安排),是一個(gè)任務(wù)隊(duì)列,易于使用,易于和其他語(yǔ)言進(jìn)行配合。 任務(wù)隊(duì)列 任務(wù)...

    GraphQuery 評(píng)論0 收藏0
  • Celery任務(wù)隊(duì)列

    摘要:文檔中文文檔官方文檔定時(shí)服務(wù)與結(jié)合使用簡(jiǎn)介是一個(gè)自帶電池的的任務(wù)隊(duì)列。追蹤任務(wù)在不同狀態(tài)間的遷移,并檢視返回值。 文檔 中文文檔 官方文檔 celery定時(shí)服務(wù)、celery與django結(jié)合使用 簡(jiǎn)介 Celery 是一個(gè)自帶電池的的任務(wù)隊(duì)列。它易于使用,所以你可以無(wú)視其所解決問(wèn)題的復(fù)雜程度而輕松入門。它遵照最佳實(shí)踐設(shè)計(jì),所以你的產(chǎn)品可以擴(kuò)展,或與其他語(yǔ)言集成,并且它自帶了在生產(chǎn)...

    Lorry_Lu 評(píng)論0 收藏0
  • Djangocelery使用項(xiàng)目實(shí)例

      小編寫這篇文章的主要目的,主要是給大家去進(jìn)行講解Django項(xiàng)目實(shí)例情況,包括celery的一些具體使用情況介紹,學(xué)習(xí)這些的話,對(duì)我們的工作和生活幫助還是很大的,但是怎么樣才能夠更快的進(jìn)行上手呢?下面就一個(gè)具體實(shí)例給大家進(jìn)行解答。  1、django應(yīng)用Celery  django框架請(qǐng)求/響應(yīng)的過(guò)程是同步的,框架本身無(wú)法實(shí)現(xiàn)異步響應(yīng)。  但是我們?cè)陧?xiàng)目過(guò)程中會(huì)經(jīng)常會(huì)遇到一些耗時(shí)的任務(wù),比如:...

    89542767 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<