成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Django中celery的使用項目實例

89542767 / 699人閱讀

  小編寫這篇文章的主要目的,主要是給大家去進(jìn)行講解Django項目實例情況,包括celery的一些具體使用情況介紹,學(xué)習(xí)這些的話,對我們的工作和生活幫助還是很大的,但是怎么樣才能夠更快的進(jìn)行上手呢?下面就一個具體實例給大家進(jìn)行解答。


  1、django應(yīng)用Celery


  django框架請求/響應(yīng)的過程是同步的,框架本身無法實現(xiàn)異步響應(yīng)。


  但是我們在項目過程中會經(jīng)常會遇到一些耗時的任務(wù),比如:發(fā)送郵件、發(fā)送短信、大數(shù)據(jù)統(tǒng)計等等,這些操作耗時長,同步執(zhí)行對用戶體驗非常不友好,那么在這種情況下就需要實現(xiàn)異步執(zhí)行。


  異步執(zhí)行前端一般使用ajax,后端使用Celery。


  2、項目應(yīng)用


  django項目應(yīng)用celery,主要有兩種任務(wù)方式,一是異步任務(wù)(發(fā)布者任務(wù)),一般是web請求,二是定時任務(wù)。


  celery組成


  Celery是由Python開發(fā)、簡單、靈活、可靠的分布式任務(wù)隊列,是一個處理異步任務(wù)的框架,其本質(zhì)是生產(chǎn)者消費(fèi)者模型,生產(chǎn)者發(fā)送任務(wù)到消息隊列,消費(fèi)者負(fù)責(zé)處理任務(wù)。Celery側(cè)重于實時操作,但對調(diào)度支持也很好,其每天可以處理數(shù)以百萬計的任務(wù)。特點:


  簡單:熟悉celery的工作流程后,配置使用簡單


  高可用:當(dāng)任務(wù)執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,celery會自動嘗試重新執(zhí)行任務(wù)


  快速:一個單進(jìn)程的celery每分鐘可處理上百萬個任務(wù)


  靈活:幾乎celery的各個組件都可以被擴(kuò)展及自定制


  Celery由三部分構(gòu)成:


  消息中間件(Broker):官方提供了很多備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached等,官方推薦RabbitMQ


  任務(wù)執(zhí)行單元(Worker):任務(wù)執(zhí)行單元,負(fù)責(zé)從消息隊列中取出任務(wù)執(zhí)行,它可以啟動一個或者多個,也可以啟動在不同的機(jī)器節(jié)點,這就是其實現(xiàn)分布式的核心


  結(jié)果存儲(Backend):官方提供了諸多的存儲方式支持:RabbitMQ、Redis、Memcached,SQLAlchemy,Django ORM、Apache Cassandra、Elasticsearch等


  架構(gòu)如下:

01.png

  工作原理:


  任務(wù)模塊Task包含異步任務(wù)和定時任務(wù)。其中,異步任務(wù)通常在業(yè)務(wù)邏輯中被觸發(fā)并發(fā)往消息隊列,而定時任務(wù)由Celery Beat進(jìn)程周期性地將任務(wù)發(fā)往消息隊列;


  任務(wù)執(zhí)行單元Worker實時監(jiān)視消息隊列獲取隊列中的任務(wù)執(zhí)行;


  Woker執(zhí)行完任務(wù)后將結(jié)果保存在Backend中;


  本文使用的是redis數(shù)據(jù)庫作為消息中間件和結(jié)果存儲數(shù)據(jù)庫


  1.異步任務(wù)redis


  1.安裝庫


  pip install celery
  pip install redis


  2.celery.py


  在主項目目錄下,新建celery.py文件:


  import os
  import django
  from celery import Celery
  from django.conf import settings
  #設(shè)置系統(tǒng)環(huán)境變量,安裝django,必須設(shè)置,否則在啟動celery時會報錯
  #celery_study是當(dāng)前項目名
  os.environ.setdefault('DJANGO_SETTINGS_MODULE','celery_study.settings')
  django.setup()
  celery_app=Celery('celery_study')
  celery_app.config_from_object('django.conf:settings')
  celery_app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)

 02.png

    注意:是和settings.py文件同目錄,一定不能建立在項目根目錄,不然會引起celery這個模塊名的命名沖突


  同時,在主項目的init.py中,添加如下代碼:


  from.celery import celery_app
  __all__=['celery_app']

