摘要:上一篇我們介紹了包,以及如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)。倒排索引保存在本地一個(gè)名為的文件中。運(yùn)行示例如下這個(gè)模塊沒有使用并發(fā),主要作用是為使用包編寫的服務(wù)器提供支持。
async/await語法asyncio 上一篇我們介紹了 asyncio 包,以及如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)。在這一篇,我們主要介紹使用 asyncio 包編程的兩個(gè)例子。
我們先介紹下 async/await 語法,要不然看完這篇可能會(huì)困惑,為什么之前使用 asyncio.coroutine 裝飾器 和 yield from,這里都是 用的 async 和 await?
python并發(fā)2:使用asyncio處理并發(fā)
async/await 是Python3.5 的新語法,語法如下:
async def read_data(db): pass
async 是明確將函數(shù)聲明為協(xié)程的關(guān)鍵字,即使沒有await表達(dá)式,函數(shù)執(zhí)行也會(huì)返回一個(gè)協(xié)程對象。
在協(xié)程函數(shù)內(nèi)部,可以在某個(gè)表達(dá)式之前使用 await 關(guān)鍵字來暫停協(xié)程的執(zhí)行,以等待某協(xié)程完成:
async def read_data(db): data = await db.fetch("SELECT ...")
這個(gè)代碼如果使用 asyncio.coroutine 裝飾器語法為:
@asyncio.coroutine def read_data(db): data = yield from db.fetch("SELECT ...")
這兩段代碼執(zhí)行的結(jié)果是一樣的,也就是說 可以把 asyncio.coroutine 替換為 async, yield from 替換為 await。
使用新的語法有什么好處呢:
使生成器和協(xié)程的概念更容易理解,因?yàn)檎Z法不同
可以消除由于重構(gòu)時(shí)不小心移出協(xié)程中yield 聲明而導(dǎo)致的不明確錯(cuò)誤,這回導(dǎo)致協(xié)程變成普通的生成器。
使用 asyncio 包編寫服務(wù)器這個(gè)例子主要是使用 asyncio 包 和 unicodedata 模塊,實(shí)現(xiàn)通過規(guī)范名稱查找Unicode 字符。
我們先來看一下代碼:
# charfinder.py import sys import re import unicodedata import pickle import warnings import itertools import functools from collections import namedtuple RE_WORD = re.compile("w+") RE_UNICODE_NAME = re.compile("^[A-Z0-9 -]+$") RE_CODEPOINT = re.compile("U+[0-9A-F]{4, 6}") INDEX_NAME = "charfinder_index.pickle" MINIMUM_SAVE_LEN = 10000 CJK_UNI_PREFIX = "CJK UNIFIED IDEOGRAPH" CJK_CMP_PREFIX = "CJK COMPATIBILITY IDEOGRAPH" sample_chars = [ "$", # DOLLAR SIGN "A", # LATIN CAPITAL LETTER A "a", # LATIN SMALL LETTER A "u20a0", # EURO-CURRENCY SIGN "u20ac", # EURO SIGN ] CharDescription = namedtuple("CharDescription", "code_str char name") QueryResult = namedtuple("QueryResult", "count items") def tokenize(text): """ :param text: :return: return iterable of uppercased words """ for match in RE_WORD.finditer(text): yield match.group().upper() def query_type(text): text_upper = text.upper() if "U+" in text_upper: return "CODEPOINT" elif RE_UNICODE_NAME.match(text_upper): return "NAME" else: return "CHARACTERS" class UnicodeNameIndex: # unicode name 索引類 def __init__(self, chars=None): self.load(chars) def load(self, chars=None): # 加載 unicode name self.index = None if chars is None: try: with open(INDEX_NAME, "rb") as fp: self.index = pickle.load(fp) except OSError: pass if self.index is None: self.build_index(chars) if len(self.index) > MINIMUM_SAVE_LEN: try: self.save() except OSError as exc: warnings.warn("Could not save {!r}: {}" .format(INDEX_NAME, exc)) def save(self): with open(INDEX_NAME, "wb") as fp: pickle.dump(self.index, fp) def build_index(self, chars=None): if chars is None: chars = (chr(i) for i in range(32, sys.maxunicode)) index = {} for char in chars: try: name = unicodedata.name(char) except ValueError: continue if name.startswith(CJK_UNI_PREFIX): name = CJK_UNI_PREFIX elif name.startswith(CJK_CMP_PREFIX): name = CJK_CMP_PREFIX for word in tokenize(name): index.setdefault(word, set()).add(char) self.index = index def word_rank(self, top=None): # (len(self.index[key], key) 是一個(gè)生成器,需要用list 轉(zhuǎn)成列表,要不然下邊排序會(huì)報(bào)錯(cuò) res = [list((len(self.index[key], key)) for key in self.index)] res.sort(key=lambda item: (-item[0], item[1])) if top is not None: res = res[:top] return res def word_report(self, top=None): for postings, key in self.word_rank(top): print("{:5} {}".format(postings, key)) def find_chars(self, query, start=0, stop=None): stop = sys.maxsize if stop is None else stop result_sets = [] for word in tokenize(query): # tokenize 是query 的生成器 a b 會(huì)是 ["a", "b"] 的生成器 chars = self.index.get(word) if chars is None: result_sets = [] break result_sets.append(chars) if not result_sets: return QueryResult(0, ()) result = functools.reduce(set.intersection, result_sets) result = sorted(result) # must sort to support start, stop result_iter = itertools.islice(result, start, stop) return QueryResult(len(result), (char for char in result_iter)) def describe(self, char): code_str = "U+{:04X}".format(ord(char)) name = unicodedata.name(char) return CharDescription(code_str, char, name) def find_descriptions(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe(char) def get_descriptions(self, chars): for char in chars: yield self.describe(char) def describe_str(self, char): return "{:7} {} {}".format(*self.describe(char)) def find_description_strs(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe_str(char) @staticmethod # not an instance method due to concurrency def status(query, counter): if counter == 0: msg = "No match" elif counter == 1: msg = "1 match" else: msg = "{} matches".format(counter) return "{} for {!r}".format(msg, query) def main(*args): index = UnicodeNameIndex() query = " ".join(args) n = 0 for n, line in enumerate(index.find_description_strs(query), 1): print(line) print("({})".format(index.status(query, n))) if __name__ == "__main__": if len(sys.argv) > 1: main(*sys.argv[1:]) else: print("Usage: {} word1 [word2]...".format(sys.argv[0]))
這個(gè)模塊讀取Python內(nèi)建的Unicode數(shù)據(jù)庫,為每個(gè)字符名稱中的每個(gè)單詞建立索引,然后倒排索引,存入一個(gè)字典。
例如,在倒排索引中,"SUN" 鍵對應(yīng)的條目是一個(gè)集合,里面是名稱中包含"SUN" 這個(gè)詞的10個(gè)Unicode字符。倒排索引保存在本地一個(gè)名為charfinder_index.pickle 的文件中。如果查詢多個(gè)單詞,會(huì)計(jì)算從索引中所得集合的交集。
運(yùn)行示例如下:
>>> main("rook") # doctest: +NORMALIZE_WHITESPACE U+2656 ? WHITE CHESS ROOK U+265C ? BLACK CHESS ROOK (2 matches for "rook") >>> main("rook", "black") # doctest: +NORMALIZE_WHITESPACE U+265C ? BLACK CHESS ROOK (1 match for "rook black") >>> main("white bishop") # doctest: +NORMALIZE_WHITESPACE U+2657 ? WHITE CHESS BISHOP (1 match for "white bishop") >>> main("jabberwocky"s vest") (No match for "jabberwocky"s vest")
這個(gè)模塊沒有使用并發(fā),主要作用是為使用 asyncio 包編寫的服務(wù)器提供支持。
下面我們來看下 tcp_charfinder.py 腳本:
# tcp_charfinder.py import sys import asyncio # 用于構(gòu)建索引,提供查詢方法 from charfinder import UnicodeNameIndex CRLF = b" " PROMPT = b"?> " # 實(shí)例化UnicodeNameIndex 類,它會(huì)使用charfinder_index.pickle 文件 index = UnicodeNameIndex() async def handle_queries(reader, writer): # 這個(gè)協(xié)程要傳給asyncio.start_server 函數(shù),接收的兩個(gè)參數(shù)是asyncio.StreamReader 對象和 asyncio.StreamWriter 對象 while True: # 這個(gè)循環(huán)處理會(huì)話,直到從客戶端收到控制字符后退出 writer.write(PROMPT) # can"t await! # 這個(gè)方法不是協(xié)程,只是普通函數(shù);這一行發(fā)送 ?> 提示符 await writer.drain() # must await! # 這個(gè)方法刷新writer 緩沖;因?yàn)樗菂f(xié)程,所以要用 await data = await reader.readline() # 這個(gè)方法也是協(xié)程,返回一個(gè)bytes對象,也要用await try: query = data.decode().strip() except UnicodeDecodeError: # Telenet 客戶端發(fā)送控制字符時(shí),可能會(huì)拋出UnicodeDecodeError異常 # 我們這里默認(rèn)發(fā)送空字符 query = "x00" client = writer.get_extra_info("peername") # 返回套接字連接的遠(yuǎn)程地址 print("Received from {}: {!r}".format(client, query)) # 在控制臺(tái)打印查詢記錄 if query: if ord(query[:1]) < 32: # 如果收到控制字符或者空字符,退出循環(huán) break # 返回一個(gè)生成器,產(chǎn)出包含Unicode 碼位、真正的字符和字符名稱的字符串 lines = list(index.find_description_strs(query)) if lines: # 使用默認(rèn)的UTF-8 編碼把lines 轉(zhuǎn)換成bytes 對象,并在每一行末添加回車符合換行符 # 參數(shù)列表是一個(gè)生成器 writer.writelines(line.encode() + CRLF for line in lines) writer.write(index.status(query, len(lines)).encode() + CRLF) # 輸出狀態(tài) await writer.drain() # 刷新輸出緩沖 print("Sent {} results".format(len(lines))) # 在服務(wù)器控制臺(tái)記錄響應(yīng) print("Close the client socket") # 在控制臺(tái)記錄會(huì)話結(jié)束 writer.close() # 關(guān)閉StreamWriter流 def main(address="127.0.0.1", port=2323): # 添加默認(rèn)地址和端口,所以調(diào)用默認(rèn)可以不加參數(shù) port = int(port) loop = asyncio.get_event_loop() # asyncio.start_server 協(xié)程運(yùn)行結(jié)束后, # 返回的協(xié)程對象返回一個(gè)asyncio.Server 實(shí)例,即一個(gè)TCP套接字服務(wù)器 server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro) # 驅(qū)動(dòng)server_coro 協(xié)程,啟動(dòng)服務(wù)器 host = server.sockets[0].getsockname() # 獲得這個(gè)服務(wù)器的第一個(gè)套接字的地址和端口 print("Serving on {}. Hit CTRL-C to stop.".format(host)) # 在控制臺(tái)中顯示地址和端口 try: loop.run_forever() # 運(yùn)行事件循環(huán) main 函數(shù)在這里阻塞,直到服務(wù)器的控制臺(tái)中按CTRL-C 鍵 except KeyboardInterrupt: # CTRL+C pressed pass print("Server shutting down.") server.close() # server.wait_closed返回一個(gè) future # 調(diào)用loop.run_until_complete 方法,運(yùn)行 future loop.run_until_complete(server.wait_closed()) loop.close() # 終止事件循環(huán) if __name__ == "__main__": main(*sys.argv[1:])
運(yùn)行 tcp_charfinders.py
python tcp_charfinders.py
打開終端,使用 telnet 命令請求服務(wù),運(yùn)行結(jié)果如下所示:
main 函數(shù)幾乎會(huì)立即顯示 Serving on... 消息,然后在調(diào)用loop.run_forever() 方法時(shí)阻塞。這時(shí),控制權(quán)流動(dòng)到事件循環(huán)中,而且一直等待,偶爾會(huì)回到handle_queries 協(xié)程,這個(gè)協(xié)程需要等待網(wǎng)絡(luò)發(fā)送或接收數(shù)據(jù)時(shí),控制權(quán)又交給事件循環(huán)。
handle_queries 協(xié)程可以處理多個(gè)客戶端發(fā)來的多次請求。只要有新客戶端連接服務(wù)器,就會(huì)啟動(dòng)一個(gè)handle_queries 協(xié)程實(shí)例。
handle_queries 的I/O操作都是使用bytes格式。我們從網(wǎng)絡(luò)得到的數(shù)據(jù)要解碼,發(fā)出去的數(shù)據(jù)也要編碼
asyncio包提供了高層的流API,提供了現(xiàn)成的服務(wù)器,我們只需要實(shí)現(xiàn)一個(gè)處理程序。詳細(xì)信息可以查看文檔:https://docs.python.org/3/library/asyncio-stream.html
雖然,asyncio包提供了服務(wù)器,但是功能相對來說還是比較簡陋的,現(xiàn)在我們使用一下 基于asyncio包的 web 框架 sanci,用它來實(shí)現(xiàn)一個(gè)http版的簡易服務(wù)器
使用 sanic 包編寫web 服務(wù)器sanic 的簡單入門在上一篇文章有介紹,python web 框架 Sanci 快速入門
Sanic 是一個(gè)和類Flask 的基于Python3.5+的web框架,提供了比較高階的API,比如路由、request參數(shù),response等,我們只需要實(shí)現(xiàn)處理邏輯即可。
下邊是使用 sanic 實(shí)現(xiàn)的簡易的 字符查詢http web 服務(wù):
from sanic import Sanic from sanic import response from charfinder import UnicodeNameIndex app = Sanic() index = UnicodeNameIndex() html_temp = "{char}
" @app.route("/charfinder") # app.route 函數(shù)的第一個(gè)參數(shù)是url path,我們這里指定路徑是charfinder async def charfinder(request): # request.args 可以取到url 的查詢參數(shù) # ?key1=value1&key2=value2 的結(jié)果是 {"key1": ["value1"], "key2": ["value2"]} # 我們這里支持傳入多個(gè)查詢參數(shù),所以這里使用 request.args.getlist("char") # 如果我們 使用 request.args.get("char") 只能取到第一個(gè)參數(shù) query = request.args.getlist("char") query = " ".join(query) lines = list(index.find_description_strs(query)) # 將得到的結(jié)果生成html html = " ".join([html_temp.format(char=line) for line in lines]) return response.html(html) if __name__ == "__main__": app.run(host="0.0.0.0", port=8000) # 設(shè)置服務(wù)器運(yùn)行地址和端口號(hào)
對比兩段代碼可以發(fā)現(xiàn),使用 sanic 非常簡單。
運(yùn)行服務(wù):
python http_charsfinder.py
我們在瀏覽器輸入地址 http://0.0.0.0:8000/charfinde... 結(jié)果示例如下
現(xiàn)在對比下兩段代碼在TCP 的示例中,服務(wù)器通過main函數(shù)下的這兩行代碼創(chuàng)建并排定運(yùn)行時(shí)間:
server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro)
而在sanic的HTTP示例中,使用,創(chuàng)建服務(wù)器:
app.run(host="0.0.0.0", port=8000)
這兩個(gè)看起來運(yùn)行方式完全不同,但如果我們翻開sanic的源碼會(huì)看到 app.run() 內(nèi)部是調(diào)用 的 server_coroutine = loop.create_server()創(chuàng)建服務(wù)器,
server_coroutine 是通過 loop.run_until_complete()驅(qū)動(dòng)的。
所以說,為了啟動(dòng)服務(wù)器,這兩個(gè)都是由 loop.run_until_complete 驅(qū)動(dòng),完成運(yùn)行的。只不過 sanic 封裝了run 方法,使得使用更加方便。
這里可以得到一個(gè)基本事實(shí):只有驅(qū)動(dòng)協(xié)程,協(xié)程才能做事,而驅(qū)動(dòng) asyncio.coroutine 裝飾的協(xié)程有兩種方式,使用 yield from 或者傳給asyncio 包中某個(gè)參數(shù)為協(xié)程或future的函數(shù),例如 run_until_complete
現(xiàn)在如果你搜索 cjk,會(huì)得到7萬多條數(shù)據(jù)3M 的一個(gè)html文件,耗時(shí)大約2s,這如果是生產(chǎn)服務(wù)的一個(gè)請求,耗時(shí)2s是不能接收的,我們可以使用分頁,這樣我們可以每次只取200條數(shù)據(jù),當(dāng)用戶想看更多數(shù)據(jù)時(shí)再使用 ajax 或者 websockets發(fā)送下一批數(shù)據(jù)。
這一篇我們使用 asyncio 包實(shí)現(xiàn)了TCP服務(wù)器,使用sanic(基于asyncio sanic 默認(rèn)使用 uvloop替代asyncio)實(shí)現(xiàn)了HTTP服務(wù)器,用于按名稱搜索Unicode 字符。但是并沒有涉及服務(wù)器并發(fā)部分,這部分可以以后再討論。
參考鏈接這一篇還是 《流暢的python》asyncio 一章的讀書筆記,下一篇將是python并發(fā)的第三篇,《使用線程處理并發(fā)》。
Python 3.5將支持Async/Await異步編程:http://www.infoq.com/cn/news/2015/05/python-async-await
python web 框架 Sanci 快速入門
python并發(fā)2:使用asyncio處理并發(fā)
最后,感謝女朋友支持。
>歡迎關(guān)注 | >請我喝芬達(dá) |
---|---|
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38648.html
摘要:并發(fā)用于制定方案,用來解決可能但未必并行的問題。在協(xié)程中使用需要注意兩點(diǎn)使用鏈接的多個(gè)協(xié)程最終必須由不是協(xié)程的調(diào)用方驅(qū)動(dòng),調(diào)用方顯式或隱式在最外層委派生成器上調(diào)用函數(shù)或方法。對象可以取消取消后會(huì)在協(xié)程當(dāng)前暫停的處拋出異常。 導(dǎo)語:本文章記錄了本人在學(xué)習(xí)Python基礎(chǔ)之控制流程篇的重點(diǎn)知識(shí)及個(gè)人心得,打算入門Python的朋友們可以來一起學(xué)習(xí)并交流。 本文重點(diǎn): 1、了解asyncio...
摘要:具有以下基本同步原語子進(jìn)程提供了通過創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個(gè)版本,asyncio又做了比較大的調(diào)整,把這個(gè)庫的API分為了 高層級API和...
摘要:是之后引入的標(biāo)準(zhǔn)庫的,這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。沒有能從外部終止線程,因?yàn)榫€程隨時(shí)可能被中斷。上一篇并發(fā)使用處理并發(fā)我們介紹過的,在中,只是調(diào)度執(zhí)行某物的結(jié)果。 asyncio asyncio 是Python3.4 之后引入的標(biāo)準(zhǔn)庫的,這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。asyncio 包在引入標(biāo)準(zhǔn)庫之前代號(hào) Tulip(郁金香),所以在網(wǎng)上搜索資料時(shí),會(huì)經(jīng)??吹竭@種花的...
摘要:我們以請求網(wǎng)絡(luò)服務(wù)為例,來實(shí)際測試一下加入多線程之后的效果。所以,執(zhí)行密集型操作時(shí),多線程是有用的,對于密集型操作,則每次只能使用一個(gè)線程。說到這里,對于密集型,可以使用多線程或者多進(jìn)程來提高效率。 為了提高系統(tǒng)密集型運(yùn)算的效率,我們常常會(huì)使用到多個(gè)進(jìn)程或者是多個(gè)線程,python中的Threading包實(shí)現(xiàn)了線程,multiprocessing 包則實(shí)現(xiàn)了多進(jìn)程。而在3.2版本的py...
摘要:創(chuàng)建第一個(gè)協(xié)程推薦使用語法來聲明協(xié)程,來編寫異步應(yīng)用程序。協(xié)程兩個(gè)緊密相關(guān)的概念是協(xié)程函數(shù)通過定義的函數(shù)協(xié)程對象調(diào)用協(xié)程函數(shù)返回的對象。它是一個(gè)低層級的可等待對象,表示一個(gè)異步操作的最終結(jié)果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...
閱讀 3026·2020-01-08 12:17
閱讀 2000·2019-08-30 15:54
閱讀 1157·2019-08-30 15:52
閱讀 2043·2019-08-29 17:18
閱讀 1053·2019-08-29 15:34
閱讀 2466·2019-08-27 10:58
閱讀 1868·2019-08-26 12:24
閱讀 377·2019-08-23 18:23