成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

Python 的異步 IO:Asyncio 之 TCP Client

anonymoussf / 2241人閱讀

摘要:當(dāng)被調(diào)用時(shí),表示已經(jīng)斷開(kāi)連接。第三版去掉第三版的目的是去掉。協(xié)程保持不變,但是已被剔除不再需要請(qǐng)求發(fā)送之后,繼續(xù)異步等待數(shù)據(jù)的接收,即。的作用是結(jié)束那個(gè)導(dǎo)致等待的,這樣也就可以結(jié)束了結(jié)束,以便結(jié)束。

關(guān)于 Asyncio 的其他文章:

Python 的異步 IO:Asyncio 簡(jiǎn)介

Python 的異步 IO:Aiohttp Client 代碼分析

如果不知道 Asyncio 是什么,先看「Asyncio 簡(jiǎn)介」那一篇。

一個(gè)簡(jiǎn)單的 HTTP Server

首先,為了便于測(cè)試,我們用 Python 內(nèi)建的 http 模塊,運(yùn)行一個(gè)簡(jiǎn)單的 HTTP Server。

新建一個(gè)目錄,添加文件 index.html,內(nèi)容為 Hello, World!(不是合法的 HTML 格式也沒(méi)有關(guān)系),然后運(yùn)行如下命令(Ubuntu 請(qǐng)用 python3):

$ python -m http.server
Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

后面不同的 Client 實(shí)現(xiàn),都會(huì)連接這個(gè) Server:Host 為 localhost,Port 為 8000。

所有的示例代碼,import 語(yǔ)句一律從略。

import asyncio
第一版

第一版改寫(xiě)自 Python 官方文檔里的 例子。
Python 的例子是 Echo Client,我們稍微復(fù)雜一點(diǎn),是 HTTP Client,都是 TCP。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop

    def connection_made(self, transport):
        request = "GET / HTTP/1.1
Host: localhost

"
        transport.write(request.encode())

    def data_received(self, data):
        print(data.decode())

    def connection_lost(self, exc):
        self.loop.stop()

async def main(loop):
    await loop.create_connection(
        lambda: ClientProtocol(loop), "localhost", 8000)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_forever()

TCP 連接由 loop.create_connection() 創(chuàng)建,后者需要一個(gè) Protocol 工廠,即 lambda: ClientProtocol(loop)
Protocol 提供了 connection_made(),data_received(), connection_lost() 等接口,這些接口就像回調(diào)函數(shù)一樣,會(huì)在恰當(dāng)?shù)臅r(shí)候被調(diào)用。
我們?cè)?connection_made() 中,通過(guò)參數(shù) transport 發(fā)送一個(gè) HTTP GET 請(qǐng)求,隨后在 data_received() 里,將收到 HTTP 應(yīng)答。
當(dāng) connection_lost() 被調(diào)用時(shí),表示 Server 已經(jīng)斷開(kāi)連接。

運(yùn)行結(jié)果:

HTTP/1.0 200 OK
Server: SimpleHTTP/0.6 Python/3.6.3
Date: Mon, 04 Dec 2017 06:11:52 GMT
Content-type: text/html
Content-Length: 13
Last-Modified: Thu, 30 Nov 2017 05:37:31 GMT


Hello, World!

這就是一個(gè)標(biāo)準(zhǔn)的 HTTP 應(yīng)答,包含 Status Line,Headers 和 Body。

值得注意的是,loop 其實(shí)運(yùn)行了兩遍:

loop.run_until_complete(main(loop))  # 第一遍
loop.run_forever()  # 第二遍

如果沒(méi)有 run_forever(),在收到數(shù)據(jù)之前,loop 可能就結(jié)束了。協(xié)程 main() 只是創(chuàng)建好連接,隨后 run_until_complete() 自然也就無(wú)事可做而終。

加了 run_forever() 后,data_received() 等便有了被調(diào)用的機(jī)會(huì)。但是也有問(wèn)題,loop 一直在跑,程序沒(méi)辦法結(jié)束,所以才在 connection_lost() 里主動(dòng)停止 loop:

    def connection_lost(self, exc):
        self.loop.stop()
第二版:ClientSession

第一版在 connection_made() 中 hard code 了一個(gè) HTTP GET 請(qǐng)求,靈活性較差,以后必然還有 POST 等其他 HTTP 方法需要支持,所以有必要新增一個(gè) ClientSession 類(lèi),來(lái)抽象客戶(hù)端的會(huì)話。于是,HTTP 請(qǐng)求的發(fā)送,便從 connection_made() 挪到了 ClientSession.get()。

