成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

python并發(fā)3:使用asyncio編寫服務(wù)器

rollback / 2397人閱讀

摘要:上一篇我們介紹了包,以及如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)。倒排索引保存在本地一個(gè)名為的文件中。運(yùn)行示例如下這個(gè)模塊沒有使用并發(fā),主要作用是為使用包編寫的服務(wù)器提供支持。

asyncio 上一篇我們介紹了 asyncio 包,以及如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)。在這一篇,我們主要介紹使用 asyncio 包編程的兩個(gè)例子。

async/await語法

我們先介紹下 async/await 語法,要不然看完這篇可能會(huì)困惑,為什么之前使用 asyncio.coroutine 裝飾器 和 yield from,這里都是 用的 async 和 await?

python并發(fā)2:使用asyncio處理并發(fā)

async/await 是Python3.5 的新語法,語法如下:

async def read_data(db):
    pass

async 是明確將函數(shù)聲明為協(xié)程的關(guān)鍵字,即使沒有await表達(dá)式,函數(shù)執(zhí)行也會(huì)返回一個(gè)協(xié)程對象。
在協(xié)程函數(shù)內(nèi)部,可以在某個(gè)表達(dá)式之前使用 await 關(guān)鍵字來暫停協(xié)程的執(zhí)行,以等待某協(xié)程完成:

async def read_data(db):
    data = await db.fetch("SELECT ...")

這個(gè)代碼如果使用 asyncio.coroutine 裝飾器語法為:

@asyncio.coroutine
def read_data(db):
    data = yield from db.fetch("SELECT ...")

這兩段代碼執(zhí)行的結(jié)果是一樣的,也就是說 可以把 asyncio.coroutine 替換為 async, yield from 替換為 await。

使用新的語法有什么好處呢:

使生成器和協(xié)程的概念更容易理解,因?yàn)檎Z法不同

可以消除由于重構(gòu)時(shí)不小心移出協(xié)程中yield 聲明而導(dǎo)致的不明確錯(cuò)誤,這回導(dǎo)致協(xié)程變成普通的生成器。

使用 asyncio 包編寫服務(wù)器

這個(gè)例子主要是使用 asyncio 包 和 unicodedata 模塊,實(shí)現(xiàn)通過規(guī)范名稱查找Unicode 字符。

我們先來看一下代碼:

# charfinder.py
import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple

RE_WORD = re.compile("w+")
RE_UNICODE_NAME = re.compile("^[A-Z0-9 -]+$")
RE_CODEPOINT = re.compile("U+[0-9A-F]{4, 6}")

INDEX_NAME = "charfinder_index.pickle"
MINIMUM_SAVE_LEN = 10000
CJK_UNI_PREFIX = "CJK UNIFIED IDEOGRAPH"
CJK_CMP_PREFIX = "CJK COMPATIBILITY IDEOGRAPH"

sample_chars = [
    "$",  # DOLLAR SIGN
    "A",  # LATIN CAPITAL LETTER A
    "a",  # LATIN SMALL LETTER A
    "u20a0",  # EURO-CURRENCY SIGN
    "u20ac",  # EURO SIGN
]

CharDescription = namedtuple("CharDescription", "code_str char name")

QueryResult = namedtuple("QueryResult", "count items")


def tokenize(text):
    """
    :param text: 
    :return: return iterable of uppercased words 
    """
    for match in RE_WORD.finditer(text):
        yield match.group().upper()


def query_type(text):
    text_upper = text.upper()
    if "U+" in text_upper:
        return "CODEPOINT"
    elif RE_UNICODE_NAME.match(text_upper):
        return "NAME"
    else:
        return "CHARACTERS"


