摘要:當(dāng)一個客戶端連接時,它將返回一個新的對象,對象中有表示當(dāng)前連接的和一個由主機(jī)端口號組成的連接的元組,更多關(guān)于元組值的內(nèi)容可以查看地址族一節(jié)中的詳情這里必須要明白我們通過調(diào)用方法擁有了一個新的對象。
博客原文: https://keelii.com/2018/09/24/socket-programming-in-python/
說明本書翻譯自 realpython 網(wǎng)站上的文章教程 Socket Programming in Python (Guide),由于原文比較長,所以整理成了 Gitbook 方便閱讀
原作者Nathan Jennings 是 Real Python 教程團(tuán)隊的一員,他在很早之前就使用 C 語言開始了自己的編程生涯,但是最終發(fā)現(xiàn)了 Python,從 Web 應(yīng)用和網(wǎng)絡(luò)數(shù)據(jù)收集到網(wǎng)絡(luò)安全,他喜歡任何 Pythonic 的東西譯者注
—— realpython
譯者 是一名前端工程師,平常會寫很多的 JavaScript。但是當(dāng)我使用 JavaScript 很長一段時間后,會對一些 語言無關(guān) 的編程概念感興趣,比如:網(wǎng)絡(luò)/socket 編程、異步/并發(fā)、線/進(jìn)程通信等。然而恰好這些內(nèi)容在 JavasScript 領(lǐng)域很少見
因為一直從事 Web 開發(fā),所以我認(rèn)為理解了網(wǎng)絡(luò)通信及其 socket 編程就理解了 Web 開發(fā)的某些本質(zhì)。過程中我發(fā)現(xiàn) Python 社區(qū)有很多我喜歡的內(nèi)容,并且很多都是高質(zhì)量的公開發(fā)布且開源的。
最近我發(fā)現(xiàn)了這篇文章,系統(tǒng)地從底層網(wǎng)絡(luò)通信講到了應(yīng)用層協(xié)議及其 C/S 架構(gòu)的應(yīng)用程序,由淺入深。雖然代碼、API 使用了 Python,但是底層原因都是相通的。非常值得一讀,推薦給大家
另外,由于本人水平所限,翻譯的內(nèi)容難免出現(xiàn)偏差,如果你在閱讀的過程中發(fā)現(xiàn)問題,請毫不猶豫的提醒我或者開新 PR?;蛘哂惺裁床焕斫獾牡胤揭部梢蚤_ issue 討論
授權(quán)本文(翻譯版)通過了 realpython 官方授權(quán),原文版權(quán)歸其所有,任何轉(zhuǎn)載請聯(lián)系他們
開始網(wǎng)絡(luò)中的 Socket 和 Socket API 是用來跨網(wǎng)絡(luò)的消息傳送的,它提供了 進(jìn)程間通信(IPC) 的一種形式。網(wǎng)絡(luò)可以是邏輯的、本地的電腦網(wǎng)絡(luò),或者是可以物理連接到外網(wǎng)的網(wǎng)絡(luò),并且可以連接到其它網(wǎng)絡(luò)。英特網(wǎng)就是一個明顯的例子,就是那個你通過 ISP 連接到的網(wǎng)絡(luò)
本篇教程有三個不同的迭代階段,來展示如何使用 Python 構(gòu)建一個 Socket 服務(wù)器和客戶端
我們將以一個簡單的 Socket 服務(wù)器和客戶端程序來開始本教程
當(dāng)你看完 API 了解例子是怎么運行起來以后,我們將會看到一個具有同時處理多個連接能力的例子的改進(jìn)版
最后,我們將會開發(fā)出一個更加完善且具有完整的自定義頭信息和內(nèi)容的 Socket 應(yīng)用
教程結(jié)束后,你將學(xué)會如何使用 Python 中的 socket 模塊 來寫一個自己的客戶端/服務(wù)器應(yīng)用。以及向你展示如何在你的應(yīng)用中使用自定義類在不同的端之間發(fā)送消息和數(shù)據(jù)
所有的例子程序都使用 Python 3.6 編寫,你可以在 Github 上找到 源代碼
網(wǎng)絡(luò)和 Socket 是個很大的話題。網(wǎng)上已經(jīng)有了關(guān)于它們的字面解釋,如果你還不是很了解 Socket 和網(wǎng)絡(luò)。當(dāng)你你讀到那些解釋的時候會感到不知所措,這是非常正常的。因為我也是這樣過來的
盡管如此也不要氣餒。 我已經(jīng)為你寫了這個教程。 就像學(xué)習(xí) Python 一樣,我們可以一次學(xué)習(xí)一點。用你的瀏覽器保存本頁面到書簽,以便你學(xué)習(xí)下一部分時能找到
讓我們開始吧!
背景Socket 有一段很長的歷史,最初是在 1971 年被用于 ARPANET,隨后就成了 1983 年發(fā)布的 Berkeley Software Distribution (BSD) 操作系統(tǒng)的 API,并且被命名為 Berkeleysocket
當(dāng)互聯(lián)網(wǎng)在 20 世紀(jì) 90 年代隨萬維網(wǎng)興起時,網(wǎng)絡(luò)編程也火了起來。Web 服務(wù)和瀏覽器并不是唯一使用新的連接網(wǎng)絡(luò)和 Socket 的應(yīng)用程序。各種類型不同規(guī)模的客戶端/服務(wù)器應(yīng)用都廣泛地使用著它們
時至今日,盡管 Socket API 使用的底層協(xié)議已經(jīng)進(jìn)化了很多年,也出現(xiàn)了許多新的協(xié)議,但是底層的 API 仍然保持不變
Socket 應(yīng)用最常見的類型就是 客戶端/服務(wù)器 應(yīng)用,服務(wù)器用來等待客戶端的鏈接。我們教程中涉及到的就是這類應(yīng)用。更明確地說,我們將看到用于 InternetSocket 的 Socket API,有時稱為 Berkeley 或 BSD Socket。當(dāng)然也有 Unix domain sockets —— 一種用于 同一主機(jī) 進(jìn)程間的通信
Socket API 概覽Python 的 socket 模塊提供了使用 Berkeley sockets API 的接口。這將會在我們這個教程里使用和討論到
主要的用到的 Socket API 函數(shù)和方法有下面這些:
socket()
bind()
listen()
accept()
connect()
connect_ex()
send()
recv()
close()
Python 提供了和 C 語言一致且方便的 API。我們將在下面一節(jié)中用到它們
作為標(biāo)準(zhǔn)庫的一部分,Python 也有一些類可以讓我們方便的調(diào)用這些底層 Socket 函數(shù)。盡管這個教程中并沒有涉及這部分內(nèi)容,你也可以通過socketserver 模塊 中找到文檔。當(dāng)然還有很多實現(xiàn)了高層網(wǎng)絡(luò)協(xié)議(比如:HTTP, SMTP)的的模塊,可以在下面的鏈接中查到 Internet Protocols and Support
TCP Sockets就如你馬上要看到的,我們將使用 socket.socket() 創(chuàng)建一個類型為 socket.SOCK_STREAM 的 socket 對象,默認(rèn)將使用 Transmission Control Protocol(TCP) 協(xié)議,這基本上就是你想使用的默認(rèn)值
為什么應(yīng)該使用 TCP 協(xié)議?
可靠的:網(wǎng)絡(luò)傳輸中丟失的數(shù)據(jù)包會被檢測到并重新發(fā)送
有序傳送:數(shù)據(jù)按發(fā)送者寫入的順序被讀取
相反,使用 socket.SOCK_DGRAM 創(chuàng)建的 用戶數(shù)據(jù)報協(xié)議(UDP) Socket 是 不可靠 的,而且數(shù)據(jù)的讀取寫發(fā)送可以是 無序的
為什么這個很重要?網(wǎng)絡(luò)總是會盡最大的努力去傳輸完整數(shù)據(jù)(往往不盡人意)。沒法保證你的數(shù)據(jù)一定被送到目的地或者一定能接收到別人發(fā)送給你的數(shù)據(jù)
網(wǎng)絡(luò)設(shè)備(比如:路由器、交換機(jī))都有帶寬限制,或者系統(tǒng)本身的極限。它們也有 CPU、內(nèi)存、總線和接口包緩沖區(qū),就像我們的客戶端和服務(wù)器。TCP 消除了你對于丟包、亂序以及其它網(wǎng)絡(luò)通信中通常出現(xiàn)的問題的顧慮
下面的示意圖中,我們將看到 Socket API 的調(diào)用順序和 TCP 的數(shù)據(jù)流:
左邊表示服務(wù)器,右邊則是客戶端
左上方開始,注意服務(wù)器創(chuàng)建「監(jiān)聽」Socket 的 API 調(diào)用:
socket()
bind()
listen()
accept()
「監(jiān)聽」Socket 做的事情就像它的名字一樣。它會監(jiān)聽客戶端的連接,當(dāng)一個客戶端連接進(jìn)來的時候,服務(wù)器將調(diào)用 accept() 來「接受」或者「完成」此連接
客戶端調(diào)用 connect() 方法來建立與服務(wù)器的鏈接,并開始三次握手。握手很重要是因為它保證了網(wǎng)絡(luò)的通信的雙方可以到達(dá),也就是說客戶端可以正常連接到服務(wù)器,反之亦然
上圖中間部分往返部分表示客戶端和服務(wù)器的數(shù)據(jù)交換過程,調(diào)用了 send() 和 recv()方法
下面部分,客戶端和服務(wù)器調(diào)用 close() 方法來關(guān)閉各自的 socket
打印客戶端和服務(wù)端你現(xiàn)在已經(jīng)了解了基本的 socket API 以及客戶端和服務(wù)器是如何通信的,讓我們來創(chuàng)建一個客戶端和服務(wù)器。我們將會以一個簡單的實現(xiàn)開始。服務(wù)器將打印客戶端發(fā)送回來的內(nèi)容
打印程序服務(wù)端下面就是服務(wù)器代碼,echo-server.py:
#!/usr/bin/env python3 import socket HOST = "127.0.0.1" # 標(biāo)準(zhǔn)的回環(huán)地址 (localhost) PORT = 65432 # 監(jiān)聽的端口 (非系統(tǒng)級的端口: 大于 1023) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((HOST, PORT)) s.listen() conn, addr = s.accept() with conn: print("Connected by", addr) while True: data = conn.recv(1024) if not data: break conn.sendall(data)
注意:上面的代碼你可能還沒法完全理解,但是不用擔(dān)心。這幾行代碼做了很多事情,這
只是一個起點,幫你看見這個簡單的服務(wù)器是如何運行的
教程后面有引用部分,里面有很多額外的引用資源鏈接,這個教程中我將把鏈接放在那兒
讓我們一起來看一下 API 調(diào)用以及發(fā)生了什么
socket.socket() 創(chuàng)建了一個 socket 對象,并且支持 context manager type,你可以使用 with 語句,這樣你就不用再手動調(diào)用 s.close() 來關(guān)閉 socket 了
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: pass # Use the socket object without calling s.close().
調(diào)用 socket() 時傳入的 socket 地址族參數(shù) socket.AF_INET 表示因特網(wǎng) IPv4 地址族,SOCK_STREAM 表示使用 TCP 的 socket 類型,協(xié)議將被用來在網(wǎng)絡(luò)中傳輸消息
bind() 用來關(guān)聯(lián) socket 到指定的網(wǎng)絡(luò)接口(IP 地址)和端口號:
HOST = "127.0.0.1" PORT = 65432 # ... s.bind((HOST, PORT))
bind() 方法的入?yún)⑷Q于 socket 的地址族,在這個例子中我們使用了 socket.AF_INET (IPv4),它將返回兩個元素的元組:(host, port)
host 可以是主機(jī)名稱、IP 地址、空字符串,如果使用 IP 地址,host 就應(yīng)該是 IPv4 格式的字符串,127.0.0.1 是標(biāo)準(zhǔn)的 IPv4 回環(huán)地址,只有主機(jī)上的進(jìn)程可以連接到服務(wù)器,如果你傳了空字符串,服務(wù)器將接受本機(jī)所有可用的 IPv4 地址
端口號應(yīng)該是 1-65535 之間的整數(shù)(0是保留的),這個整數(shù)就是用來接受客戶端鏈接的 TCP 端口號,如果端口號小于 1024,有的操作系統(tǒng)會要求管理員權(quán)限
使用 bind() 傳參為主機(jī)名稱的時候需要注意:
如果你在 host 部分 主機(jī)名稱 作為 IPv4/v6 socket 的地址,程序可能會產(chǎn)生非確
定性的行為,因為 Python 會使用 DNS 解析后的 第一個 地址,根據(jù) DNS 解析的結(jié)
果或者 host 配置 socket 地址將會以不同方式解析為實際的 IPv4/v6 地址。如果想得
到確定的結(jié)果傳入的 host 參數(shù)建議使用數(shù)字格式的地址 引用
我稍后將在 使用主機(jī)名 部分討論這個問題,但是現(xiàn)在也值得一提。目前來說你只需要知道當(dāng)使用主機(jī)名時,你將會因為 DNS 解析的原因得到不同的結(jié)果
可能是任何地址。比如第一次運行程序時是 10.1.2.3,第二次是 192.168.0.1,第三次是 172.16.7.8 等等
繼續(xù)看上面的服務(wù)器代碼示例,listen() 方法調(diào)用使服務(wù)器可以接受連接請求,這使它成為一個「監(jiān)聽中」的 socket
s.listen() conn, addr = s.accept()
listen() 方法有一個 backlog 參數(shù)。它指定在拒絕新的連接之前系統(tǒng)將允許使用的 未接受的連接 數(shù)量。從 Python 3.5 開始,這是可選參數(shù)。如果不指定,Python 將取一個默認(rèn)值
如果你的服務(wù)器需要同時接收很多連接請求,增加 backlog 參數(shù)的值可以加大等待鏈接請求隊列的長度,最大長度取決于操作系統(tǒng)。比如在 Linux 下,參考 /proc/sys/net/core/somaxconn
accept() 方法阻塞并等待傳入連接。當(dāng)一個客戶端連接時,它將返回一個新的 socket 對象,對象中有表示當(dāng)前連接的 conn 和一個由主機(jī)、端口號組成的 IPv4/v6 連接的元組,更多關(guān)于元組值的內(nèi)容可以查看 socket 地址族 一節(jié)中的詳情
這里必須要明白我們通過調(diào)用 accept() 方法擁有了一個新的 socket 對象。這非常重要,因為你將用這個 socket 對象和客戶端進(jìn)行通信。和監(jiān)聽一個 socket 不同的是后者只用來授受新的連接請求
conn, addr = s.accept() with conn: print("Connected by", addr) while True: data = conn.recv(1024) if not data: break conn.sendall(data)
從 accept() 獲取客戶端 socket 連接對象 conn 后,使用一個無限 while 循環(huán)來阻塞調(diào)用 conn.recv(),無論客戶端傳過來什么數(shù)據(jù)都會使用 conn.sendall() 打印出來
如果 conn.recv() 方法返回一個空 byte 對象(b""),然后客戶端關(guān)閉連接,循環(huán)結(jié)束,with 語句和 conn 一起使用時,通信結(jié)束的時候會自動關(guān)閉 socket 鏈接
打印程序客戶端現(xiàn)在我們來看下客戶端的程序, echo-client.py:
#!/usr/bin/env python3 import socket HOST = "127.0.0.1" # 服務(wù)器的主機(jī)名或者 IP 地址 PORT = 65432 # 服務(wù)器使用的端口 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((HOST, PORT)) s.sendall(b"Hello, world") data = s.recv(1024) print("Received", repr(data))
與服務(wù)器程序相比,客戶端程序簡單很多。它創(chuàng)建了一個 socket 對象,連接到服務(wù)器并且調(diào)用 s.sendall() 方法發(fā)送消息,然后再調(diào)用 s.recv() 方法讀取服務(wù)器返回的內(nèi)容并打印出來
運行打印程序的客戶端和服務(wù)端讓我們運行打印程序的客戶端和服務(wù)端,觀察他們的表現(xiàn),看看發(fā)生了什么事情
如果你在運行示例代碼時遇到了問題,可以閱讀 如何使用 Python 開發(fā)命令行命令,如果
你使用的是 windows 操作系統(tǒng),請查看 Python Windows FAQ
打開命令行程序,進(jìn)入你的代碼所在的目錄,運行打印程序的服務(wù)端:
$ ./echo-server.py
你的命令行將被掛起,因為程序有一個阻塞調(diào)用
conn, addr = s.accept()
它將等待客戶端的連接,現(xiàn)在再打開一個命令行窗口運行打印程序的客戶端:
$ ./echo-client.py Received b"Hello, world"
在服務(wù)端的窗口你將看見:
$ ./echo-server.py Connected by ("127.0.0.1", 64623)
上面的輸出中,服務(wù)端打印出了 s.accept() 返回的 addr 元組,這就是客戶端的 IP 地址和 TCP 端口號。示例中的端口號是 64623 這很可能是和你機(jī)器上運行的結(jié)果不同
查看 socket 狀態(tài)想查找你主機(jī)上 socket 的當(dāng)前狀態(tài),可以使用 netstat 命令。這個命令在 macOS, Window, Linux 系統(tǒng)上默認(rèn)可用
下面這個就是啟動服務(wù)后 netstat 命令的輸出結(jié)果:
$ netstat -an Active Internet connections (including servers) Proto Recv-Q Send-Q Local Address Foreign Address (state) tcp4 0 0 127.0.0.1.65432 *.* LISTEN
注意本地地址是 127.0.0.1.65432,如果 echo-server.py 文件中 HOST 設(shè)置成空字符串 "" 的話,netstat 命令將顯示如下:
$ netstat -an Active Internet connections (including servers) Proto Recv-Q Send-Q Local Address Foreign Address (state) tcp4 0 0 *.65432 *.* LISTEN
本地地址是 *.65432,這表示所有主機(jī)支持的 IP 地址族都可以接受傳入連接,在我們的例子里面調(diào)用 socket() 時傳入的參數(shù) socket.AF_INET 表示使用了 IPv4 的 TCP socket,你可以在輸出結(jié)果中的 Proto 列中看到(tcp4)
上面的輸出是我截取的只顯示了咱們的打印程序服務(wù)端進(jìn)程,你可能會看到更多輸出,具體取決于你運行的系統(tǒng)。需要注意的是 Proto, Local Address 和 state 列。分別表示 TCP socket 類型、本地地址端口、當(dāng)前狀態(tài)
另外一個查看這些信息的方法是使用 lsof 命令,這個命令在 macOS 上是默認(rèn)安裝的,Linux 上需要你手動安裝
$ lsof -i -n COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME Python 67982 nathan 3u IPv4 0xecf272 0t0 TCP *:65432 (LISTEN)
isof 命令使用 -i 參數(shù)可以查看打開的 socket 連接的 COMMAND, PID(process id) 和 USER(user id),上面的輸出就是打印程序服務(wù)端
netstat 和 isof 命令有許多可用的參數(shù),這取決于你使用的操作系統(tǒng)??梢允褂?man page 來查看他們的使用文檔,這些文檔絕對值得花一點時間去了解,你將受益匪淺,macOS 和 Linux 中使用命令 man netstat 或者 man lsof 命令,windows 下使用 netstat /? 來查看幫助文檔
一個通常會犯的錯誤是在沒有監(jiān)聽 socket 端口的情況下嘗試連接:
$ ./echo-client.py Traceback (most recent call last): File "./echo-client.py", line 9, ins.connect((HOST, PORT)) ConnectionRefusedError: [Errno 61] Connection refused
也可能是端口號出錯、服務(wù)端沒啟動或者有防火墻阻止了連接,這些原因可能很難記住,或許你也會碰到 Connection timed out 的錯誤,記得給你的防火墻添加允許我們使用的端口規(guī)則
引用部分有一些常見的 錯誤
通信的流程分解讓我們再仔細(xì)的觀察下客戶端是如何與服務(wù)端進(jìn)行通信的:
當(dāng)使用回環(huán)地址時,數(shù)據(jù)將不會接觸到外部網(wǎng)絡(luò),上圖中,回環(huán)地址包含在了 host 里面。這就是回環(huán)地址的本質(zhì),連接數(shù)據(jù)傳輸是從本地到主機(jī),這就是為什么你會聽到有回環(huán)地址或者 127.0.0.1、::1 的 IP 地址和表示本地主機(jī)
應(yīng)用程序使用回環(huán)地址來與主機(jī)上的其它進(jìn)程通信,這使得它與外部網(wǎng)絡(luò)安全隔離。由于它是內(nèi)部的,只能從主機(jī)內(nèi)訪問,所以它不會被暴露出去
如果你的應(yīng)用程序服務(wù)器使用自己的專用數(shù)據(jù)庫(非公用的),則可以配置服務(wù)器僅監(jiān)聽回環(huán)地址,這樣的話網(wǎng)絡(luò)上的其它主機(jī)就無法連接到你的數(shù)據(jù)庫
如果你的應(yīng)用程序中使用的 IP 地址不是 127.0.0.1 或者 ::1,那就可能會綁定到連接到外部網(wǎng)絡(luò)的以太網(wǎng)上。這就是你通往 localhost 王國之外的其他主機(jī)的大門
這里需要小心,并且可能讓你感到難受甚至懷疑全世界。在你探索 localhost 的安全限制之前,確認(rèn)讀過 使用主機(jī)名 一節(jié)。 一個安全注意事項是 **不要使用主機(jī)名,要使用
IP 地址**
打印程序的服務(wù)端肯定有它自己的一些局限。這個程序只能服務(wù)于一個客戶端然后結(jié)束。打印程序的客戶端也有它自己的局限,但是還有一個問題,如果客戶端調(diào)用了下面的方法s.recv() 方法將返回 b"Hello, world" 中的一個字節(jié) b"H"
data = s.recv(1024)
1024 是緩沖區(qū)數(shù)據(jù)大小限制最大值參數(shù) bufsize,并不是說 recv() 方法只返回 1024個字節(jié)的內(nèi)容
send() 方法也是這個原理,它返回發(fā)送內(nèi)容的字節(jié)數(shù),結(jié)果可能小于傳入的發(fā)送內(nèi)容,你得處理這處情況,按需多次調(diào)用 send() 方法來發(fā)送完整的數(shù)據(jù)
應(yīng)用程序負(fù)責(zé)檢查是否已發(fā)送所有數(shù)據(jù);如果僅傳輸了一些數(shù)據(jù),則應(yīng)用程序需要嘗試傳
遞剩余數(shù)據(jù) 引用
我們可以使用 sendall() 方法來回避這個過程
和 send() 方法不一樣的是,sendall() 方法會一直發(fā)送字節(jié),只到所有的數(shù)據(jù)傳輸完成
或者中途出現(xiàn)錯誤。成功的話會返回 None 引用
到目前為止,我們有兩個問題:
如何同時處理多個連接請求
我們需要一直調(diào)用 send() 或者 recv() 直到所有數(shù)據(jù)傳輸完成
應(yīng)該怎么做呢,有很多方式可以實現(xiàn)并發(fā)。最近,有一個非常流程的庫叫做 Asynchronous I/O 可以實現(xiàn),asyncio 庫在 Python 3.4 后默認(rèn)添加到了標(biāo)準(zhǔn)庫里面。傳統(tǒng)的方法是使用線程
并發(fā)的問題是很難做到正確,有許多細(xì)微之處需要考慮和防范??赡芷渲幸粋€細(xì)節(jié)的問題都會導(dǎo)致整個程序崩潰
我說這些并不是想嚇跑你或者讓你遠(yuǎn)離學(xué)習(xí)和使用并發(fā)編程。如果你想讓程序支持大規(guī)模使用,使用多處理器、多核是很有必要的。然而在這個教程中我們將使用比線程更傳統(tǒng)的方法使得邏輯更容易推理。我們將使用一個非常古老的系統(tǒng)調(diào)用:select()
select() 允許你檢查多個 socket 的 I/O 完成情況,所以你可以使用它來檢測哪個 socket I/O 是就緒狀態(tài)從而執(zhí)行讀取或?qū)懭氩僮?,但是這是 Python,總會有更多其它的選擇,我們將使用標(biāo)準(zhǔn)庫中的selectors 模塊,所以我們使用了最有效的實現(xiàn),不用在意你使用的操作系統(tǒng):
這個模塊提供了高層且高效的 I/O 多路復(fù)用,基于原始的 select 模塊構(gòu)建,推薦用
戶使用這個模塊,除非他們需要精確到操作系統(tǒng)層面的使用控制 [引用
](https://docs.python.org/3/lib...
盡管如此,使用 select() 也無法并發(fā)執(zhí)行。這取決于您的工作負(fù)載,這種實現(xiàn)仍然會很快。這也取決于你的應(yīng)用程序?qū)B接所做的具體事情或者它需要支持的客戶端數(shù)量
asyncio 使用單線程來處理多任務(wù),使用事件循環(huán)來管理任務(wù)。通過使用 select(),我們可以創(chuàng)建自己的事件循環(huán),更簡單且同步化。當(dāng)使用多線程時,即使要處理并發(fā)的情況,我們也不得不面臨使用 CPython 或者 PyPy 中的「全局解析器鎖 GIL」,這有效地限制了我們可以并行完成的工作量
說這些是為了解析為什么使用 select() 可能是個更好的選擇,不要覺得你必須使用 asyncio、線程或最新的異步庫。通常,在網(wǎng)絡(luò)應(yīng)用程序中,你的應(yīng)用程序就是 I/O 綁定:它可以在本地網(wǎng)絡(luò)上,網(wǎng)絡(luò)另一端的端,磁盤上等待
如果你從客戶端收到啟動 CPU 綁定工作的請求,查看 concurrent.futures模塊,它包含一個 ProcessPoolExecutor 類,用來異步執(zhí)行進(jìn)程池中的調(diào)用
如果你使用多進(jìn)程,你的 Python 代碼將被操作系統(tǒng)并行地在不同處理器或者核心上調(diào)度運行,并且沒有全局解析器鎖。你可以通過
Python 大會上的演講 John Reese - Thinking Outside the GIL with AsyncIO and Multiprocessing - PyCon 2018 來了解更多的想法
在下一節(jié)中,我們將介紹解決這些問題的服務(wù)器和客戶端的示例。他們使用 select() 來同時處理多連接請求,按需多次調(diào)用 send() 和 recv()
多連接的客戶端和服務(wù)端下面兩節(jié)中,我們將使用 selectors 模塊中的 selector 對象來創(chuàng)建一個可以同時處理多個請求的客戶端和服務(wù)端
多連接的服務(wù)端首頁,我們來看眼多連接服務(wù)端程序的代碼,multiconn-server.py。這是開始建立監(jiān)聽 socket 部分
import selectors sel = selectors.DefaultSelector() # ... lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) lsock.bind((host, port)) lsock.listen() print("listening on", (host, port)) lsock.setblocking(False) sel.register(lsock, selectors.EVENT_READ, data=None)
這個程序和之前打印程序服務(wù)端最大的不同是使用了 lsock.setblocking(False) 配置 socket 為非阻塞模式,這個 socket 的調(diào)用將不在是阻塞的。當(dāng)它和 sel.select() 一起使用的時候(下面會提到),我們就可以等待 socket 就緒事件,然后執(zhí)行讀寫操作
sel.register() 使用 sel.select() 為你感興趣的事件注冊 socket 監(jiān)控,對于監(jiān)聽 socket,我們希望使用 selectors.EVENT_READ 讀取到事件
data 用來存儲任何你 socket 中想存的數(shù)據(jù),當(dāng) select() 返回的時候它也會返回。我們將使用 data 來跟蹤 socket 上發(fā)送或者接收的東西
下面就是事件循環(huán):
import selectors sel = selectors.DefaultSelector() # ... while True: events = sel.select(timeout=None) for key, mask in events: if key.data is None: accept_wrapper(key.fileobj) else: service_connection(key, mask)
sel.select(timeout=None) 調(diào)用會阻塞直到 socket I/O 就緒。它返回一個(key, events) 元組,每個 socket 一個。key 就是一個包含 fileobj 屬性的具名元組。key.fileobj 是一個 socket 對象,mask 表示一個操作就緒的事件掩碼
如果 key.data 為空,我們就可以知道它來自于監(jiān)聽 socket,我們需要調(diào)用 accept() 方法來授受連接請求。我們將使用一個 accept() 包裝函數(shù)來獲取新的 socket 對象并注冊到 selector 上,我們馬上就會看到
如果 key.data 不為空,我們就可以知道它是一個被接受的客戶端 socket,我們需要為它服務(wù),接著 service_connection() 會傳入 key 和 mask 參數(shù)并調(diào)用,這包含了所有我們需要在 socket 上操作的東西
讓我們一起來看看 accept_wrapper() 方法做了什么:
def accept_wrapper(sock): conn, addr = sock.accept() # Should be ready to read print("accepted connection from", addr) conn.setblocking(False) data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"") events = selectors.EVENT_READ | selectors.EVENT_WRITE sel.register(conn, events, data=data)
由于監(jiān)聽 socket 被注冊到了 selectors.EVENT_READ 上,它現(xiàn)在就能被讀取,我們調(diào)用 sock.accept() 后立即再立即調(diào) conn.setblocking(False) 來讓 socket 進(jìn)入非阻塞模式
請記住,這是這個版本服務(wù)器程序的主要目標(biāo),因為我們不希望它被阻塞。如果被阻塞,那么整個服務(wù)器在返回前都處于掛起狀態(tài)。這意味著其它 socket 處于等待狀態(tài),這是一種 非常嚴(yán)重的 誰都不想見到的服務(wù)被掛起的狀態(tài)
接著我們使用了 types.SimpleNamespace 類創(chuàng)建了一個對象用來保存我們想要的 socket 和數(shù)據(jù),由于我們得知道客戶端連接什么時候可以寫入或者讀取,下面兩個事件都會被用到:
events = selectors.EVENT_READ | selectors.EVENT_WRITE
事件掩碼、socket 和數(shù)據(jù)對象都會被傳入 sel.register()
現(xiàn)在讓我們來看下,當(dāng)客戶端 socket 就緒的時候連接請求是如何使用 service_connection() 來處理的
def service_connection(key, mask): sock = key.fileobj data = key.data if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) # Should be ready to read if recv_data: data.outb += recv_data else: print("closing connection to", data.addr) sel.unregister(sock) sock.close() if mask & selectors.EVENT_WRITE: if data.outb: print("echoing", repr(data.outb), "to", data.addr) sent = sock.send(data.outb) # Should be ready to write data.outb = data.outb[sent:]
這就是多連接服務(wù)端的核心部分,key 就是從調(diào)用 select() 方法返回的一個具名元組,它包含了 socket 對象「fileobj」和數(shù)據(jù)對象。mask 包含了就緒的事件
如果 socket 就緒而且可以被讀取, mask & selectors.EVENT_READ 就為真,sock.recv() 會被調(diào)用。所有讀取到的數(shù)據(jù)都會被追加到 data.outb 里面。隨后被發(fā)送出去
注意 else: 語句,如果沒有收到任何數(shù)據(jù):
if recv_data: data.outb += recv_data else: print("closing connection to", data.addr) sel.unregister(sock) sock.close()
這表示客戶端關(guān)閉了它的 socket 連接,這時服務(wù)端也應(yīng)該關(guān)閉自己的連接。不過別忘了先調(diào)用 sel.unregister() 來撤銷 select() 的監(jiān)控
當(dāng) socket 就緒而且可以被讀取的時候,對于正常的 socket 應(yīng)該一直是這種狀態(tài),任何接收并被 data.outb 存儲的數(shù)據(jù)都將使用 sock.send() 方法打印出來。發(fā)送出去的字節(jié)隨后就會被從緩沖中刪除
data.outb = data.outb[sent:]多連接的客戶端
現(xiàn)在讓我們一起來看看多連接的客戶端程序,multiconn-client.py,它和服務(wù)端很相似,不一樣的是它沒有監(jiān)聽連接請求,它以調(diào)用 start_connections() 開始初始化連接:
messages = [b"Message 1 from client.", b"Message 2 from client."] def start_connections(host, port, num_conns): server_addr = (host, port) for i in range(0, num_conns): connid = i + 1 print("starting connection", connid, "to", server_addr) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(False) sock.connect_ex(server_addr) events = selectors.EVENT_READ | selectors.EVENT_WRITE data = types.SimpleNamespace(connid=connid, msg_total=sum(len(m) for m in messages), recv_total=0, messages=list(messages), outb=b"") sel.register(sock, events, data=data)
num_conns 參數(shù)是從命令行讀取的,表示為服務(wù)器建立多少個鏈接。就像服務(wù)端程序一樣,每個 socket 都設(shè)置成了非阻塞模式
由于 connect() 方法會立即觸發(fā)一個 BlockingIOError 異常,所以我們使用 connect_ex() 方法取代它。connect_ex() 會返回一個錯誤指示 errno.EINPROGRESS,不像 connect() 方法直接在進(jìn)程中返回異常。一旦連接結(jié)束,socket 就可以進(jìn)行讀寫并且通過 select() 方法返回
socket 建立完成后,我們將使用 types.SimpleNamespace 類創(chuàng)建想會傳送的數(shù)據(jù)。由于每個連接請求都會調(diào)用 socket.send(),發(fā)送到服務(wù)端的消息得使用 list(messages) 方法轉(zhuǎn)換成列表結(jié)構(gòu)。所有你想了解的東西,包括客戶端將要發(fā)送的、已發(fā)送的、已接收的消息以及消息的總字節(jié)數(shù)都存儲在 data 對象中
讓我們再來看看 service_connection()。基本上和服務(wù)端一樣:
def service_connection(key, mask): sock = key.fileobj data = key.data if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) # Should be ready to read if recv_data: print("received", repr(recv_data), "from connection", data.connid) data.recv_total += len(recv_data) if not recv_data or data.recv_total == data.msg_total: print("closing connection", data.connid) sel.unregister(sock) sock.close() if mask & selectors.EVENT_WRITE: if not data.outb and data.messages: data.outb = data.messages.pop(0) if data.outb: print("sending", repr(data.outb), "to connection", data.connid) sent = sock.send(data.outb) # Should be ready to write data.outb = data.outb[sent:]
有一個不同的地方,客戶端會跟蹤從服務(wù)器接收的字節(jié)數(shù),根據(jù)結(jié)果來決定是否關(guān)閉 socket 連接,服務(wù)端檢測到客戶端關(guān)閉則會同樣的關(guān)閉服務(wù)端的連接
運行多連接的客戶端和服務(wù)端現(xiàn)在讓我們把 multiconn-server.py 和 multiconn-client.py 兩個程序跑起來。他們都使用了命令行參數(shù),如果不指定參數(shù)可以看到參數(shù)調(diào)用的方法:
服務(wù)端程序,傳入主機(jī)和端口號
$ ./multiconn-server.py usage: ./multiconn-server.py
客戶端程序,傳入啟動服務(wù)端程序時同樣的主機(jī)和端口號以及連接數(shù)量
$ ./multiconn-client.py usage: ./multiconn-client.py
下面就是服務(wù)端程序運行起來在 65432 端口上監(jiān)聽回環(huán)地址的輸出:
$ ./multiconn-server.py 127.0.0.1 65432 listening on ("127.0.0.1", 65432) accepted connection from ("127.0.0.1", 61354) accepted connection from ("127.0.0.1", 61355) echoing b"Message 1 from client.Message 2 from client." to ("127.0.0.1", 61354) echoing b"Message 1 from client.Message 2 from client." to ("127.0.0.1", 61355) closing connection to ("127.0.0.1", 61354) closing connection to ("127.0.0.1", 61355)
下面是客戶端,它創(chuàng)建了兩個連接請求到上面的服務(wù)端:
$ ./multiconn-client.py 127.0.0.1 65432 2 starting connection 1 to ("127.0.0.1", 65432) starting connection 2 to ("127.0.0.1", 65432) sending b"Message 1 from client." to connection 1 sending b"Message 2 from client." to connection 1 sending b"Message 1 from client." to connection 2 sending b"Message 2 from client." to connection 2 received b"Message 1 from client.Message 2 from client." from connection 1 closing connection 1 received b"Message 1 from client.Message 2 from client." from connection 2 closing connection 2應(yīng)用程序客戶端和服務(wù)端
多連接的客戶端和服務(wù)端程序版本與最早的原始版本相比肯定有了很大的改善,但是讓我們再進(jìn)一步地解決上面「多連接」版本中的不足,然后完成最終版的實現(xiàn):客戶端/服務(wù)器應(yīng)用程序
我們希望有個客戶端和服務(wù)端在不影響其它連接的情況下做好錯誤處理,顯然,如果沒有發(fā)生異常,我們的客戶端和服務(wù)端不能崩潰的一團(tuán)糟。這也是到現(xiàn)在為止我們還沒討論的東西,我故意沒有引入錯誤處理機(jī)制因為這樣可以使之前的程序容易理解
現(xiàn)在你對基本的 API,非阻塞 socket、select() 等概念已經(jīng)有所了解了。我們可以繼續(xù)添加一些錯誤處理同時討論下「房間里面的大象」的問題,我把一些東西隱藏在了幕后。你應(yīng)該還記得,我在介紹中討論到的自定義類
首先,讓我們先解決錯誤:
所有的錯誤都會觸發(fā)異常,像無效參數(shù)類型和內(nèi)存不足的常見異??梢员粧伋?;從 Python
3.3 開始,與 socket 或地址語義相關(guān)的錯誤會引發(fā) OSError 或其子類之一的異常 引用
我們需要捕獲 OSError 異常。另外一個我沒提及的的問題是延遲,你將在文檔的很多地方看見關(guān)于延遲的討論,延遲會發(fā)生而且屬于「正?!瑰e誤。主機(jī)或者路由器重啟、交換機(jī)端口出錯、電纜出問題或者被拔出,你應(yīng)該在你的代碼中處理好各種各樣的錯誤
剛才說的「房間里面的大象」問題是怎么回事呢。就像 socket.SOCK_STREAM 這個參數(shù)的字面意思一樣,當(dāng)使用 TCP 連接時,你會從一個連續(xù)的字節(jié)流讀取的數(shù)據(jù),好比從磁盤上讀取數(shù)據(jù),不同的是你是從網(wǎng)絡(luò)讀取字節(jié)流
然而,和使用 f.seek() 讀文件不同,換句話說,沒法定位 socket 的數(shù)據(jù)流的位置,如果可以像文件一樣定位數(shù)據(jù)流的位置(使用下標(biāo)),那你就可以隨意的讀取你想要的數(shù)據(jù)
當(dāng)字節(jié)流入你的 socket 時,會需要有不同的網(wǎng)絡(luò)緩沖區(qū),如果想讀取他們就必須先保存到其它地方,使用 recv() 方法持續(xù)的從 socket 上讀取可用的字節(jié)流
相當(dāng)于你從 socket 中讀取的是一塊一塊的數(shù)據(jù),你必須使用 recv() 方法不斷的從緩沖區(qū)中讀取數(shù)據(jù),直到你的應(yīng)用確定讀取到了足夠的數(shù)據(jù)
什么時候算「足夠」這取決于你的定義,就 TCP socket 而言,它只通過網(wǎng)絡(luò)發(fā)送或接收原始字節(jié)。它并不了解這些原始字節(jié)的含義
這可以讓我們定義一個應(yīng)用層協(xié)議,什么是應(yīng)用層協(xié)議?簡單來說,你的應(yīng)用會發(fā)送或者接收消息,這些消息其實就是你的應(yīng)用程序的協(xié)議
換句話說,這些消息的長度、格式可以定義應(yīng)用程序的語義和行為,這和我們之前說的從socket 中讀取字節(jié)部分內(nèi)容相關(guān),當(dāng)你使用 recv() 來讀取字節(jié)的時候,你需要知道讀的字節(jié)數(shù),并且決定什么時候算讀取完成
這些都是怎么完成的呢?一個方法是只讀取固定長度的消息,如果它們的長度總是一樣的話,這樣做很容易。當(dāng)你收到固定長度字節(jié)消息的時候,就能確定它是個完整的消息
然而,如果你使用定長模式來發(fā)送比較短的消息會比較低效,因為你還得處理填充剩余的部分,此外,你還得處理數(shù)據(jù)不適合放在一個定長消息里面的情況
在這個教程里面,我們將使用一個通用的方案,很多協(xié)議都會用到它,包括 HTTP。我們將在每條消息前面追加一個頭信息,頭信息中包括消息的長度和其它我們需要的字段。這樣做的話我們只需要追蹤頭信息,當(dāng)我們讀到頭信息時,就可以查到消息的長度并且讀出所有字節(jié)然后消費它
我們將通過使用一個自定義類來實現(xiàn)接收文本/二進(jìn)制數(shù)據(jù)。你可以在此基礎(chǔ)上做出改進(jìn)或者通過繼承這個類來擴(kuò)展你的應(yīng)用程序。重要的是你將看到一個例子實現(xiàn)它的過程
我將會提到一些關(guān)于 socket 和字節(jié)相關(guān)的東西,就像之前討論過的。當(dāng)你通過 socket 來發(fā)送或者接收數(shù)據(jù)時,其實你發(fā)送或者接收到的是原始字節(jié)
如果你收到數(shù)據(jù)并且想讓它在一個多字節(jié)解釋的上下文中使用,比如說 4-byte 的整形,你需要考慮它可能是一種不是你機(jī)器 CPU 本機(jī)的格式??蛻舳嘶蛘叻?wù)器的另外一頭可能是另外一種使用了不同的字節(jié)序列的 CPU,這樣的話,你就得把它們轉(zhuǎn)換成你主機(jī)的本地字節(jié)序列來使用
上面所說的字節(jié)順序就是 CPU 的 字節(jié)序,在引用部分的字節(jié)序 一節(jié)可以查看更多。我們將會利用 Unicode 字符集的優(yōu)點來規(guī)避這個問題,并使用UTF-8 的方式編碼,由于 UTF-8 使用了 8字節(jié) 編碼方式,所以就不會有字節(jié)序列的問題
你可以查看 Python 關(guān)于編碼與 Unicode 的 文檔,注意我們只會編碼消息的頭部。我們將使用嚴(yán)格的類型,發(fā)送的消息編碼格式會在頭信息中定義。這將讓我們可以傳輸我們覺得有用的任意類型/格式數(shù)據(jù)
你可以通過調(diào)用 sys.byteorder 來決定你的機(jī)器的字節(jié)序列,比如在我的英特爾筆記本上,運行下面的代碼就可以:
$ python3 -c "import sys; print(repr(sys.byteorder))" "little"
如果我把這段代碼跑在可以模擬大字節(jié)序 CPU「PowerPC」的虛擬機(jī)上的話,應(yīng)該是下面的結(jié)果:
$ python3 -c "import sys; print(repr(sys.byteorder))" "big"
在我們的例子程序中,應(yīng)用層的協(xié)議定義了使用 UTF-8 方式編碼的 Unicode 字符。對于真正傳輸消息來說,如果需要的話你還是得手動交換字節(jié)序列
這取決于你的應(yīng)用,是否需要它來處理不同終端間的多字節(jié)二進(jìn)制數(shù)據(jù),你可以通過添加額外的頭信息來讓你的客戶端或者服務(wù)端支持二進(jìn)制,像 HTTP 一樣,把頭信息做為參數(shù)傳進(jìn)去
不用擔(dān)心自己還沒搞懂上面的東西,下面一節(jié)我們看到是如果實現(xiàn)的
應(yīng)用的協(xié)議頭讓我們來定義一個完整的協(xié)議頭:
可變長度的文本
基于 UTF-8 編碼的 Unicode 字符集
使用 JSON 序列化的一個 Python 字典
其中必須具有的頭應(yīng)該有以下幾個:
名稱 | 描述 |
---|---|
byteorder | 機(jī)器的字節(jié)序列(uses sys.byteorder),應(yīng)用程序可能用不上 |
content-length | 內(nèi)容的字節(jié)長度 |
content-type | 內(nèi)容的類型,比如 text/json 或者 binary/my-binary-type |
content-encoding | 內(nèi)容的編碼類型,比如 utf-8 編碼的 Unicode 文本,二進(jìn)制數(shù)據(jù) |
這些頭信息告訴接收者消息數(shù)據(jù),這樣的話你就可以通過提供給接收者足夠的信息讓他接收到數(shù)據(jù)的時候正確的解碼的方式向它發(fā)送任何數(shù)據(jù),由于頭信息是字典格式,你可以隨意向頭信息中添加鍵值對
發(fā)送應(yīng)用程序消息不過還有一個問題,由于我們使用了變長的頭信息,雖然方便擴(kuò)展但是當(dāng)你使用 recv() 方法讀取消息的時候怎么知道頭信息的長度呢
我們前面講到過使用 recv() 接收數(shù)據(jù)和如何確定是否接收完成,我說過定長的頭可能會很低效,的確如此。但是我們將使用一個比較小的 2 字節(jié)定長的頭信息前綴來表示頭信息的長度
你可以認(rèn)為這是一種混合的發(fā)送消息的實現(xiàn)方法,我們通過發(fā)送頭信息長度來引導(dǎo)接收者,方便他們解析消息體
為了給你更好地解釋消息格式,讓我們來看看消息的全貌:
消息以 2字節(jié)的固定長度的頭開始,這兩個字節(jié)是整型的網(wǎng)絡(luò)字節(jié)序列,表示下面的變長 JSON 頭信息的長度,當(dāng)我們從 recv() 方法讀取到 2 個字節(jié)時就知道它表示的是頭信息長度的整形數(shù)字,然后在解碼 JSON 頭之前讀取這么多的字節(jié)
JSON 頭包含了頭信息的字典。其中一個就是 content-length,這表示消息內(nèi)容的數(shù)量(不是JSON頭),當(dāng)我們使用 recv() 方法讀取到了 content-length 個字節(jié)的數(shù)據(jù)時,就表示接收完成并且讀取到了完整的消息
應(yīng)用程序類最后讓我們來看下成果,我們使用了一個消息類。來看看它是如何在 socket 發(fā)生讀寫事件時與 select() 配合使用的
對于這個示例應(yīng)用程序而言,我必須想出客戶端和服務(wù)器將使用什么類型的消息,從這一點來講這遠(yuǎn)遠(yuǎn)超過了最早時候我們寫的那個玩具一樣的打印程序
為了保證程序簡單而且仍然能夠演示出它是如何在一個真正的程序中工作的,我創(chuàng)建了一個應(yīng)用程序協(xié)議用來實現(xiàn)基本的搜索功能。客戶端發(fā)送一個搜索請求,服務(wù)器做一次匹配的查找,如果客戶端的請求沒法被識別成搜索請求,服務(wù)器就會假定這個是二進(jìn)制請求,對應(yīng)的返回二進(jìn)制響應(yīng)
跟著下面一節(jié),運行示例、用代碼做實驗后你將會知道他是如何工作的,然后你就可以以這個消息類為起點把他修改成適合自己使用的
就像我們之前討論的,你將在下面看到,處理 socket 時需要保存狀態(tài)。通過使用類,我們可以將所有的狀態(tài)、數(shù)據(jù)和代碼打包到一個地方。當(dāng)連接開始或者接受的時候消息類就會為每個 socket 創(chuàng)建一個實例
類中的很多包裝方法、工具方法在客戶端和服務(wù)端上都是差不多的。它們以下劃線開頭,就像 Message._json_encode() 一樣,這些方法通過類使用起來很簡單。這使得它們在其它方法中調(diào)用時更短,而且符合 DRY 原則
消息類的服務(wù)端程序本質(zhì)上和客戶端一樣。不同的是客戶端初始化連接并發(fā)送請求消息,隨后要處理服務(wù)端返回的內(nèi)容。而服務(wù)端則是等待連接請求,處理客戶端的請求消息,隨后發(fā)送響應(yīng)消息
看起來就像這樣:
步驟 | 端 | 動作/消息內(nèi)容 |
---|---|---|
1 | 客戶端 | 發(fā)送帶有請求內(nèi)容的消息 |
2 | 服務(wù)端 | 接收并處理請求消息 |
3 | 服務(wù)端 | 發(fā)送有響應(yīng)內(nèi)容的消息 |
4 | 客戶端 | 接收并處理響應(yīng)消息 |
下面是代碼的結(jié)構(gòu):
應(yīng)用程序 | 文件 | 代碼 |
---|---|---|
服務(wù)端 | app-server.py | 服務(wù)端主程序 |
服務(wù)端 | libserver.py | 服務(wù)端消息類 |
客戶端 | app-client.py | 客戶端主程序 |
客戶端 | libclient.py | 客戶端消息類 |
我想通過首先提到它的設(shè)計方面來討論 Message 類的工作方式,不過這對我來說并不是立馬就能解釋清楚的,只有在重構(gòu)它至少五次之后我才能達(dá)到它目前的狀態(tài)。為什么呢?因為要管理狀態(tài)
當(dāng)消息對象創(chuàng)建的時候,它就被一個使用 selector.register() 事件監(jiān)控起來的 socket 關(guān)聯(lián)起來了
message = libserver.Message(sel, conn, addr) sel.register(conn, selectors.EVENT_READ, data=message)
注意,這一節(jié)中的一些代碼來自服務(wù)端主程序與消息類,但是這部分內(nèi)容的討論在客戶端
也是一樣的,我將在他們之間存在不同點的時候來解釋客戶端的版本
當(dāng) socket 上的事件就緒的時候,它就會被 selector.select() 方法返回。對過 key 對象的 data 屬性獲取到 message 的引用,然后在消息用調(diào)用一個方法:
while True: events = sel.select(timeout=None) for key, mask in events: # ... message = key.data message.process_events(mask)
觀察上面的事件循環(huán),可以看見 sel.select() 位于「司機(jī)位置」,它是阻塞的,在循環(huán)的上面等待。當(dāng) socket 上的讀寫事件就緒時,它就會為其服務(wù)。這表示間接的它也要負(fù)責(zé)調(diào)用 process_events() 方法。這就是我說 process_events() 方法是入口的原因
讓我們來看下 process_events() 方法做了什么
def process_events(self, mask): if mask & selectors.EVENT_READ: self.read() if mask & selectors.EVENT_WRITE: self.write()
這樣做很好,因為 process_events() 方法很簡潔,它只可以做兩件事情:調(diào)用 read() 和 write() 方法
這又把我們帶回了狀態(tài)管理的問題。在幾次重構(gòu)后,我決定如果別的方法依賴于狀態(tài)變量里面的某個確定值,那么它們就只應(yīng)該從 read() 和 write() 方法中調(diào)用,這將使處理socket 事件的邏輯盡量的簡單
可能說起來很簡單,但是經(jīng)歷了前面幾次類的迭代:混合了一些方法,檢查當(dāng)前狀態(tài)、依賴于其它值、在 read() 或者 write() 方法外面調(diào)用處理數(shù)據(jù)的方法,最后這證明了這樣管理起來很復(fù)雜
當(dāng)然,你肯定需要把類按你自己的需求修改使它能夠符合你的預(yù)期,但是我建議你盡可能把狀態(tài)檢查、依賴狀態(tài)的調(diào)用的邏輯放在 read() 和 write() 方法里面
讓我們來看看 read() 方法,這是服務(wù)端版本,但是客戶端也是一樣的。不同之處在于方法名稱,一個(客戶端)是 process_response() 另一個(服務(wù)端)是 process_request()
def read(self): self._read() if self._jsonheader_len is None: self.process_protoheader() if self._jsonheader_len is not None: if self.jsonheader is None: self.process_jsonheader() if self.jsonheader: if self.request is None: self.process_request()
_read() 方法首頁被調(diào)用,然后調(diào)用 socket.recv() 從 socket 讀取數(shù)據(jù)并存入到接收緩沖區(qū)
記住,當(dāng)調(diào)用 socket.recv() 方法時,組成消息的所有數(shù)據(jù)并沒有一次性全部到達(dá)。socket.recv() 方法可能需要調(diào)用很多次,這就是為什么在調(diào)用相關(guān)方法處理數(shù)據(jù)前每次都要檢查狀態(tài)
當(dāng)一個方法開始處理消息時,首頁要檢查的就是接受緩沖區(qū)保存了足夠的多讀取的數(shù)據(jù),如果確定,它們將繼續(xù)處理各自的數(shù)據(jù),然后把數(shù)據(jù)存到其它流程可能會用到的變量上,并且清空自己的緩沖區(qū)。由于一個消息有三個組件,所以會有三個狀態(tài)檢查和處理方法的調(diào)用:
Message Component | Method | Output |
---|---|---|
Fixed-length header | process_protoheader() | self._jsonheader_len |
JSON header | process_jsonheader() | self.jsonheader |
Content | process_request() | self.request |
接下來,讓我們一起看看 write() 方法,這是服務(wù)端的版本:
def write(self): if self.request: if not self.response_created: self.create_response() self._write()
write() 方法會首先檢測是否有請求,如果有而且響應(yīng)還沒被創(chuàng)建的話 create_response() 方法就會被調(diào)用,它會設(shè)置狀態(tài)變量 response_created,然后為發(fā)送緩沖區(qū)寫入響應(yīng)
如果發(fā)送緩沖區(qū)有數(shù)據(jù),write() 方法會調(diào)用 socket.send() 方法
記住,當(dāng) socket.send() 被調(diào)用時,所有發(fā)送緩沖區(qū)的數(shù)據(jù)可能還沒進(jìn)入到發(fā)送隊列,socket 的網(wǎng)絡(luò)緩沖區(qū)可能滿了,socket.send() 可能需要重新調(diào)用,這就是為什么需要檢查狀態(tài)的原因,create_response() 應(yīng)該只被調(diào)用一次,但是 _write() 方法需要調(diào)用多次
客戶端的 write() 版大體與服務(wù)端一致:
def write(self): if not self._request_queued: self.queue_request() self._write() if self._request_queued: if not self._send_buffer: # Set selector to listen for read events, we"re done writing. self._set_selector_events_mask("r")
因為客戶端首頁初始化了一個連接請求到服務(wù)端,狀態(tài)變量_request_queued被檢查。如果請求還沒加入到隊列,就調(diào)用 queue_request() 方法創(chuàng)建一個請求寫入到發(fā)送緩沖區(qū)中,同時也會使用變量 _request_queued 記錄狀態(tài)值防止多次調(diào)用
就像服務(wù)端一樣,如果發(fā)送緩沖區(qū)有數(shù)據(jù) _write() 方法會調(diào)用 socket.send() 方法
需要注意客戶端版本的 write() 方法與服務(wù)端不同之處在于最后的請求是否加入到隊列中的檢查,這個我們將在客戶端主程序中詳細(xì)解釋,原因是要告訴 selector.select()停止監(jiān)控 socket 的寫入事件而且我們只對讀取事件感興趣,沒有辦法通知套接字是可寫的
我將在這一節(jié)中留下一個懸念,這一節(jié)的主要目的是解釋 selector.select() 方法是如何通過 process_events() 方法調(diào)用消息類以及它是如何工作的
這一點很重要,因為 process_events() 方法在連接的生命周期中將被調(diào)用很多次,因此,要確保那些只能被調(diào)用一次的方法正常工作,這些方法中要么需要檢查自己的狀態(tài)變量,要么需要檢查調(diào)用者的方法中的狀態(tài)變量
服務(wù)端主程序在服務(wù)端主程序 app-server.py 中,主機(jī)、端口參數(shù)是通過命令行傳遞給程序的:
$ ./app-server.py usage: ./app-server.py
例如需求監(jiān)聽本地回環(huán)地址上面的 65432 端口,需要執(zhí)行:
$ ./app-server.py 127.0.0.1 65432 listening on ("127.0.0.1", 65432)
創(chuàng)建完 socket 后,一個傳入?yún)?shù) socket.SO_REUSEADDR 的方法 `to
socket.setsockopt()` 將被調(diào)用
# Avoid bind() exception: OSError: [Errno 48] Address already in use lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
設(shè)置這個參數(shù)是為了避免 端口被占用 的錯誤發(fā)生,如果當(dāng)前程序使用的端口和之前的程序使用的一樣,你就會發(fā)現(xiàn)連接處于 TIME_WAIT 狀態(tài)
比如說,如果服務(wù)器主動關(guān)閉連接,服務(wù)器會保持為大概兩分鐘的 TIME_WAIT 狀態(tài),具體時長取決于你的操作系統(tǒng)。如果你想在兩分鐘內(nèi)再開啟一個服務(wù),你將得到一個OSError 表示 端口被戰(zhàn)勝,這樣做是為了確保一些在途的數(shù)據(jù)包正確的被處理
事件循環(huán)會捕捉所有錯誤,以保證服務(wù)器正常運行:
while True: events = sel.select(timeout=None) for key, mask in events: if key.data is None: accept_wrapper(key.fileobj) else: message = key.data try: message.process_events(mask) except Exception: print("main: error: exception for", f"{message.addr}: {traceback.format_exc()}") message.close()
當(dāng)服務(wù)器接受到一個客戶端連接時,消息對象就會被創(chuàng)建:
def accept_wrapper(sock): conn, addr = sock.accept() # Should be ready to read print("accepted connection from", addr) conn.setblocking(False) message = libserver.Message(sel, conn, addr) sel.register(conn, selectors.EVENT_READ, data=message)
消息對象會通過 sel.register() 方法關(guān)聯(lián)到 socket 上,而且它初始化就被設(shè)置成了只監(jiān)控讀事件。當(dāng)請求被讀取時,我們將通過監(jiān)聽到的寫事件修改它
在服務(wù)器端采用這種方法的一個優(yōu)點是,大多數(shù)情況下,當(dāng) socket 正常并且沒有網(wǎng)絡(luò)問題時,它始終是可寫的
如果我們告訴 sel.register() 方法監(jiān)控 EVENT_WRITE 寫入事件,事件循環(huán)將會立即喚醒并通知我們這種情況,然而此時 socket 并不用喚醒調(diào)用 send() 方法。由于請求還沒被處理,所以不需要發(fā)回響應(yīng)。這將消耗并浪費寶貴的 CPU 周期
服務(wù)端消息類在消息切入點一節(jié)中,當(dāng)通過 process_events() 知道 socket 事件就緒時我們可以看到消息對象是如何發(fā)出動作的?,F(xiàn)在讓我們來看看當(dāng)數(shù)據(jù)在 socket 上被讀取是會發(fā)生些什么,以及為服務(wù)器就緒的消息的組件片段發(fā)生了什么
libserver.py 文件中的服務(wù)端消息類,可以在 Github 上找到 源代碼
這些方法按照消息處理順序出現(xiàn)在類中
當(dāng)服務(wù)器讀取到至少兩個字節(jié)時,定長頭的邏輯就可以開始了
def process_protoheader(self): hdrlen = 2 if len(self._recv_buffer) >= hdrlen: self._jsonheader_len = struct.unpack(">H", self._recv_buffer[:hdrlen])[0] self._recv_buffer = self._recv_buffer[hdrlen:]
網(wǎng)絡(luò)字節(jié)序列中的定長整型兩字節(jié)包含了 JSON 頭的長度,struct.unpack() 方法用來讀取并解碼,然后保存在 self._jsonheader_len 中,當(dāng)這部分消息被處理完成后,就要調(diào)用 process_protoheader() 方法來刪除接收緩沖區(qū)中處理過的消息
就像上面的定長頭的邏輯一樣,當(dāng)接收緩沖區(qū)有足夠的 JSON 頭數(shù)據(jù)時,它也需要被處理:
def process_jsonheader(self): hdrlen = self._jsonheader_len if len(self._recv_buffer) >= hdrlen: self.jsonheader = self._json_decode(self._recv_buffer[:hdrlen], "utf-8") self._recv_buffer = self._recv_buffer[hdrlen:] for reqhdr in ("byteorder", "content-length", "content-type", "content-encoding"): if reqhdr not in self.jsonheader: raise ValueError(f"Missing required header "{reqhdr}".")
self._json_decode() 方法用來解碼并反序列化 JSON 頭成一個字典。由于我們定義的 JSON 頭是 utf-8 格式的,所以解碼方法調(diào)用時我們寫死了這個參數(shù),結(jié)果將被存放在 self.jsonheader 中,process_jsonheader 方法做完他應(yīng)該做的事情后,同樣需要刪除接收緩沖區(qū)中處理過的消息
接下來就是真正的消息內(nèi)容,當(dāng)接收緩沖區(qū)有 JSON 頭中定義的 content-length 值的數(shù)量個字節(jié)時,請求就應(yīng)該被處理了:
def process_request(self): content_len = self.jsonheader["content-length"] if not len(self._recv_buffer) >= content_len: return data = self._recv_buffer[:content_len] self._recv_buffer = self._recv_buffer[content_len:] if self.jsonheader["content-type"] == "text/json": encoding = self.jsonheader["content-encoding"] self.request = self._json_decode(data, encoding) print("received request", repr(self.request), "from", self.addr) else: # Binary or unknown content-type self.request = data print(f"received {self.jsonheader["content-type"]} request from", self.addr) # Set selector to listen for write events, we"re done reading. self._set_selector_events_mask("w")
把消息保存到 data 變量中后,process_request() 又會刪除接收緩沖區(qū)中處理過的數(shù)據(jù)。接著,如果 content type 是 JSON 的話,它將解碼并反序列化數(shù)據(jù)。否則(在我們的例子中)數(shù)據(jù)將被視 做二進(jìn)制數(shù)據(jù)并打印出來
最后 process_request() 方法會修改 selector 為只監(jiān)控寫入事件。在服務(wù)端的程序 app-server.py 中,socket 初始化被設(shè)置成僅監(jiān)控讀事件。現(xiàn)在請求已經(jīng)被全部處理完了,我們對讀取事件就不感興趣了
現(xiàn)在就可以創(chuàng)建一個響應(yīng)寫入到 socket 中。當(dāng) socket 可寫時 create_response() 將被從 write() 方法中調(diào)用:
def create_response(self): if self.jsonheader["content-type"] == "text/json": response = self._create_response_json_content() else: # Binary or unknown content-type response = self._create_response_binary_content() message = self._create_message(**response) self.response_created = True self._send_buffer += message
響應(yīng)會根據(jù)不同的 content type 的不同而調(diào)用不同的方法創(chuàng)建。在這個例子中,當(dāng) action == "search" 的時候會執(zhí)行一個簡單的字典查找。你可以在這個地方添加你自己的處理方法并調(diào)用
一個不好處理的問題是響應(yīng)寫入完成時如何關(guān)閉連接,我會在 _write() 方法中調(diào)用 close()
def _write(self): if self._send_buffer: print("sending", repr(self._send_buffer), "to", self.addr) try: # Should be ready to write sent = self.sock.send(self._send_buffer) except BlockingIOError: # Resource temporarily unavailable (errno EWOULDBLOCK) pass else: self._send_buffer = self._send_buffer[sent:] # Close when the buffer is drained. The response has been sent. if sent and not self._send_buffer: self.close()
雖然close() 方法的調(diào)用有點隱蔽,但是我認(rèn)為這是一種權(quán)衡。因為消息類一個連接只處理一條消息。寫入響應(yīng)后,服務(wù)器無需執(zhí)行任何操作。它的任務(wù)就完成了
客戶端主程序客戶端主程序 app-client.py 中,參數(shù)從命令行中讀取,用來創(chuàng)建請求并連接到服務(wù)端
$ ./app-client.py usage: ./app-client.py
來個示例演示一下:
$ ./app-client.py 127.0.0.1 65432 search needle
當(dāng)從命令行參數(shù)創(chuàng)建完一個字典來表示請求后,主機(jī)、端口、請求字典一起被傳給 start_connection()
def start_connection(host, port, request): addr = (host, port) print("starting connection to", addr) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(False) sock.connect_ex(addr) events = selectors.EVENT_READ | selectors.EVENT_WRITE message = libclient.Message(sel, sock, addr, request) sel.register(sock, events, data=message)
對服務(wù)器的 socket 連接被創(chuàng)建,消息對象被傳入請求字典并創(chuàng)建
和服務(wù)端一樣,消息對象在 sel.register() 方法中被關(guān)聯(lián)到 socket 上。然而,客戶端不同的是,socket 初始化的時候會監(jiān)控讀寫事件,一旦請求被寫入,我們將會修改為只監(jiān)控讀取事件
這種實現(xiàn)和服務(wù)端一樣有好處:不浪費 CPU 生命周期。請求發(fā)送完成后,我們就不關(guān)注寫入事件了,所以不用保持狀態(tài)等待處理
客戶端消息類在 消息入口點 一節(jié)中,我們看到過,當(dāng) socket 使用準(zhǔn)備就緒時,消息對象是如何調(diào)用具體動作的。現(xiàn)在讓我們來看看 socket 上的數(shù)據(jù)是如何被讀寫的,以及消息準(zhǔn)備好被加工的時候發(fā)生了什么
客戶端消息類在 libclient.py 文件中,可以在 Github 上找到 源代碼
這些方法按照消息處理順序出現(xiàn)在類中
客戶端的第一個任務(wù)就是讓請求入隊列:
def queue_request(self): content = self.request["content"] content_type = self.request["type"] content_encoding = self.request["encoding"] if content_type == "text/json": req = { "content_bytes": self._json_encode(content, content_encoding), "content_type": content_type, "content_encoding": content_encoding } else: req = { "content_bytes": content, "content_type": content_type, "content_encoding": content_encoding } message = self._create_message(**req) self._send_buffer += message self._request_queued = True
用來創(chuàng)建請求的字典,取決于客戶端程序 app-client.py 中傳入的命令行參數(shù),當(dāng)消息對象創(chuàng)建的時候,請求字典被當(dāng)做參數(shù)傳入
請求消息被創(chuàng)建并追加到發(fā)送緩沖區(qū)中,消息將被 _write() 方法發(fā)送,狀態(tài)參數(shù) self._request_queued 被設(shè)置,這使 queue_request() 方法不會被重復(fù)調(diào)用
請求發(fā)送完成后,客戶端就等待服務(wù)器的響應(yīng)
客戶端讀取和處理消息的方法和服務(wù)端一致,由于響應(yīng)數(shù)據(jù)是從 socket 上讀取的,所以處理 header 的
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/42513.html
摘要:個高級多線程面試題及回答后端掘金在任何面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。目前在生產(chǎn)環(huán)基于的技術(shù)問答網(wǎng)站系統(tǒng)實現(xiàn)后端掘金這一篇博客將詳細(xì)介紹一個基于的問答網(wǎng)站的實現(xiàn),有詳細(xì)的代碼。 15 個高級 Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺資訊職位,那么你應(yīng)該準(zhǔn)備很多...
摘要:個高級多線程面試題及回答后端掘金在任何面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。目前在生產(chǎn)環(huán)基于的技術(shù)問答網(wǎng)站系統(tǒng)實現(xiàn)后端掘金這一篇博客將詳細(xì)介紹一個基于的問答網(wǎng)站的實現(xiàn),有詳細(xì)的代碼。 15 個高級 Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺資訊職位,那么你應(yīng)該準(zhǔn)備很多...
摘要:做一個搬運工,希望自己能努力學(xué)習(xí),也希望大神們的東西能讓更多的人看到不斷更新更新日志新增了網(wǎng)絡(luò)安全分類,整理了排版布局新增了的鏈接,將一些雜七雜八的東西弄到了一篇新文章上了,叫做積累與雜貨鋪一以及相關(guān)教程的規(guī)范與相關(guān)中文學(xué)習(xí)大本營中文文檔簡 做一個搬運工,希望自己能努力學(xué)習(xí),也希望大神們的東西能讓更多的人看到 不斷更新 更新日志:2017.10.13 新增了網(wǎng)絡(luò)安全分類,整理了排版布局...
閱讀 2905·2021-11-22 13:54
閱讀 3541·2021-11-16 11:44
閱讀 1381·2021-09-07 10:19
閱讀 1483·2019-08-29 17:30
閱讀 3206·2019-08-29 11:33
閱讀 3554·2019-08-26 12:18
閱讀 2894·2019-08-26 11:53
閱讀 1347·2019-08-26 10:47