03.png

  3.settings.py


  在配置文件中配置對應(yīng)的redis配置:


  #Broker配置,使用Redis作為消息中間件
  BROKER_URL='redis://127.0.0.1:6379/0'
  #BACKEND配置,這里使用redis
  CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'
  #結(jié)果序列化方案
  CELERY_RESULT_SERIALIZER='json'
  #任務(wù)結(jié)果過期時間,秒
  CELERY_TASK_RESULT_EXPIRES=60*60*24
  #時區(qū)配置
  CELERY_TIMEZONE='Asia/Shanghai'
  #指定導(dǎo)入的任務(wù)模塊,可以指定多個
  #CELERY_IMPORTS=(
  #'other_dir.tasks',
  #)

04.png

  注意:所有配置的官方文檔:Configuration and defaults—Celery 5.2.0b3 documentation


  4.tasks.py


  在子應(yīng)用下建立各自對應(yīng)的任務(wù)文件tasks.py(必須是tasks.py這個名字,不允許修改)


  from celery import shared_task
  shared_task
  def add(x,y):
  return x+y
  shared_task
  def mul(x,y):
  return x*y
  shared_task
  def xsum(numbers):
  return sum(numbers)

05.png

  5.調(diào)用任務(wù)


  from.tasks import*
  #Create your views here.
  def task_add_view(request):
  add.delay(100,200)
  return HttpResponse(f'調(diào)用函數(shù)結(jié)果')

06.png

  6.啟動celery


  pip install eventlet
  celery-A celery_study worker-l debug-P eventlet

  注意:celery_study是項目名


  使用redis時,有可能會出現(xiàn)如下類似的異常

  AttributeError:'str'object has no attribute'items'


  這是由于版本差異,需要卸載已經(jīng)安裝的python環(huán)境中的redis庫,重新指定安裝特定版本(celery4.x以下適用redis2.10.6,celery4.3以上使用redis3.2.0以上):

  xxxxxxxxxx pip install redis==2.10.6


  7.獲取任務(wù)結(jié)果


  在views.py中,通過AsyncResult.get()獲取結(jié)果


  from celery import result
  def get_result_by_taskid(request):
  task_id=request.GET.get('task_id')
  #異步執(zhí)行
  ar=result.AsyncResult(task_id)
  if ar.ready():
  return JsonResponse({'status':ar.state,'result':ar.get()})
  else:
  return JsonResponse({'status':ar.state,'result':''})

  

      AsyncResult類的常用的屬性和方法:


  state:返回任務(wù)狀態(tài),等同status;


  task_id:返回任務(wù)id;


  result:返回任務(wù)結(jié)果,同get()方法;


  ready():判斷任務(wù)是否執(zhí)行以及有結(jié)果,有結(jié)果為True,否則False;


  info():獲取任務(wù)信息,默認(rèn)為結(jié)果;


  wait(t):等待t秒后獲取結(jié)果,若任務(wù)執(zhí)行完畢,則不等待直接獲取結(jié)果,若任務(wù)在執(zhí)行中,則wait期間一直阻塞,直到超時報錯;


  successful():判斷任務(wù)是否成功,成功為True,否則為False;


  2.定時任務(wù)


  在第一步的異步任務(wù)的基礎(chǔ)上,進(jìn)行部分修改即可


  1.settings.py


  from celery.schedules import crontab
  CELERYBEAT_SCHEDULE={
  'mul_every_30_seconds':{
  #任務(wù)路徑
  'task':'celery_app.tasks.mul',
  #每30秒執(zhí)行一次
  'schedule':5,
  'args':(14,5)
  }
  }

