摘要:主要是為了實(shí)現(xiàn)系統(tǒng)之間的雙向解耦而實(shí)現(xiàn)的。問題及優(yōu)化隊(duì)列過長(zhǎng)問題使用上述方案的異步非阻塞可能會(huì)依賴于的任務(wù)隊(duì)列長(zhǎng)度,若隊(duì)列中的任務(wù)過多,則可能導(dǎo)致長(zhǎng)時(shí)間等待,降低效率。
Tornado和Celery介紹 1.Tornado
Tornado是一個(gè)用python編寫的一個(gè)強(qiáng)大的、可擴(kuò)展的異步HTTP服務(wù)器,同時(shí)也是一個(gè)web開發(fā)框架。tornado是一個(gè)非阻塞式web服務(wù)器,其速度相當(dāng)快。得利于其非阻塞的方式和對(duì) epoll的運(yùn)用,tornado每秒可以處理數(shù)以千計(jì)的連接,這意味著對(duì)于實(shí)時(shí)web服務(wù)來說,tornado是一個(gè)理想的web框架。它在處理嚴(yán)峻的網(wǎng)絡(luò)流量時(shí)表現(xiàn)得足夠強(qiáng)健,但卻在創(chuàng)建和編寫時(shí)有著足夠的輕量級(jí),并能夠被用在大量的應(yīng)用和工具中。2.Celery
進(jìn)一步了解和學(xué)習(xí)tornado可移步:tornado官方文檔
Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列, 同時(shí)也支持任務(wù)調(diào)度。Celery 中有兩個(gè)比較關(guān)鍵的概念:
Worker: worker 是一個(gè)獨(dú)立的進(jìn)程,它持續(xù)監(jiān)視隊(duì)列中是否有需要處理的任務(wù);
Broker: broker 也被稱為中間人或者協(xié)調(diào)者,broker 負(fù)責(zé)協(xié)調(diào)客戶端和 worker 的溝通。客戶端向 隊(duì)列添加消息,broker 負(fù)責(zé)把消息派發(fā)給 worker。
3.RabbitMQRabbitMQ是實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件的一種,最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。RabbitMQ主要是為了實(shí)現(xiàn)系統(tǒng)之間的雙向解耦而實(shí)現(xiàn)的。當(dāng)生產(chǎn)者大量產(chǎn)生數(shù)據(jù)時(shí),消費(fèi)者無法快速消費(fèi),那么需要一個(gè)中間層。保存這個(gè)數(shù)據(jù)。
例如一個(gè)日志系統(tǒng),很容易使用RabbitMQ簡(jiǎn)化工作量,一個(gè)Consumer可以進(jìn)行消息的正常處理,另一個(gè)Consumer負(fù)責(zé)對(duì)消息進(jìn)行日志記錄,只要在程序中指定兩個(gè)Consumer所監(jiān)聽的queue以相同的方式綁定到同一exchange即可,剩下的消息分發(fā)工作由RabbitMQ完成。
一般情況下,一個(gè)工具庫或者一個(gè)框架都是獨(dú)立的,有自己的feature或者功能點(diǎn),可能依賴其他的庫,但絕不依賴于其他服務(wù)。但是celery是一個(gè)特例,如果celery沒有broker這個(gè)服務(wù),那就完全不能用了。celery 支持多種 broker, 但主要以 RabbitMQ 和 Redis 為主,其他都是試驗(yàn)性的,雖然也可以使用, 但是沒有專門的維護(hù)者。官方推薦使用rabbitmq作為生產(chǎn)環(huán)境下的broker,redis雖然也在官方指名的broker之列,但是實(shí)際使用上有可能還會(huì)出現(xiàn)以下莫名其妙的問題。
Celery的配置和使用方法詳見:官方文檔
從Tornado的異步講起 tornado的同步阻塞用tornado進(jìn)行web開發(fā)的過程中(實(shí)際上用任何語言或者框架開發(fā)都會(huì)遇到),開發(fā)者可能會(huì)發(fā)現(xiàn)有時(shí)候tornado的響應(yīng)會(huì)變慢,追根溯源會(huì)發(fā)現(xiàn)原因之一就是因?yàn)樵撜?qǐng)求被其他請(qǐng)求阻塞了。這就有問題了?。。?!tornado不是標(biāo)榜自己是異步Http Web Server嗎?不是號(hào)稱自己解決了C10K問題了嗎?這是欺騙消費(fèi)者?。。?!
但是,深入了解tornado之后才發(fā)現(xiàn),人家說的異步非阻塞是有條件的,只有按照它說的來,才能實(shí)現(xiàn)真正的異步非阻塞。。。
我們先來看一個(gè)小例子:
#!/bin/env python import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.httpclient import torndb import time from tornado.options import define, options define("port", default=8000, help="run on the given port", type=int) db = torndb.Connection("127.0.0.1:3306", "user_db", "username", "passwd") class MysqlHandler(tornado.web.RequestHandler): def get(self, flag): self.write(db.query("select * from table where flag=%s", flag)) class NowHandler(tornado.web.RequestHandler): def get(self): self.write("i want you, right now!") if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[ (r"/mysql_query/(d+)", MysqlHandler), (r"/i_want_you_now", NowHandler)]) http_server = tornado.httpserver.HTTPServer(app) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start()
當(dāng)我們先請(qǐng)求/mysql_query接口時(shí)再請(qǐng)求/i_want_you_now接口,會(huì)發(fā)現(xiàn)原來可以立刻返回的第二個(gè)請(qǐng)求卻被一直阻塞到第一個(gè)接口執(zhí)行完之后才返回。為什么?因?yàn)榇蟛糠謜eb框架都是使用的同步阻塞模型來處理請(qǐng)求的,tornado的默認(rèn)模型也不例外。但是tornado可是一個(gè)異步http服務(wù)器啊,不會(huì)這么弱吧?而且不上場(chǎng)景下都有一些相當(dāng)耗時(shí)的操作,這些操作就會(huì)阻塞其他一些普通的請(qǐng)求,應(yīng)該怎么解決這個(gè)問題?
相信很多使用過tornado的人會(huì)想到@tornado.web.asynchronous這個(gè)裝飾器,但是這就是tornado官方雞賊的地方了?。。⊙b飾器 web.asynchronous 只能用在verb函數(shù)之前(即get/post/delete等),并且需要搭配tornado異步客戶端使用,如httpclient.AsyncHTTPClient,或者,你需要異步執(zhí)行的那個(gè)函數(shù)(操作)必須也是異步的。。。(我是怨念滿滿的粗體?。。。?/strong>,而且加上這個(gè)裝飾器后,開發(fā)者必須在異步回調(diào)函數(shù)里顯式調(diào)用 RequestHandler.finish 才會(huì)結(jié)束這次 HTTP 請(qǐng)求。(因?yàn)閠ornado默認(rèn)在函數(shù)處理返回時(shí)會(huì)自動(dòng)關(guān)閉客戶端的連接)
什么意思呢?就是說,tornado:老子只給你提供異步的入口,你要是真想異步操作,要不你就使用我提供的一些異步客戶端來搞,不然你就自己實(shí)現(xiàn)一個(gè)異步的操作。
以操作MongoDB為例,如果你的函數(shù)中含有調(diào)用mongo的調(diào)用(使用pymongo庫),那么這時(shí)候你加asynchronous這個(gè)裝飾器就沒有任何效果了,因?yàn)槟愕膍ongo調(diào)用本身是同步的,如果想做成異步非阻塞的效果,需要使用mongo出品的另一個(gè)python driver -- motor,這個(gè)driver支持異步操作mongo,這時(shí)候你再加asynchronous裝飾器并操作mongo就可以實(shí)現(xiàn)異步非阻塞的效果了。
異步非阻塞的實(shí)現(xiàn)所以,如果要使用tornado的異步調(diào)用,第一,使用tornado內(nèi)置的異步客戶端如httpclient.AsyncHTTPClient等;第二,可參考內(nèi)置異步客戶端,借助tornado.ioloop.IOLoop封裝一個(gè)自己的異步客戶端,但開發(fā)成本并不小。
然而,天無絕人之路,還是有辦法可以用較低的成本實(shí)現(xiàn)tornado的異步非阻塞的,那就是借助celery項(xiàng)目。前面說了,它是一個(gè)分布式的實(shí)時(shí)處理消息隊(duì)列調(diào)度系統(tǒng),tornado接到請(qǐng)求后,可以把所有的復(fù)雜業(yè)務(wù)邏輯處理、數(shù)據(jù)庫操作以及IO等各種耗時(shí)的同步任務(wù)交給celery,由這個(gè)任務(wù)隊(duì)列異步處理完后,再返回給tornado。這樣只要保證tornado和celery的交互是異步的,那么整個(gè)服務(wù)是完全異步的。至于如何保證tornado和celery之間的交互是異步的,可以借助tornado-celery這個(gè)適配器來實(shí)現(xiàn)。
celery配合rabbitmq的工作流程如下:
這里我們來使用這幾個(gè)組件重寫前面的同步阻塞的例子:
#!/bin/env python import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.httpclient import time import tcelery, tasks from tornado.options import define, options tcelery.setup_nonblocking_producer() define("port", default=8000, help="run on the given port", type=int) class AsyncMysqlHandler(tornado.web.RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self, flag): res = yield tornado.gen.Task(tasks.query_mysql.apply_async, args=[flag]) self.write(res.result) self.finish() class NowHandler(tornado.web.RequestHandler): def get(self): self.write("i want you, right now!") if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[ (r"/mysql_query/(d+)", AsyncMysqlHandler), (r"/i_want_you_now", NowHandler)]) http_server = tornado.httpserver.HTTPServer(app) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start()
這里有個(gè)新的tornado.gen.coroutine裝飾器, coroutine是3.0之后新增的裝飾器.以前的辦法是用回調(diào)函數(shù)的方式進(jìn)行異步調(diào)用,如果使用回調(diào)函數(shù)的方式,則代碼如下:
#!/bin/env python import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.httpclient import time import tcelery, tasks from tornado.options import define, options tcelery.setup_nonblocking_producer() define("port", default=8000, help="run on the given port", type=int) class AsyncMysqlHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, flag): tasks.query_mysql.apply_async(args=[flag], callback=self.on_result) def on_result(self, response): self.write(response.result) self.finish() class NowHandler(tornado.web.RequestHandler): def get(self): self.write("i want you, right now!") if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[ (r"/mysql_query/(d+)", AsyncMysqlHandler), (r"/i_want_you_now", NowHandler)]) http_server = tornado.httpserver.HTTPServer(app) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start()
使用callback的話始終覺得會(huì)是的代碼結(jié)構(gòu)變得比較混亂,試想如果有大量異步回調(diào),每一個(gè)都寫一個(gè)回調(diào)函數(shù)的話,勢(shì)必導(dǎo)致項(xiàng)目代碼結(jié)構(gòu)變得不那么清晰和優(yōu)雅,畢竟回調(diào)這種反人類的寫法還是很多人不喜歡的,但也看個(gè)人喜好,不喜歡callback風(fēng)格的可以使用yield來進(jìn)行異步調(diào)用。
tasks.py集中放置開發(fā)者需要異步執(zhí)行的函數(shù)。
import time import torndb from celery import Celery db = torndb.Connection("127.0.0.1:3306", "user_db", "username", "passwd") app = Celery("tasks", broker="amqp://guest:guest@localhost:5672") app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672" @app.task(name="task.query_users") def query_mysql(flag): return db.query("select * from table where flag=%s", flag) if __name__ == "__main__": app.start()
然后啟動(dòng)celery worker監(jiān)聽任務(wù)隊(duì)列(消費(fèi)者會(huì)從任務(wù)隊(duì)列中取走一個(gè)個(gè)的task并執(zhí)行):
celery -A tasks worker --loglevel=info
自此,依靠這種架構(gòu),可以實(shí)現(xiàn)tornado處理請(qǐng)求的完全異步調(diào)用。
問題及優(yōu)化 1.隊(duì)列過長(zhǎng)問題使用上述方案的異步非阻塞可能會(huì)依賴于celery的任務(wù)隊(duì)列長(zhǎng)度,若隊(duì)列中的任務(wù)過多,則可能導(dǎo)致長(zhǎng)時(shí)間等待,降低效率。
解決方案:
啟動(dòng)多個(gè)celery worker監(jiān)聽任務(wù)隊(duì)列,使用多進(jìn)程并發(fā)消費(fèi)任務(wù)隊(duì)列,celery命令可以通過-concurrency參數(shù)來指定用來執(zhí)行任務(wù)而prefork的worker進(jìn)程,如果所有的worker都在執(zhí)行任務(wù),那么新添加的任務(wù)必須要等待有一個(gè)正在執(zhí)行的任務(wù)完成后才能被執(zhí)行,默認(rèn)的concurrency數(shù)量是機(jī)器上CPU的數(shù)量。另外,celery是支持好幾個(gè)并發(fā)模式的,有prefork,threading,協(xié)程(gevent,eventlet),prefork在celery的介紹是,默認(rèn)是用了multiprocess來實(shí)現(xiàn)的;可以通過-p參數(shù)指定其他的并發(fā)模型,如gevent(需自己配置好gevent環(huán)境)。
建立多個(gè)任務(wù)queue,把大量的任務(wù)分發(fā)到不同的queue中,減輕單個(gè)queue時(shí)可能出現(xiàn)的任務(wù)數(shù)量過載。
2.水平擴(kuò)展優(yōu)化前面說了celery是一個(gè)分布式系統(tǒng),也就是說,基于celery的項(xiàng)目可無痛實(shí)現(xiàn)分布式擴(kuò)展,前面寫的tornado和celery配合的demo,也可以實(shí)現(xiàn)獨(dú)立部署,即tornado server和celery server其實(shí)可以分開部署,即分布在不同的服務(wù)器上,celery server部署自己的tasks.py任務(wù),并啟動(dòng)celery worker監(jiān)聽,然后在tornado server上添加以下代碼:
from celery import Celery app = Celery(broker = "amqp://",)
并使用Celery的send_task函數(shù)調(diào)用任務(wù):
app.send_task("function_name", args=[param1, param2, param3...])
即可實(shí)現(xiàn)tornado和celery的完全解耦。
后續(xù):另外,了解到tornado.concurrent.futures(py3自帶這個(gè)庫,py2需多帶帶安裝)這個(gè)module可以實(shí)現(xiàn)自定義函數(shù)的異步化,目前還沒有深入了解這個(gè)東西,有時(shí)間去研究一下這個(gè)東西,有心得再分享一下這個(gè)module相關(guān)的知識(shí)。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/41978.html
摘要:中常用的幾個(gè)框架有等,今天來總結(jié)一下和的不同。本文使用的環(huán)境是。文件可以加載路由信息和項(xiàng)目配置信息,文件負(fù)責(zé)啟動(dòng)項(xiàng)目。以上就簡(jiǎn)單的比較了和幾個(gè)方面的不同,它們各有優(yōu)缺點(diǎn),實(shí)際工作中可以根據(jù)不同的需求選擇不同的框架進(jìn)行開發(fā)。 python中常用的幾個(gè)web框架有django, tornado, flask等,今天來總結(jié)一下django和tornado的不同。工作中django和torna...
摘要:是什么是一個(gè)由編寫的簡(jiǎn)單靈活可靠的用來處理大量信息的分布式系統(tǒng)它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具。專注于實(shí)時(shí)任務(wù)處理,支持任務(wù)調(diào)度。說白了,它是一個(gè)分布式隊(duì)列的管理工具,我們可以用提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列。 Celery 是什么? Celery 是一個(gè)由 Python 編寫的簡(jiǎn)單、靈活、可靠的用來處理大量信息的分布式系統(tǒng),它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工...
摘要:這是我重新復(fù)習(xí)的原因放棄了之前自己實(shí)現(xiàn)的全面擁抱的這個(gè)改動(dòng)是非常大的而且閱讀的源碼可以發(fā)現(xiàn)其中大部分函數(shù)都支持了類型檢驗(yàn)和返回值提示值得閱讀 廢話不多說,直接上代碼 __auth__ = aleimu __doc__ = 學(xué)習(xí)tornado6.0+ 版本與python3.7+ import time import asyncio import tornado.gen import t...
摘要:初步分析提升可從兩方面入手,一個(gè)是增加并發(fā)數(shù),其二是減少平均響應(yīng)時(shí)間。大部分的時(shí)間花在系統(tǒng)與數(shù)據(jù)庫的交互上,到這,便有了一個(gè)優(yōu)化的主題思路最大限度的降低平均響應(yīng)時(shí)間。不要輕易否定一項(xiàng)公認(rèn)的技術(shù)真理,要拿數(shù)據(jù)說話。 本文最早發(fā)表于個(gè)人博客:PylixmWiki 應(yīng)項(xiàng)目的需求,我們使用tornado開發(fā)了一個(gè)api系統(tǒng),系統(tǒng)開發(fā)完后,在8核16G的虛機(jī)上經(jīng)過壓測(cè)qps只有200+。與我們當(dāng)...
摘要:使用異步框架,例如等等,裝飾異步任務(wù)。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。不存儲(chǔ)任務(wù)狀態(tài)。標(biāo)識(shí)要使用的默認(rèn)序列化方法的字符串。指定該任務(wù)的結(jié)果存儲(chǔ)后端用于此任務(wù)。 概述: ????????我們考慮一個(gè)場(chǎng)景,公司有一個(gè)需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...
閱讀 736·2023-04-25 19:43
閱讀 3981·2021-11-30 14:52
閱讀 3807·2021-11-30 14:52
閱讀 3871·2021-11-29 11:00
閱讀 3802·2021-11-29 11:00
閱讀 3904·2021-11-29 11:00
閱讀 3580·2021-11-29 11:00
閱讀 6183·2021-11-29 11:00