class UnicodeNameIndex:
    # unicode name 索引類

    def __init__(self, chars=None):
        self.load(chars)

    def load(self, chars=None):
        # 加載 unicode name    
        self.index = None
        if chars is None:
            try:
                with open(INDEX_NAME, "rb") as fp:
                    self.index = pickle.load(fp)
            except OSError:
                pass
        if self.index is None:
            self.build_index(chars)
        if len(self.index) > MINIMUM_SAVE_LEN:
            try:
                self.save()
            except OSError as exc:
                warnings.warn("Could not save {!r}: {}"
                              .format(INDEX_NAME, exc))

    def save(self):
        with open(INDEX_NAME, "wb") as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        if chars is None:
            chars = (chr(i) for i in range(32, sys.maxunicode))
        index = {}
        for char in chars:
            try:
                name = unicodedata.name(char)
            except ValueError:
                continue
            if name.startswith(CJK_UNI_PREFIX):
                name = CJK_UNI_PREFIX
            elif name.startswith(CJK_CMP_PREFIX):
                name = CJK_CMP_PREFIX

            for word in tokenize(name):
                index.setdefault(word, set()).add(char)

        self.index = index

    def word_rank(self, top=None):
        # (len(self.index[key], key) 是一個(gè)生成器,需要用list 轉(zhuǎn)成列表,要不然下邊排序會(huì)報(bào)錯(cuò)
        res = [list((len(self.index[key], key)) for key in self.index)]
        res.sort(key=lambda  item: (-item[0], item[1]))
        if top is not None:
            res = res[:top]
        return res

    def word_report(self, top=None):
        for postings, key in self.word_rank(top):
            print("{:5} {}".format(postings, key))

    def find_chars(self, query, start=0, stop=None):
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            # tokenize 是query 的生成器 a b 會(huì)是 ["a", "b"] 的生成器
            chars = self.index.get(word)
            if chars is None:
                result_sets = []
                break
            result_sets.append(chars)

        if not result_sets:
            return QueryResult(0, ())

        result = functools.reduce(set.intersection, result_sets)
        result = sorted(result)  # must sort to support start, stop
        result_iter = itertools.islice(result, start, stop)
        return QueryResult(len(result),
                           (char for char in result_iter))

    def describe(self, char):
        code_str = "U+{:04X}".format(ord(char))
        name = unicodedata.name(char)
        return CharDescription(code_str, char, name)

    def find_descriptions(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe(char)

    def get_descriptions(self, chars):
        for char in chars:
            yield self.describe(char)

    def describe_str(self, char):
        return "{:7}	{}	{}".format(*self.describe(char))

    def find_description_strs(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe_str(char)

    @staticmethod  # not an instance method due to concurrency
    def status(query, counter):
        if counter == 0:
            msg = "No match"
        elif counter == 1:
            msg = "1 match"
        else:
            msg = "{} matches".format(counter)
        return "{} for {!r}".format(msg, query)

def main(*args):
    index = UnicodeNameIndex()
    query = " ".join(args)
    n = 0
    for n, line in enumerate(index.find_description_strs(query), 1):
        print(line)
    print("({})".format(index.status(query, n)))


if __name__ == "__main__":
    if len(sys.argv) > 1:
        main(*sys.argv[1:])
    else:
        print("Usage: {} word1 [word2]...".format(sys.argv[0]))

這個(gè)模塊讀取Python內(nèi)建的Unicode數(shù)據(jù)庫,為每個(gè)字符名稱中的每個(gè)單詞建立索引,然后倒排索引,存入一個(gè)字典。
例如,在倒排索引中,"SUN" 鍵對應(yīng)的條目是一個(gè)集合,里面是名稱中包含"SUN" 這個(gè)詞的10個(gè)Unicode字符。倒排索引保存在本地一個(gè)名為charfinder_index.pickle 的文件中。如果查詢多個(gè)單詞,會(huì)計(jì)算從索引中所得集合的交集。
運(yùn)行示例如下:

    >>> main("rook")  # doctest: +NORMALIZE_WHITESPACE
    U+2656  ?  WHITE CHESS ROOK
    U+265C  ?  BLACK CHESS ROOK
    (2 matches for "rook")
    >>> main("rook", "black")  # doctest: +NORMALIZE_WHITESPACE
    U+265C  ?  BLACK CHESS ROOK
    (1 match for "rook black")
    >>> main("white bishop")  # doctest: +NORMALIZE_WHITESPACE
    U+2657  ?   WHITE CHESS BISHOP
    (1 match for "white bishop")
    >>> main("jabberwocky"s vest")
    (No match for "jabberwocky"s vest")

這個(gè)模塊沒有使用并發(fā),主要作用是為使用 asyncio 包編寫的服務(wù)器提供支持。
下面我們來看下 tcp_charfinder.py 腳本:

# tcp_charfinder.py
import sys
import asyncio

# 用于構(gòu)建索引,提供查詢方法
from charfinder import UnicodeNameIndex

CRLF = b"
"
PROMPT = b"?> "

# 實(shí)例化UnicodeNameIndex 類,它會(huì)使用charfinder_index.pickle 文件
index = UnicodeNameIndex()

async def handle_queries(reader, writer):
    # 這個(gè)協(xié)程要傳給asyncio.start_server 函數(shù),接收的兩個(gè)參數(shù)是asyncio.StreamReader 對象和 asyncio.StreamWriter 對象
    while True:  # 這個(gè)循環(huán)處理會(huì)話,直到從客戶端收到控制字符后退出
        writer.write(PROMPT)  # can"t await!  # 這個(gè)方法不是協(xié)程,只是普通函數(shù);這一行發(fā)送 ?> 提示符
        await writer.drain()  # must await!  # 這個(gè)方法刷新writer 緩沖;因?yàn)樗菂f(xié)程,所以要用 await
        data = await reader.readline()  # 這個(gè)方法也是協(xié)程,返回一個(gè)bytes對象,也要用await
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # Telenet 客戶端發(fā)送控制字符時(shí),可能會(huì)拋出UnicodeDecodeError異常
            # 我們這里默認(rèn)發(fā)送空字符
            query = "x00"
        client = writer.get_extra_info("peername")  # 返回套接字連接的遠(yuǎn)程地址
        print("Received from {}: {!r}".format(client, query))  # 在控制臺(tái)打印查詢記錄
        if query:
            if ord(query[:1]) < 32:  # 如果收到控制字符或者空字符,退出循環(huán)
                break
            # 返回一個(gè)生成器,產(chǎn)出包含Unicode 碼位、真正的字符和字符名稱的字符串
            lines = list(index.find_description_strs(query)) 
            if lines:
                # 使用默認(rèn)的UTF-8 編碼把lines    轉(zhuǎn)換成bytes 對象,并在每一行末添加回車符合換行符
                # 參數(shù)列表是一個(gè)生成器
                writer.writelines(line.encode() + CRLF for line in lines) 
            writer.write(index.status(query, len(lines)).encode() + CRLF) # 輸出狀態(tài)

            await writer.drain()  # 刷新輸出緩沖
            print("Sent {} results".format(len(lines)))  # 在服務(wù)器控制臺(tái)記錄響應(yīng)

    print("Close the client socket")  # 在控制臺(tái)記錄會(huì)話結(jié)束
    writer.close()  # 關(guān)閉StreamWriter流



def main(address="127.0.0.1", port=2323):  # 添加默認(rèn)地址和端口,所以調(diào)用默認(rèn)可以不加參數(shù)
    port = int(port)
    loop = asyncio.get_event_loop()
    # asyncio.start_server 協(xié)程運(yùn)行結(jié)束后,
    # 返回的協(xié)程對象返回一個(gè)asyncio.Server 實(shí)例,即一個(gè)TCP套接字服務(wù)器
    server_coro = asyncio.start_server(handle_queries, address, port,
                                loop=loop) 
    server = loop.run_until_complete(server_coro) # 驅(qū)動(dòng)server_coro 協(xié)程,啟動(dòng)服務(wù)器

    host = server.sockets[0].getsockname()  # 獲得這個(gè)服務(wù)器的第一個(gè)套接字的地址和端口
    print("Serving on {}. Hit CTRL-C to stop.".format(host))  # 在控制臺(tái)中顯示地址和端口
    try:
        loop.run_forever()  # 運(yùn)行事件循環(huán) main 函數(shù)在這里阻塞,直到服務(wù)器的控制臺(tái)中按CTRL-C 鍵
    except KeyboardInterrupt:  # CTRL+C pressed
        pass

    print("Server shutting down.")
    server.close()
    # server.wait_closed返回一個(gè) future
    # 調(diào)用loop.run_until_complete 方法,運(yùn)行 future
    loop.run_until_complete(server.wait_closed())  
    loop.close()  # 終止事件循環(huán)


if __name__ == "__main__":
    main(*sys.argv[1:])

運(yùn)行 tcp_charfinders.py

python tcp_charfinders.py

打開終端,使用 telnet 命令請求服務(wù),運(yùn)行結(jié)果如下所示:

main 函數(shù)幾乎會(huì)立即顯示 Serving on... 消息,然后在調(diào)用loop.run_forever() 方法時(shí)阻塞。這時(shí),控制權(quán)流動(dòng)到事件循環(huán)中,而且一直等待,偶爾會(huì)回到handle_queries 協(xié)程,這個(gè)協(xié)程需要等待網(wǎng)絡(luò)發(fā)送或接收數(shù)據(jù)時(shí),控制權(quán)又交給事件循環(huán)。

handle_queries 協(xié)程可以處理多個(gè)客戶端發(fā)來的多次請求。只要有新客戶端連接服務(wù)器,就會(huì)啟動(dòng)一個(gè)handle_queries 協(xié)程實(shí)例。

handle_queries 的I/O操作都是使用bytes格式。我們從網(wǎng)絡(luò)得到的數(shù)據(jù)要解碼,發(fā)出去的數(shù)據(jù)也要編碼

asyncio包提供了高層的流API,提供了現(xiàn)成的服務(wù)器,我們只需要實(shí)現(xiàn)一個(gè)處理程序。詳細(xì)信息可以查看文檔:https://docs.python.org/3/library/asyncio-stream.html

雖然,asyncio包提供了服務(wù)器,但是功能相對來說還是比較簡陋的,現(xiàn)在我們使用一下 基于asyncio包的 web 框架 sanci,用它來實(shí)現(xiàn)一個(gè)http版的簡易服務(wù)器

sanic 的簡單入門在上一篇文章有介紹,python web 框架 Sanci 快速入門

使用 sanic 包編寫web 服務(wù)器

Sanic 是一個(gè)和類Flask 的基于Python3.5+的web框架,提供了比較高階的API,比如路由、request參數(shù),response等,我們只需要實(shí)現(xiàn)處理邏輯即可。

下邊是使用 sanic 實(shí)現(xiàn)的簡易的 字符查詢http web 服務(wù):

from sanic import Sanic
from sanic import response

from charfinder import UnicodeNameIndex

app = Sanic()

index = UnicodeNameIndex()

html_temp = "

{char}

" @app.route("/charfinder") # app.route 函數(shù)的第一個(gè)參數(shù)是url path,我們這里指定路徑是charfinder async def charfinder(request): # request.args 可以取到url 的查詢參數(shù) # ?key1=value1&key2=value2 的結(jié)果是 {"key1": ["value1"], "key2": ["value2"]} # 我們這里支持傳入多個(gè)查詢參數(shù),所以這里使用 request.args.getlist("char") # 如果我們 使用 request.args.get("char") 只能取到第一個(gè)參數(shù) query = request.args.getlist("char") query = " ".join(query) lines = list(index.find_description_strs(query)) # 將得到的結(jié)果生成html html = " ".join([html_temp.format(char=line) for line in lines]) return response.html(html) if __name__ == "__main__": app.run(host="0.0.0.0", port=8000) # 設(shè)置服務(wù)器運(yùn)行地址和端口號(hào)

對比兩段代碼可以發(fā)現(xiàn),使用 sanic 非常簡單。

運(yùn)行服務(wù):

python http_charsfinder.py

我們在瀏覽器輸入地址 http://0.0.0.0:8000/charfinde... 結(jié)果示例如下

現(xiàn)在對比下兩段代碼

在TCP 的示例中,服務(wù)器通過main函數(shù)下的這兩行代碼創(chuàng)建并排定運(yùn)行時(shí)間:

server_coro = asyncio.start_server(handle_queries, address, port,
                                loop=loop)
server = loop.run_until_complete(server_coro)

而在sanic的HTTP示例中,使用,創(chuàng)建服務(wù)器:

app.run(host="0.0.0.0", port=8000)

這兩個(gè)看起來運(yùn)行方式完全不同,但如果我們翻開sanic的源碼會(huì)看到 app.run() 內(nèi)部是調(diào)用 的 server_coroutine = loop.create_server()創(chuàng)建服務(wù)器,
server_coroutine 是通過 loop.run_until_complete()驅(qū)動(dòng)的。

所以說,為了啟動(dòng)服務(wù)器,這兩個(gè)都是由 loop.run_until_complete 驅(qū)動(dòng),完成運(yùn)行的。只不過 sanic 封裝了run 方法,使得使用更加方便。

這里可以得到一個(gè)基本事實(shí):只有驅(qū)動(dòng)協(xié)程,協(xié)程才能做事,而驅(qū)動(dòng) asyncio.coroutine 裝飾的協(xié)程有兩種方式,使用 yield from 或者傳給asyncio 包中某個(gè)參數(shù)為協(xié)程或future的函數(shù),例如 run_until_complete

現(xiàn)在如果你搜索 cjk,會(huì)得到7萬多條數(shù)據(jù)3M 的一個(gè)html文件,耗時(shí)大約2s,這如果是生產(chǎn)服務(wù)的一個(gè)請求,耗時(shí)2s是不能接收的,我們可以使用分頁,這樣我們可以每次只取200條數(shù)據(jù),當(dāng)用戶想看更多數(shù)據(jù)時(shí)再使用 ajax 或者 websockets發(fā)送下一批數(shù)據(jù)。

這一篇我們使用 asyncio 包實(shí)現(xiàn)了TCP服務(wù)器,使用sanic(基于asyncio sanic 默認(rèn)使用 uvloop替代asyncio)實(shí)現(xiàn)了HTTP服務(wù)器,用于按名稱搜索Unicode 字符。但是并沒有涉及服務(wù)器并發(fā)部分,這部分可以以后再討論。

這一篇還是 《流暢的python》asyncio 一章的讀書筆記,下一篇將是python并發(fā)的第三篇,《使用線程處理并發(fā)》。

參考鏈接

Python 3.5將支持Async/Await異步編程:http://www.infoq.com/cn/news/2015/05/python-async-await

python web 框架 Sanci 快速入門

python并發(fā)2:使用asyncio處理并發(fā)

最后,感謝女朋友支持。

>歡迎關(guān)注 >請我喝芬達(dá)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38648.html

相關(guān)文章

  • Python中的并發(fā)處理之使用asyncio

    摘要:并發(fā)用于制定方案,用來解決可能但未必并行的問題。在協(xié)程中使用需要注意兩點(diǎn)使用鏈接的多個(gè)協(xié)程最終必須由不是協(xié)程的調(diào)用方驅(qū)動(dòng),調(diào)用方顯式或隱式在最外層委派生成器上調(diào)用函數(shù)或方法。對象可以取消取消后會(huì)在協(xié)程當(dāng)前暫停的處拋出異常。 導(dǎo)語:本文章記錄了本人在學(xué)習(xí)Python基礎(chǔ)之控制流程篇的重點(diǎn)知識(shí)及個(gè)人心得,打算入門Python的朋友們可以來一起學(xué)習(xí)并交流。 本文重點(diǎn): 1、了解asyncio...

    tuniutech 評論0 收藏0
  • python基礎(chǔ)教程:異步IO 之 API

    摘要:具有以下基本同步原語子進(jìn)程提供了通過創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個(gè)版本,asyncio又做了比較大的調(diào)整,把這個(gè)庫的API分為了 高層級API和...

    vboy1010 評論0 收藏0
  • python并發(fā)2:使用asyncio處理并發(fā)

    摘要:是之后引入的標(biāo)準(zhǔn)庫的,這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。沒有能從外部終止線程,因?yàn)榫€程隨時(shí)可能被中斷。上一篇并發(fā)使用處理并發(fā)我們介紹過的,在中,只是調(diào)度執(zhí)行某物的結(jié)果。 asyncio asyncio 是Python3.4 之后引入的標(biāo)準(zhǔn)庫的,這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。asyncio 包在引入標(biāo)準(zhǔn)庫之前代號(hào) Tulip(郁金香),所以在網(wǎng)上搜索資料時(shí),會(huì)經(jīng)??吹竭@種花的...

    wushuiyong 評論0 收藏0
  • python并發(fā)編程的思考

    摘要:我們以請求網(wǎng)絡(luò)服務(wù)為例,來實(shí)際測試一下加入多線程之后的效果。所以,執(zhí)行密集型操作時(shí),多線程是有用的,對于密集型操作,則每次只能使用一個(gè)線程。說到這里,對于密集型,可以使用多線程或者多進(jìn)程來提高效率。 為了提高系統(tǒng)密集型運(yùn)算的效率,我們常常會(huì)使用到多個(gè)進(jìn)程或者是多個(gè)線程,python中的Threading包實(shí)現(xiàn)了線程,multiprocessing 包則實(shí)現(xiàn)了多進(jìn)程。而在3.2版本的py...

    sshe 評論0 收藏0
  • python基礎(chǔ)教程:異步IO 之編程例子

    摘要:創(chuàng)建第一個(gè)協(xié)程推薦使用語法來聲明協(xié)程,來編寫異步應(yīng)用程序。協(xié)程兩個(gè)緊密相關(guān)的概念是協(xié)程函數(shù)通過定義的函數(shù)協(xié)程對象調(diào)用協(xié)程函數(shù)返回的對象。它是一個(gè)低層級的可等待對象,表示一個(gè)異步操作的最終結(jié)果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...

    wangxinarhat 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<