08.png

  說明(更多內(nèi)容見文檔:Periodic Tasks—Celery 5.2.0b3 documentation):


  task:任務(wù)函數(shù)


  schedule:執(zhí)行頻率,可以是整型(秒數(shù)),也可以是timedelta對象,也可以是crontab對象,也可以是自定義類(繼承celery.schedules.schedule)


  args:位置參數(shù),列表或元組


  kwargs:關(guān)鍵字參數(shù),字典


  options:可選參數(shù),字典,任何apply_async()支持的參數(shù)


  relative:默認(rèn)是False,取相對于beat的開始時間;設(shè)置為True,則取設(shè)置的timedelta時間


  在task.py中設(shè)置了日志


  from celery import shared_task
  import logging
  logger=logging.getLogger(__name__))
  shared_task
  def mul(x,y):
  logger.info('___mul__'*10)
  return x*y


  2.啟動celery


  (兩個cmd)分別啟動worker和beat


  celery-A worker celery_study-l debug-P eventlet
  celery beat-A celery_study-l debug


  3.任務(wù)綁定


  Celery可通過task綁定到實例獲取到task的上下文,這樣我們可以在task運(yùn)行時候獲取到task的狀態(tài),記錄相關(guān)日志等


  方法:


  在裝飾器中加入?yún)?shù)bind=True


  在task函數(shù)中的第一個參數(shù)設(shè)置為self


  在task.py里面寫


  from celery import shared_task
  import logging
  logger=logging.getLogger(__name__)
  #任務(wù)綁定
  shared_task(bind=True)
  def add(self,x,y):
  logger.info('add__-----'*10)
  logger.info('name:',self.name)
  logger.info('dir(self)',dir(self))
  return x+y


  其中:self對象是celery.app.task.Task的實例,可以用于實現(xiàn)重試等多種功能


  from celery import shared_task
  import logging
  logger=logging.getLogger(__name__)
  #任務(wù)綁定
  shared_task(bind=True)
  def add(self,x,y):
  try:
  logger.info('add__-----'*10)
  logger.info('name:',self.name)
  logger.info('dir(self)',dir(self))
  raise Exception
  except Exception as e:
  #出錯每4秒嘗試一次,總共嘗試4次
  self.retry(exc=e,countdown=4,max_retries=4)
  return x+y
  啟動celery
  celery-A worker celery_study-l debug-P eventlet


  4.任務(wù)鉤子


  Celery在執(zhí)行任務(wù)時,提供了鉤子方法用于在任務(wù)執(zhí)行完成時候進(jìn)行對應(yīng)的操作,在Task源碼中提供了很多狀態(tài)鉤子函數(shù)如:on_success(成功后執(zhí)行)、on_failure(失敗時候執(zhí)行)、on_retry(任務(wù)重試時候執(zhí)行)、after_return(任務(wù)返回時候執(zhí)行)


  方法:通過繼承Task類,重寫對應(yīng)方法即可,


  from celery import Task
  class MyHookTask(Task):
  def on_success(self,retval,task_id,args,kwargs):
  logger.info(f'task id:{task_id},arg:{args},successful!')
  def on_failure(self,exc,task_id,args,kwargs,einfo):
  logger.info(f'task id:{task_id},arg:{args},failed!erros:{exc}')
  def on_retry(self,exc,task_id,args,kwargs,einfo):
  logger.info(f'task id:{task_id},arg:{args},retry!erros:{exc}')
  #在對應(yīng)的task函數(shù)的裝飾器中,通過base=MyHookTask指定
  shared_task(base=MyHookTask,bind=True)
  def add(self,x,y):
  logger.info('add__-----'*10)
  logger.info('name:',self.name)
  logger.info('dir(self)',dir(self))
  return x+y


  啟動celery

  celery-A worker celery_study-l debug-P eventlet


  5.任務(wù)編排


  在很多情況下,一個任務(wù)需要由多個子任務(wù)或者一個任務(wù)需要很多步驟才能完成,Celery也能實現(xiàn)這樣的任務(wù),完成這類型的任務(wù)通過以下模塊完成:


  group:并行調(diào)度任務(wù)


  chain:鏈?zhǔn)饺蝿?wù)調(diào)度


  chord:類似group,但分header和body2個部分,header可以是一個group任務(wù),執(zhí)行完成后調(diào)用body的任務(wù)


  map:映射調(diào)度,通過輸入多個入?yún)矶啻握{(diào)度同一個任務(wù)


  starmap:類似map,入?yún)㈩愃疲猘rgs


  chunks:將任務(wù)按照一定數(shù)量進(jìn)行分組


  文檔:Next Steps—Celery 5.2.0b3 documentation


  1.group


  urls.py:

  path('primitive/',views.test_primitive),


  views.py:


  from.tasks import*
  from celery import group
  def test_primitive(request):


  #創(chuàng)建10個并列的任務(wù)


  lazy_group=group(add.s(i,i)for i in range(10))


  promise=lazy_group()


  result=promise.get()


  return JsonResponse({'function':'test_primitive','result':result})


  說明:


  通過task函數(shù)的s方法傳入?yún)?shù),啟動任務(wù)


  上面這種方法需要進(jìn)行等待,如果依然想實現(xiàn)異步的方式,那么就必須在tasks.py中新建一個task方法,調(diào)用group,示例如下:


  tasks.py:


  shared_task
  def group_task(num):
  return group(add.s(i,i)for i in range(num))().get()
  urls.py:
  path('first_group/',views.first_group),
  views.py:
  def first_group(request):
  ar=tasks.group_task.delay(10)
  return HttpResponse('返回first_group任務(wù),task_id:'+ar.task_id)


  2.chain


  默認(rèn)上一個任務(wù)的結(jié)果作為下一個任務(wù)的第一個參數(shù)


  def test_primitive(request):
  #等同調(diào)用mul(add(add(2,2),5),8)
  promise=chain(tasks.add.s(2,2),tasks.add.s(5),tasks.mul.s(8))()
  #72
  result=promise.get()
  return JsonResponse({'function':'test_primitive','result':result})

  3.chord


  任務(wù)分割,分為header和body兩部分,hearder任務(wù)執(zhí)行完在執(zhí)行body,其中hearder返回結(jié)果作為參數(shù)傳遞給body


  def test_primitive(request):
  #header:[3,12]
  #body:xsum([3,12])
  promise=chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
  result=promise.get()
  return JsonResponse({'function':'test_primitive','result':result})


  6、celery管理和監(jiān)控


  celery通過flower組件實現(xiàn)管理和監(jiān)控功能,flower組件不僅僅提供監(jiān)控功能,還提供HTTP API可實現(xiàn)對woker和task的管理


  官網(wǎng):flower·PyPI


  文檔:Flower-Celery monitoring tool—Flower 1.0.1 documentation


  安裝flower

  pip install flower


  啟動flower

  flower-A celery_study--port=5555


  說明:


  -A:項目名


  --port:端口號


  訪問


  在瀏覽器輸入:http://127.0.0.1:5555


  通過api操作

  curl http://127.0.0.1:5555/api/workers


  到此為止,這篇文章就給大家介紹完畢了,希望能給大家?guī)韼椭?/p>

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/128278.html