ClientSession 應(yīng)該為每一個(gè) HTTP 方法提供一個(gè)相應(yīng)的方法,比如 postput 等等,雖然我們只考慮 HTTP GET。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print(data.decode())

    def connection_lost(self, exc):
        self.loop.stop()

class ClientSession:
    def __init__(self, loop):
        self._loop = loop

    async def get(self, url, host, port):
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop), host, port)

        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        transport.write(request.encode())

首先,ClientProtocol 新增了一個(gè)屬性 transport,是在 connection_made() 時(shí)保存下來(lái)的,這樣在 ClientSession 里才能通過(guò)它來(lái)發(fā)送請(qǐng)求。

第三版:去掉 run_forever()

第三版的目的是:去掉 run_forever() 。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None
        self._eof = False  # 有沒(méi)有收到 EOF
        self._waiter = None  # 用來(lái)等待接收數(shù)據(jù)的 future

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print(data.decode())

    def eof_received(self):
        self._eof = True
        self._wakeup_waiter()

    def connection_lost(self, exc):
        pass  # 不再調(diào)用 self.loop.stop()

    async def wait_for_data(self):
        assert not self._eof
        assert not self._waiter

        self._waiter = self.loop.create_future()
        await self._waiter
        self._waiter = None

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            waiter.set_result(None)

class ClientSession:
    def __init__(self, loop):
        self._loop = loop

    async def get(self, url, host, port):
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop), host, port)

        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        transport.write(request.encode())

        # 等待接收數(shù)據(jù)。
        await protocol.wait_for_data()

協(xié)程 main() 保持不變,但是 loop.run_forever() 已被剔除:

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
# 不再需要 loop.run_forever()

HTTP 請(qǐng)求發(fā)送之后,繼續(xù)異步等待(await)數(shù)據(jù)的接收,即 protocol.wait_for_data()。
這個(gè)等待動(dòng)作,是通過(guò)往 loop 里新增一個(gè) future 來(lái)實(shí)現(xiàn)的:

    async def wait_for_data(self):
        # ...
        self._waiter = self.loop.create_future()
        await self._waiter
        self._waiter = None

self._waiter 就是這個(gè)導(dǎo)致等待的 future,它會(huì)保證 loop 一直運(yùn)行,直到數(shù)據(jù)接收完畢。
eof_received() 被調(diào)用時(shí),數(shù)據(jù)就接收完畢了(EOF 的意思不用多說(shuō)了吧?)。

    def eof_received(self):
        self._eof = True
        self._wakeup_waiter()

_wakeup_waiter() 的作用是結(jié)束那個(gè)導(dǎo)致等待的 future,這樣 loop 也就可以結(jié)束了:

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            # 結(jié)束 waiter future,以便 loop 結(jié)束。
            waiter.set_result(None)
第四版:Reader

data_received() 里直接輸出 HTTP 的應(yīng)答結(jié)果,實(shí)在算不上什么完美的做法。

    def data_received(self, data):
        print(data.decode())

為了解決這一問(wèn)題,我們引入一個(gè) Reader 類(lèi),用來(lái)緩存收到的數(shù)據(jù),并提供「讀」的接口給用戶(hù)。

首先,Protocol 被簡(jiǎn)化了,前一版引入的各種處理,都轉(zhuǎn)交給了 Reader。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop, reader):
        self.loop = loop
        self.transport = None
        self._reader = reader

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self._reader.feed(data)  # 轉(zhuǎn)交給 Reader

    def eof_received(self):
        self._reader.feed_eof()  # 轉(zhuǎn)交給 Reader

    def connection_lost(self, exc):
        pass

下面是 ClientSession.get() 基于 Reader 的實(shí)現(xiàn):

class ClientSession:
    async def get(self, url, host, port):
        reader = Reader(self._loop)
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop, reader), host, port)
        # 發(fā)送請(qǐng)求,代碼從略...
        data = await reader.read()
        print(data.decode())

Reader 本身是從上一版的 Protocol 抽取出來(lái)的,唯一不同的是,接收的數(shù)據(jù)被臨時(shí)放在了一個(gè) bytearray 緩存里。

