摘要:清楚了以上流程,我們直接來看函數(shù)主要用作初始化應(yīng)用監(jiān)聽端口以及啟動。其中就是保存聊天室所有聊天消息的結(jié)構(gòu)。關(guān)于的解讀我會放到閱讀源碼時(shí)講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。
tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應(yīng)用: chatdemo
首先看 chatdemo 的目錄結(jié)構(gòu):
├── chatdemo.py ├── static │?? ├── chat.css │?? └── chat.js └── templates ├── index.html ├── message.html └── room.html
非常簡單,基本沒有分層,三個(gè)模版一個(gè) js 一個(gè) css ,還有一個(gè)最重要的 chatdemo.py
本文的重點(diǎn)是弄清楚 chatdemo.py 的運(yùn)行流程,所以對于此項(xiàng)目的其他文件,包括模版及 chat.js 的實(shí)現(xiàn)都不會分析,只要知道 chat.js 的工作流程相信對于理解 chatdemo.py 沒有任何問題
此 demo 主要基于長輪詢。 獲取新消息的原理:
在 chat.js 中有一個(gè)定時(shí)器會定時(shí)執(zhí)行 update 操作
當(dāng)沒有新消息時(shí) tornado 會一直 hold 住 chat.js 發(fā)來的 update 請求
當(dāng)有新消息時(shí) tornado 將包含新消息的數(shù)據(jù)返回給所有 hold 的 update 請求
此時(shí) chat.js 收到 update 回復(fù)后更新返回?cái)?shù)據(jù)在聊天室中,同時(shí)再進(jìn)行一次 update 請求, 然后又從 1. 開始執(zhí)行。
發(fā)送新消息的原理:
輸入消息, 點(diǎn)擊 post 按鈕, chat.js 獲取表單后用 ajax 方式發(fā)送請求 new
tornado 收到請求 new ,返回消息本身, 同時(shí)通知所有 hold 住的 update 請求 ( 這里也包括發(fā)送 new 請求的 chat.js 所發(fā)送的 update 請求 ) 返回新消息
所有在線的 chat.js 收到 update 請求回復(fù),更新返回信息到聊天室,同時(shí)再進(jìn)行一次 update 請求。
清楚了以上流程,我們直接來看 chatdemo.py :
def main(): parse_command_line() app = tornado.web.Application( [ (r"/", MainHandler), (r"/a/message/new", MessageNewHandler), (r"/a/message/updates", MessageUpdatesHandler), ], cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, debug=options.debug, ) app.listen(options.port) tornado.ioloop.IOLoop.current().start() if __name__ == "__main__": main()
main 函數(shù)主要用作初始化應(yīng)用、監(jiān)聽端口以及啟動 tornado server 。
我們看路由:
主頁對應(yīng) MainHandler
new 請求對應(yīng) MessageNewHandler
updates 請求對應(yīng) MessageUpdatesHandler
下面來看 MainHandler :
# Making this a non-singleton is left as an exercise for the reader. global_message_buffer = MessageBuffer() class MainHandler(tornado.web.RequestHandler): def get(self): self.render("index.html", messages=global_message_buffer.cache)
只有一行代碼,就是渲染并返回 index.html,渲染的附加信息就是 global_message_buffer 的所有緩存消息。 global_message_buffer 是 MessageBuffer 的一個(gè)實(shí)例。 我們先不關(guān)心 MessageBuffer 內(nèi)部是什么,現(xiàn)在我們只要記住它主要是用來儲存聊天消息和連接到此聊天室的人的信息的類。 其中 MessageBuffer().cache 就是保存聊天室所有聊天消息的結(jié)構(gòu)。
然后來看 MessageNewHandler :
class MessageNewHandler(tornado.web.RequestHandler): def post(self): message = { "id": str(uuid.uuid4()), "body": self.get_argument("body"), } # to_basestring is necessary for Python 3"s json encoder, # which doesn"t accept byte strings. message["html"] = tornado.escape.to_basestring( self.render_string("message.html", message=message)) if self.get_argument("next", None): self.redirect(self.get_argument("next")) else: self.write(message) global_message_buffer.new_messages([message])
同樣很簡單,從 post 信息里獲取發(fā)來的新消息 ( body ) ,然后給消息分配一個(gè)唯一的 uuid,接著把這段消息渲染成一段 html ,然后 self.write(message) 返回這段 html, 同時(shí)給 global_message_buffer ( MessageBuffer 的實(shí)例 ) 添加這條新信息。 這里其實(shí)我更傾向于返回 json 之類的數(shù)據(jù),這樣更加直觀和規(guī)范,可能寫 demo 的人考慮到讀者對 json 之類的協(xié)議可能不熟悉故而選擇了返回渲染好的 html 直接讓 chat.js append 到 index.html 里。
接著來看 MessageUpdatesHandler :
class MessageUpdatesHandler(tornado.web.RequestHandler): @gen.coroutine def post(self): cursor = self.get_argument("cursor", None) # Save the future returned by wait_for_messages so we can cancel # it in wait_for_messages self.future = global_message_buffer.wait_for_messages(cursor=cursor) messages = yield self.future if self.request.connection.stream.closed(): return self.write(dict(messages=messages)) def on_connection_close(self): global_message_buffer.cancel_wait(self.future)
重點(diǎn)就在這里,可以看到其內(nèi)部的 post 方法被 gen.coroutine 修飾器修飾,也就是說這個(gè) post 方法現(xiàn)在是 協(xié)程 ( coroutine ) 方式工作。 對于協(xié)程比較陌生的童鞋,你可以直接把它當(dāng)作是單線程解決 io ( 網(wǎng)絡(luò)請求 ) 密集運(yùn)算被阻塞而導(dǎo)致低效率的解決方案。 當(dāng)然這樣理解協(xié)程還比較籠統(tǒng),之后我會詳細(xì)寫一篇關(guān)于協(xié)程的文章,但在這里這樣理解是沒有問題的。
現(xiàn)在來看代碼內(nèi)容,首先獲取 cursor ,一個(gè)用來標(biāo)識我們已經(jīng)獲取的消息的指針,這樣 tornado 就不會把你已經(jīng)獲取的消息重復(fù)的發(fā)給你。 然后調(diào)用 global_message_buffer.wait_for_messages(cursor=cursor) 獲取一個(gè) future 對象。 future 對象是 tornado 實(shí)現(xiàn)的一個(gè)特殊的類的實(shí)例,它的作用就是包含之后 ( 未來 ) 將會返回的數(shù)據(jù),我們現(xiàn)在不用關(guān)心 Future() 內(nèi)部如何實(shí)現(xiàn),只要記住上面它的作用就行。 關(guān)于 Future 的解讀我會放到閱讀 Future 源碼時(shí)講。
然后看最關(guān)鍵的這句: messages = yield self.future 注意這里的 yield 就是 hold updates 請求的關(guān)鍵,它到這里相當(dāng)于暫停了整個(gè) post 函數(shù) ( updates 請求被 hold )同時(shí)也相當(dāng)于 updates 這次網(wǎng)絡(luò)請求被阻塞,這個(gè)時(shí)候協(xié)程發(fā)揮作用,把這個(gè)函數(shù)暫停的地方的所有信息保存掛起,然后把工作線程釋放,這樣 tornado 可以繼續(xù)接受 new、 updates 等請求然后運(yùn)行相應(yīng)的方法處理請求。
當(dāng)有新的消息返回時(shí),tornado 底層的 ioloop 實(shí)例將會調(diào)用 gen.send(value) 返回新消息( value )給每個(gè)被暫停的方法的 yield 處, 此時(shí)協(xié)程依次恢復(fù)這些被暫停的方法, 同時(shí)用獲得的返回消息繼續(xù)執(zhí)行方法, 這時(shí) messages = yield self.future 繼續(xù)執(zhí)行,messages 獲得 yield 的返回值 value ( python 中調(diào)用 gen.send(value) 將會把 value 值返回到 yield 處并替換原前 yield 后的值 ),然后判斷下用戶是否已經(jīng)離開,如果還在線則返回新消息。
明白了以上流程,我們最后來看 MessageBuffer:
class MessageBuffer(object): def __init__(self): self.waiters = set() self.cache = [] self.cache_size = 200 def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future def cancel_wait(self, future): self.waiters.remove(future) # Set an empty result to unblock any coroutines waiting. future.set_result([]) def new_messages(self, messages): logging.info("Sending new message to %r listeners", len(self.waiters)) for future in self.waiters: future.set_result(messages) self.waiters = set() self.cache.extend(messages) if len(self.cache) > self.cache_size: self.cache = self.cache[-self.cache_size:]
初始化方法中 self.waiters 就是一個(gè)等待新消息的 listener 集合 ( 直接理解成所有被 hold 住的 updates 請求隊(duì)列可能更清晰 )
self.cache 就是儲存所有聊天消息的列表,self.cache_size = 200 則定義了 cache 的大小 是存 200 條消息。
然后先來看簡單的 new_messages:
遍歷 waiters 列表,然后給所有的等待者返回新消息,同時(shí)清空等待者隊(duì)列。 然后把消息加到緩存 cache 里,如果緩存大于限制則取最新的 200 條消息。這里只要注意到 future.set_result(messages) 就是用來給 future 對象添加返回?cái)?shù)據(jù) ( 之前被 yield 暫停的地方此時(shí)因?yàn)?set_result() 方法將會獲得 "未來" 的數(shù)據(jù) ) 這一點(diǎn)即可。
然后來看 wait_for_messages :
def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future
首先初始化一個(gè) Future 對象,然后根據(jù) cursor 判斷哪些消息已經(jīng)獲取了哪些還沒獲取,如果緩存中有對于這個(gè) waiter 還沒獲取過的消息,則直接調(diào)用 set_result() 返回這些緩存中已有的但對于這個(gè) waiter 來說是新的的數(shù)據(jù)。 如果這個(gè) waiter 已經(jīng)有緩存中的所有數(shù)據(jù),那么就把它加到等待者隊(duì)列里保持等待,直到有新消息來時(shí)調(diào)用 new_messages 再返回。
而最后一個(gè) cancel_wait 就很簡單了,當(dāng)有用戶退出聊天室時(shí),直接從 self.waiters 中移除他所對應(yīng)的等待者。
當(dāng)明白了整個(gè)代碼的運(yùn)行流程后,我們可以基于這個(gè)簡單的 demo 而寫出更加豐富的例子,比如加入 session ,做登陸、做好友關(guān)系,做單聊做群聊等等。
chatdemo with room是我添加的一個(gè)簡單功能,輸入聊天室房號后再進(jìn)行聊天,只有同一房間中的人才能收到彼此的消息。
以上就是鄙人對整個(gè) chatdemo.py 的解讀。 在閱讀此 demo 時(shí),我沒有參考其他源碼解讀,只是通過閱讀 tornado 底層的源碼而得出的個(gè)人的理解,因此肯定會有很多理解不成熟甚至錯(cuò)誤的地方,還望大家多多指教。
原文地址
作者:rapospectre
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38035.html
摘要:也就是說用于設(shè)定與處理類間的映射關(guān)系。在中,默認(rèn)使用和函數(shù)分別處理兩種請求。因?yàn)楸韱稳蕴峤坏疆?dāng)前頁面,所以還是由處理。載入時(shí)間相關(guān)的的一個(gè)類,獲取當(dāng)前時(shí)間戳。獲取數(shù)據(jù)庫中的名為的。 前面的話 Demo1是一個(gè)簡單的博客系統(tǒng)(=。=什么網(wǎng)站都叫系統(tǒng))。我們從這個(gè)簡單的系統(tǒng)入手,去了解P+T+M網(wǎng)站的內(nèi)部邏輯,并記住一些規(guī)則,方便我們進(jìn)一步自己開發(fā)。 規(guī)則這個(gè)詞特意打上了雙引號,目的是...
摘要:學(xué)習(xí)筆記七數(shù)學(xué)形態(tài)學(xué)關(guān)注的是圖像中的形狀,它提供了一些方法用于檢測形狀和改變形狀。學(xué)習(xí)筆記十一尺度不變特征變換,簡稱是圖像局部特征提取的現(xiàn)代方法基于區(qū)域圖像塊的分析。本文的目的是簡明扼要地說明的編碼機(jī)制,并給出一些建議。 showImg(https://segmentfault.com/img/bVRJbz?w=900&h=385); 前言 開始之前,我們先來看這樣一個(gè)提問: pyth...
摘要:在被收購之后,維護(hù)并繼續(xù)發(fā)展。設(shè)置是告訴應(yīng)用在目錄尋找應(yīng)用模板。設(shè)置告訴應(yīng)用使用目錄里面的類似圖像文件等靜態(tài)文件。我們會在應(yīng)用開發(fā)過程中,保持著調(diào)試器在后臺運(yùn)行。這能提供高效的開發(fā)環(huán)境。我們會把回應(yīng)狀態(tài)設(shè)為已創(chuàng)建。 編者注:我們發(fā)現(xiàn)了有趣的系列文章《30天學(xué)習(xí)30種新技術(shù)》,正在翻譯,一天一篇更新,年終禮包。下面是第23天的內(nèi)容。 今天的《30天學(xué)習(xí)30種新技術(shù)》,我決定暫時(shí)放下...
摘要:主要是為了實(shí)現(xiàn)系統(tǒng)之間的雙向解耦而實(shí)現(xiàn)的。問題及優(yōu)化隊(duì)列過長問題使用上述方案的異步非阻塞可能會依賴于的任務(wù)隊(duì)列長度,若隊(duì)列中的任務(wù)過多,則可能導(dǎo)致長時(shí)間等待,降低效率。 Tornado和Celery介紹 1.Tornado Tornado是一個(gè)用python編寫的一個(gè)強(qiáng)大的、可擴(kuò)展的異步HTTP服務(wù)器,同時(shí)也是一個(gè)web開發(fā)框架。tornado是一個(gè)非阻塞式web服務(wù)器,其速度相當(dāng)快。...
閱讀 2032·2023-04-25 22:50
閱讀 2845·2021-09-29 09:35
閱讀 3400·2021-07-29 10:20
閱讀 3169·2019-08-29 13:57
閱讀 3368·2019-08-29 13:50
閱讀 3043·2019-08-26 12:10
閱讀 3538·2019-08-23 18:41
閱讀 2646·2019-08-23 18:01