相關(guān)文章

  • Celery實際使用與內(nèi)存泄漏問題(面試)

    摘要:結(jié)論執(zhí)行完任務(wù)不釋放內(nèi)存與原一直沒有被銷毀有關(guān),因此可以適當(dāng)配置小點,而任務(wù)并發(fā)數(shù)與配置項有關(guān),每增加一個必然增加內(nèi)存消耗,同時也影響到一個何時被銷毀,因為是均勻調(diào)度任務(wù)至每個,因此也不宜配置過大,適當(dāng)配置。 1.實際使用 ? 監(jiān)控task的執(zhí)行結(jié)果:任務(wù)id,結(jié)果,traceback,children,任務(wù)狀態(tài) ? 配置 backend=redis://127...

    0x584a 評論0 收藏0
  • django開發(fā)-使用celery搭建分布式(多節(jié)點)任務(wù)隊列

    摘要:今天介紹一下如何在項目中使用搭建一個有兩個節(jié)點的任務(wù)隊列一個主節(jié)點一個子節(jié)點主節(jié)點發(fā)布任務(wù),子節(jié)點收到任務(wù)并執(zhí)行。 今天介紹一下如何在django項目中使用celery搭建一個有兩個節(jié)點的任務(wù)隊列(一個主節(jié)點一個子節(jié)點;主節(jié)點發(fā)布任務(wù),子節(jié)點收到任務(wù)并執(zhí)行。搭建3個或者以上的節(jié)點就類似了),使用到了celery,rabbitmq。這里不會單獨介紹celery和rabbitmq中的知識了...

    ConardLi 評論0 收藏0
  • django使用celery

    摘要:前言針對高延時任務(wù)直接在一次網(wǎng)絡(luò)請求中處理完畢會導(dǎo)致很不好的體驗則可以不阻塞請求后臺處理這些任務(wù)并且可以使用的進(jìn)行數(shù)據(jù)庫操作環(huán)境其他創(chuàng)建工程此時項目結(jié)構(gòu)如下修改添加修改創(chuàng)建新創(chuàng) 前言: 針對高延時任務(wù), 直接在一次網(wǎng)絡(luò)請求中處理完畢會導(dǎo)致很不好的體驗, celery則可以不阻塞請求后臺處理這些任務(wù), 并且可以使用django的models進(jìn)行數(shù)據(jù)庫操作. 環(huán)境 python model...

    meislzhua 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<