class Reader:
    def __init__(self, loop):
        self._loop = loop
        self._buffer = bytearray()  # 緩存
        self._eof = False
        self._waiter = None

    def feed(self, data):
        self._buffer.extend(data)
        self._wakeup_waiter()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    async def read(self):
        if not self._buffer and not self._eof:
            await self._wait_for_data()
            
        data = bytes(self._buffer)
        del self._buffer[:]
        return data

    async def _wait_for_data(self):
        assert not self._eof
        assert not self._waiter

        self._waiter = self._loop.create_future()
        await self._waiter
        self._waiter = None

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            waiter.set_result(None)

稍微解釋一下 read(),比較重要的是開(kāi)始的一句判斷:

        # 如果緩存為空,并且 EOF 還沒(méi)收到,那就(繼續(xù))等待接收數(shù)據(jù)。
        if not self._buffer and not self._eof:
            # read() 會(huì)停在這個(gè)地方,直到 feed() 或 feed_eof() 被調(diào)用,
            # 也就是說(shuō)有數(shù)據(jù)可讀了。
            await self._wait_for_data()

接下來(lái)就是把緩存倒空:

        data = bytes(self._buffer)
        del self._buffer[:]

運(yùn)行一下,不難發(fā)現(xiàn),ClientSession.get() 里讀數(shù)據(jù)的那一句是有問(wèn)題的。

        data = await reader.read()

收到的 data 并不是完整的 HTTP 應(yīng)答,可能只包含了 HTTP 的 Headers,而沒(méi)有 Body。

一個(gè) HTTP 應(yīng)答,Server 端可能分多次發(fā)送過(guò)來(lái)。比如這個(gè)測(cè)試用的 Hello World Server,Headers 和 Body 就分了兩次發(fā)送,也就是說(shuō) data_received() 會(huì)被調(diào)用兩次。

之前我們?cè)?eof_received() 里才喚醒 waiter(_wakeup_waiter()),現(xiàn)在在 data_received() 里就喚醒了,于是第一次數(shù)據(jù)收完, waiter 就結(jié)束了,loop 也便跟著結(jié)束。

為了讀到完整的 HTTP 應(yīng)答,方法也很簡(jiǎn)單,把 read() 放在循環(huán)里:

        blocks = []
        while True:
            block = await reader.read()
            if not block:
                break
            blocks.append(block)
        data = b"".join(blocks)
        print(data.decode())

每一次 read(),如果緩存為空,并且 EOF 還沒(méi)收到的話,就會(huì)再次創(chuàng)建 waiter,放到 loop 里,繼續(xù)等待接收數(shù)據(jù)。

這個(gè)循環(huán)顯然應(yīng)該交給 Reader 處理,對(duì) ClientSession 需保持透明。

class Reader:
    async def read(self):
        blocks = []
        while True:
            block = await self._read()
            if not block:
                break
            blocks.append(block)
        data = b"".join(blocks)
        return data

    async def _read(self):
        if not self._buffer and not self._eof:
            await self._wait_for_data()
            
        data = bytes(self._buffer)
        del self._buffer[:]
        return data

最后,原來(lái)的 read() 重命名為 _read(),新的 read() 在循環(huán)中反復(fù)調(diào)用 _read(),直到無(wú)數(shù)據(jù)可讀。ClientSession 這邊直接調(diào)用新的 read() 即可。

第五版:Writer

到目前為止,發(fā)送 HTTP 請(qǐng)求時(shí),都是直接調(diào)用較為底層的 transport.write()

    async def get(self, url, host, port):
        # ...
        transport.write(request.encode())

可以把它封裝在 Writer 中,與 Reader 的做法類(lèi)似,但是 Writer 要簡(jiǎn)單得多:

class Writer:
    def __init__(self, transport):
        self._transport = transport

    def write(self, data):
        self._transport.write(data)

然后在 ClientSession.get() 中創(chuàng)建 Writer

    async def get(self, url, host, port):
        reader = Reader(self._loop)
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop, reader), host, port)

        writer = Writer(transport)
        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        writer.write(request.encode())
        # ...

對(duì) ClientSession 來(lái)說(shuō),只需知道 ReaderWriter 就足夠了,所以不妨提供一個(gè)函數(shù) open_connection(),直接返回 ReaderWriter。

async def open_connection(host, port, loop):
    reader = Reader(loop)
    protocol = ClientProtocol(loop, reader)
    transport, _ = await loop.create_connection(lambda: protocol, host, port)
    writer = Writer(transport)
    return reader, writer

然后 ClientSession 就可以簡(jiǎn)化成這樣:

class ClientSession:
    async def get(self, url, host, port):
        reader, writer = await open_connection(host, port, self._loop)
        # ...
第六版:Asyncio Streams

其實(shí) Asyncio 已經(jīng)提供了 Reader 和 Writer,詳見(jiàn) 官方文檔。

下面以 Asyncio Streams 實(shí)現(xiàn) ClientSession.get()

class ClientSession:
    async def get(self, url, host, port):
        reader, writer = await asyncio.open_connection(
            host, port, loop=self._loop)

        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        writer.write(request.encode())

        data = await reader.read(-1)
        print(data.decode())
        writer.close()

asyncio.open_connection() 就相當(dāng)于我們的 open_connection()。ReaderWriter 也都類(lèi)似,只是復(fù)雜了一些。

全文完

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/44528.html

相關(guān)文章

  • python基礎(chǔ)教程:異步IO API

    摘要:具有以下基本同步原語(yǔ)子進(jìn)程提供了通過(guò)創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專(zhuān)門(mén)用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語(yǔ)法編寫(xiě)并發(fā)代碼的標(biāo)準(zhǔn)庫(kù)。通過(guò)上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個(gè)版本,asyncio又做了比較大的調(diào)整,把這個(gè)庫(kù)的API分為了 高層級(jí)API和...

    vboy1010 評(píng)論0 收藏0
  • Python 異步 IO:Aiohttp Client 代碼分析

    摘要:的異步代碼分析是的一個(gè)框架,基于,所以叫。不可避免的,可讀性會(huì)比較差。想找教程的話,請(qǐng)移步官方教程,寫(xiě)得還是挺不錯(cuò)的。建議不要直接使用,而只把它當(dāng)成的一個(gè)樣例。 Python 的異步 IO:Aiohttp Client 代碼分析 Aiohttp 是 Python 的一個(gè) HTTP 框架,基于 asyncio,所以叫 Aiohttp。 我主要是看源碼,想理解它的設(shè)計(jì),所以附上了類(lèi)圖與時(shí)序...

    fai1017 評(píng)論0 收藏0
  • python基礎(chǔ)教程:異步IO 概念和歷史

    摘要:并發(fā)的方式有多種,多線程,多進(jìn)程,異步等。多線程和多進(jìn)程之間的場(chǎng)景切換和通訊代價(jià)很高,不適合密集型的場(chǎng)景關(guān)于多線程和多進(jìn)程的特點(diǎn)已經(jīng)超出本文討論的范疇,有興趣的同學(xué)可以自行搜索深入理解。 編程中,我們經(jīng)常會(huì)遇到并發(fā)這個(gè)概念,目的是讓軟件能充分利用硬件資源,提高性能。并發(fā)的方式有多種,多線程,多進(jìn)程,異步IO等。多線程和多進(jìn)程更多應(yīng)用于CPU密集型的場(chǎng)景,比如科學(xué)計(jì)算的時(shí)間都耗費(fèi)在CPU...

    BicycleWarrior 評(píng)論0 收藏0
  • python基礎(chǔ)教程:異步IO 編程例子

    摘要:創(chuàng)建第一個(gè)協(xié)程推薦使用語(yǔ)法來(lái)聲明協(xié)程,來(lái)編寫(xiě)異步應(yīng)用程序。協(xié)程兩個(gè)緊密相關(guān)的概念是協(xié)程函數(shù)通過(guò)定義的函數(shù)協(xié)程對(duì)象調(diào)用協(xié)程函數(shù)返回的對(duì)象。它是一個(gè)低層級(jí)的可等待對(duì)象,表示一個(gè)異步操作的最終結(jié)果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...

    wangxinarhat 評(píng)論0 收藏0
  • sanic異步框架中文文檔

    摘要:實(shí)例實(shí)例測(cè)試結(jié)果增加路由實(shí)例測(cè)試結(jié)果提供了一個(gè)方法,根據(jù)處理程序方法名生成。異常拋出異常要拋出異常,只需從異常模塊中提出相應(yīng)的異常。 typora-copy-images-to: ipic [TOC] 快速開(kāi)始 在安裝Sanic之前,讓我們一起來(lái)看看Python在支持異步的過(guò)程中,都經(jīng)歷了哪些比較重大的更新。 首先是Python3.4版本引入了asyncio,這讓Python有了支...

    elliott_hu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<