摘要:主程序通過喚起子程序并傳入數(shù)據(jù),子程序處理完后,用將自己掛起,并返回主程序,如此交替進(jìn)行。通過輪詢或是等事件框架,捕獲返回的事件。從消息隊(duì)列中取出記錄,恢復(fù)協(xié)程函數(shù)。然而事實(shí)上只有直接操縱的協(xié)程函數(shù)才有可能接觸到這個(gè)對象。
寫在前面首發(fā)于 我的博客 轉(zhuǎn)載請注明出處
本文默認(rèn)讀者對 Python 生成器 有一定的了解,不了解者請移步至生成器 - 廖雪峰的官方網(wǎng)站。
本文基于 Python 3.5.1,文中所有的例子都可在 Github 上獲得。
學(xué)過 Python 的都知道,Python 里有一個(gè)很厲害的概念叫做 生成器(Generators)。一個(gè)生成器就像是一個(gè)微小的線程,可以隨處暫停,也可以隨時(shí)恢復(fù)執(zhí)行,還可以和代碼塊外部進(jìn)行數(shù)據(jù)交換。恰當(dāng)使用生成器,可以極大地簡化代碼邏輯。
也許,你可以熟練地使用生成器完成一些看似不可能的任務(wù),如“無窮斐波那契數(shù)列”,并引以為豪,認(rèn)為所謂的生成器也不過如此——那我可要告訴你:這些都太小兒科了,下面我所要介紹的絕對會讓你大開眼界。
生成器 可以實(shí)現(xiàn) 協(xié)程,你相信嗎?
什么是協(xié)程在異步編程盛行的今天,也許你已經(jīng)對 協(xié)程(coroutines) 早有耳聞,但卻不一定了解它。我們先來看看 Wikipedia 的定義:
Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.
也就是說:協(xié)程是一種 允許在特定位置暫?;蚧謴?fù)的子程序——這一點(diǎn)和 生成器 相似。但和 生成器 不同的是,協(xié)程 可以控制子程序暫停之后代碼的走向,而 生成器 僅能被動(dòng)地將控制權(quán)交還給調(diào)用者。
協(xié)程 是一種很實(shí)用的技術(shù)。和 多進(jìn)程 與 多線程 相比,協(xié)程 可以只利用一個(gè)線程更加輕便地實(shí)現(xiàn) 多任務(wù),將任務(wù)切換的開銷降至最低。和 回調(diào) 等其他異步技術(shù)相比,協(xié)程 維持了正常的代碼流程,在保證代碼可讀性的同時(shí)最大化地利用了 阻塞 IO 的空閑時(shí)間。它的高效與簡潔贏得了開發(fā)者們的擁戴。
Python 中的協(xié)程早先 Python 是沒有原生協(xié)程支持的,因此在 協(xié)程 這個(gè)領(lǐng)域出現(xiàn)了百家爭鳴的現(xiàn)象。主流的實(shí)現(xiàn)由以下兩種:
用 C 實(shí)現(xiàn)協(xié)程調(diào)度。這一派以 gevent 為代表,在底層實(shí)現(xiàn)了協(xié)程調(diào)度,并將大部分的 阻塞 IO 重寫為異步。
用 生成器模擬。這一派以 Tornado 為代表。Tornado 是一個(gè)老牌的異步 Web 框架,涵蓋了五花八門的異步編程方式,其中包括 協(xié)程。本文部分代碼借鑒于 Tornado。
直至 Python 3.4,Python 第一次將異步編程納入標(biāo)準(zhǔn)庫中(參見 PEP 3156),其中包括了用生成器模擬的 協(xié)程。而在 Python 3.5 中,Guido 總算在語法層面上實(shí)現(xiàn)了 協(xié)程(參見 PEP 0492)。比起 yield 關(guān)鍵字,新關(guān)鍵字 async 和 await 具有更好的可讀性。在不久的將來,新的實(shí)現(xiàn)將會慢慢統(tǒng)一混亂已久的協(xié)程領(lǐng)域。
盡管 生成器協(xié)程 已成為了過去時(shí),但它曾經(jīng)的輝煌卻不可磨滅。下面,讓我們一起來探索其中的魔法。
一個(gè)簡單的例子假設(shè)有兩個(gè)子程序 main 和 printer。printer 是一個(gè)死循環(huán),等待輸入、加工并輸出結(jié)果。main 作為主程序,不時(shí)地向 printer 發(fā)送數(shù)據(jù)。
這應(yīng)該怎么實(shí)現(xiàn)呢?
傳統(tǒng)方式中,這幾乎不可能在一個(gè)線程中實(shí)現(xiàn),因?yàn)樗姥h(huán)會阻塞。而協(xié)程卻能很好地解決這個(gè)問題:
def printer(): counter = 0 while True: string = (yield) print("[{0}] {1}".format(counter, string)) counter += 1 if __name__ == "__main__": p = printer() next(p) p.send("Hi") p.send("My name is hsfzxjy.") p.send("Bye!")
輸出:
[0] Hi [1] My name is hsfzxjy. [2] Bye!
這其實(shí)就是最簡單的協(xié)程。程序由兩個(gè)分支組成。主程序通過 send 喚起子程序并傳入數(shù)據(jù),子程序處理完后,用 yield 將自己掛起,并返回主程序,如此交替進(jìn)行。
協(xié)程調(diào)度有時(shí),你的手頭上會有多個(gè)任務(wù),每個(gè)任務(wù)耗時(shí)很長,而你又不想同步處理,而是希望能像多線程一樣交替執(zhí)行。這時(shí),你就需要一個(gè)調(diào)度器來協(xié)調(diào)流程了。
作為例子,我們假設(shè)有這么一個(gè)任務(wù):
def task(name, times): for i in range(times): print(name, i)
如果你直接執(zhí)行 task,那它會在遍歷 times 次之后才會返回。為了實(shí)現(xiàn)我們的目的,我們需要將 task 人為地切割成若干塊,以便并行處理:
def task(name, times): for i in range(times): yield print(name, i)
這里的 yield 沒有邏輯意義,僅是作為暫停的標(biāo)志點(diǎn)。程序流可以在此暫停,也可以在此恢復(fù)。而通過實(shí)現(xiàn)一個(gè)調(diào)度器,我們可以完成多個(gè)任務(wù)的并行處理:
from collections import deque class Runner(object): def __init__(self, tasks): self.tasks = deque(tasks) def next(self): return self.tasks.pop() def run(self): while len(self.tasks): task = self.next() try: next(task) except StopIteration: pass else: self.tasks.appendleft(task)
這里我們用一個(gè)隊(duì)列(deque)儲存任務(wù)列表。其中的 run 是一個(gè)重要的方法: 它通過輪轉(zhuǎn)隊(duì)列依次喚起任務(wù),并將已經(jīng)完成的任務(wù)清出隊(duì)列,簡潔地模擬了任務(wù)調(diào)度的過程。
而現(xiàn)在,我們只需調(diào)用:
Runner([ task("hsfzxjy", 5), task("Jack", 4), task("Bob", 6) ]).run()
就可以得到預(yù)想中的效果了:
Bob 0 Jack 0 hsfzxjy 0 Bob 1 Jack 1 hsfzxjy 1 Bob 2 Jack 2 hsfzxjy 2 Bob 3 Jack 3 hsfzxjy 3 Bob 4 hsfzxjy 4 Bob 5
簡直完美!答案和丑陋的多線程別無二樣,代碼卻簡單了不止一個(gè)數(shù)量級。
異步 IO 模擬你絕對有過這樣的煩惱:程序常常被時(shí)滯嚴(yán)重的 IO 操作(數(shù)據(jù)庫查詢、大文件讀取、越過長城拿數(shù)據(jù))阻塞,在等待 IO 返回期間,線程就像死了一樣,空耗著時(shí)間。為此,你不得不用多線程甚至是多進(jìn)程來解決問題。
而事實(shí)上,在等待 IO 的時(shí)候,你完全可以做一些與數(shù)據(jù)無關(guān)的操作,最大化地利用時(shí)間。Node.js 在這點(diǎn)做得不錯(cuò)——它將一切異步化,壓榨性能。只可惜它的異步是基于事件回調(diào)機(jī)制的,稍有不慎,你就有可能陷入 Callback Hell 的深淵。
而協(xié)程并不使用回調(diào),相比之下可讀性會好很多。其思路大致如下:
維護(hù)一個(gè)消息隊(duì)列,用于儲存 IO 記錄。
協(xié)程函數(shù) IO 時(shí),自身掛起,同時(shí)向消息隊(duì)列插入一個(gè)記錄。
通過輪詢或是 epoll 等事件框架,捕獲 IO 返回的事件。
從消息隊(duì)列中取出記錄,恢復(fù)協(xié)程函數(shù)。
現(xiàn)在假設(shè)有這么一個(gè)耗時(shí)任務(wù):
def task(name): print(name, 1) sleep(1) print(name, 2) sleep(2) print(name, 3)
正常情況下,這個(gè)任務(wù)執(zhí)行完需要 3 秒,倘若多個(gè)同步任務(wù)同步執(zhí)行,執(zhí)行時(shí)間會成倍增長。而如果利用協(xié)程,我們就可以在接近 3 秒的時(shí)間內(nèi)完成多個(gè)任務(wù)。
首先我們要實(shí)現(xiàn)消息隊(duì)列:
events_list = [] class Event(object): def __init__(self, *args, **kwargs): self.callback = lambda: None events_list.append(self) def set_callback(self, callback): self.callback = callback def is_ready(self): result = self._is_ready() if result: self.callback() return result
Event 是消息的基類,其在初始化時(shí)會將自己放入消息隊(duì)列 events_list 中。Event 和 調(diào)度器 使用回調(diào)進(jìn)行交互。
接著我們要 hack 掉 sleep 函數(shù),這是因?yàn)樵?time.sleep() 會阻塞線程。通過自定義 sleep 我們可以模擬異步延時(shí)操作:
# sleep.py from event import Event from time import time class SleepEvent(Event): def __init__(self, timeout): super(SleepEvent, self).__init__(timeout) self.timeout = timeout self.start_time = time() def _is_ready(self): return time() - self.start_time >= self.timeout def sleep(timeout): return SleepEvent(timeout)
可以看出:sleep 在調(diào)用后就會立即返回,同時(shí)一個(gè) SleepEvent 對象會被放入消息隊(duì)列,經(jīng)過timeout 秒后執(zhí)行回調(diào)。
再接下來便是協(xié)程調(diào)度了:
# runner.py from event import events_list def run(tasks): for task in tasks: _next(task) while len(events_list): for event in events_list: if event.is_ready(): events_list.remove(event) break def _next(task): try: event = next(task) event.set_callback(lambda: _next(task)) # 1 except StopIteration: pass
run 啟動(dòng)了所有的子程序,并開始消息循環(huán)。每遇到一處掛起,調(diào)度器自動(dòng)設(shè)置回調(diào),并在回調(diào)中重新恢復(fù)代碼流?!?” 處巧妙地利用閉包保存狀態(tài)。
最后是主代碼:
from sleep import sleep import runner def task(name): print(name, 1) yield sleep(1) print(name, 2) yield sleep(2) print(name, 3) if __name__ == "__main__": runner.run((task("hsfzxjy"), task("Jack")))
輸出:
hsfzxjy 1 Jack 1 hsfzxjy 2 Jack 2 hsfzxjy 3 Jack 3 # [Finished in 3.0s]協(xié)程函數(shù)的層級調(diào)用
上面的代碼有一個(gè)不足之處,即協(xié)程函數(shù)返回的是一個(gè) Event 對象。然而事實(shí)上只有直接操縱 IO 的協(xié)程函數(shù)才有可能接觸到這個(gè)對象。那么,對于調(diào)用了 IO 的函數(shù)的調(diào)用者,它們應(yīng)該如何實(shí)現(xiàn)呢?
設(shè)想如下任務(wù):
def long_add(x, y, duration=1): yield sleep(duration) return x + y def task(duration): print("start:", time()) print((yield long_add(1, 2, duration))) print((yield long_add(3, 4, duration)))
long_add 是 IO 的一級調(diào)用者,task 調(diào)用 long_add,并利用其返回值進(jìn)行后續(xù)操作。
簡而言之,我們遇到的問題是:一個(gè)被喚起的協(xié)程函數(shù)如何喚起它的調(diào)用者?
正如在上個(gè)例子中,協(xié)程函數(shù)通過 Event 的回調(diào)與調(diào)度器交互。同理,我們也可以使用一個(gè)類似的對象,在這里我們稱其為 Future。
Future 保存在被調(diào)用者的閉包中,并由被調(diào)用者返回。而調(diào)用者通過在其上面設(shè)置回調(diào)函數(shù),實(shí)現(xiàn)兩個(gè)協(xié)程函數(shù)之間的交互。
Future 的代碼如下,看起來有點(diǎn)像 Event:
# future.py class Future(object): def __init__(self): super(Future, self).__init__() self.callback = lambda *args: None self._done = False def set_callback(self, callback): self.callback = callback def done(self, value=None): self._done = True self.callback(value)
Future 的回調(diào)函數(shù)允許接受一個(gè)參數(shù)作為返回值,以盡可能地模擬一般函數(shù)。
但這樣一來,協(xié)程函數(shù)就會有些復(fù)雜了。它們不僅要負(fù)責(zé)喚醒被調(diào)用者,還要負(fù)責(zé)與調(diào)用者之間的交互。這會產(chǎn)生許多重復(fù)代碼。為了 D.R.Y,我們用裝飾器封裝這一邏輯:
# co.py from functools import wraps from future import Future def _next(gen, future, value=None): try: try: yielded_future = gen.send(value) except TypeError: yielded_future = next(gen) yielded_future.set_callback(lambda value: _next(gen, future, value)) except StopIteration as e: future.done(e.value) def coroutine(func): @wraps(func) def wrapper(*args, **kwargs): future = Future() gen = func(*args, **kwargs) _next(gen, future) return future return wrapper
被 coroutine 包裝過的生成器成為了一個(gè)普通函數(shù),返回一個(gè) Future 對象。_next 為喚醒的核心邏輯,通過一個(gè)類似遞歸的回調(diào)設(shè)置簡潔地實(shí)現(xiàn)自我喚醒。當(dāng)自己執(zhí)行完時(shí),會將自己閉包內(nèi)的Future對象標(biāo)記為done,從而喚醒調(diào)用者。
為了適應(yīng)新變化,sleep 也要做相應(yīng)的更改:
from event import Event from future import Future from time import time class SleepEvent(Event): def __init__(self, timeout): super(SleepEvent, self).__init__() self.start_time = time() self.timeout = timeout def _is_ready(self): return time() - self.start_time >= self.timeout def sleep(timeout): future = Future() event = SleepEvent(timeout) event.set_callback(lambda: future.done()) return future
sleep 不再返回 Event 對象,而是一致地返回 Future,并作為 Event 和 Future 之間的代理者。
基于以上更改,調(diào)度器可以更加簡潔——這是因?yàn)閰f(xié)程函數(shù)能夠自我喚醒:
# runner.py from event import events_list def run(): while len(events_list): for event in events_list: if event.is_ready(): events_list.remove(event) break
主程序:
from co import coroutine from sleep import sleep import runner from time import time @coroutine def long_add(x, y, duration=1): yield sleep(duration) return x + y @coroutine def task(duration): print("start:", time()) print((yield long_add(1, 2, duration)), time()) print((yield long_add(3, 4, duration)), time()) task(2) task(1) runner.run()
由于我們使用了一個(gè)糟糕的事件輪詢機(jī)制,密集的計(jì)算會阻塞通往 stdout 的輸出,因而看起來所有的結(jié)果都是一起打印出來的。為此,我在打印時(shí)特地加上了時(shí)間戳,以演示協(xié)程的效果。輸出如下:
start: 1459609512.263156 start: 1459609512.263212 3 1459609513.2632613 3 1459609514.2632234 7 1459609514.263319 7 1459609516.2633028
這事實(shí)上是 tornado.gen.coroutine 的簡化版本,為了敘述方便我略去了許多細(xì)節(jié),如異常處理以及調(diào)度優(yōu)化,目的是讓大家能較清晰地了解 生成器協(xié)程 背后的機(jī)制。因此,這段代碼并不能用于實(shí)際生產(chǎn)中。
小結(jié)這,才叫精通生成器。
學(xué)習(xí)編程,不僅要知其然,亦要知其所以然。
Python 是有魔法的,只有想不到,沒有做不到。
Referencestornado.gen.coroutine
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/37873.html
摘要:幸而,提供了造物主的接口這便是,或者稱為元類。接下來我們將通過一個(gè)栗子感受的黑魔法,不過在此之前,我們要先了解一個(gè)語法糖。此外,在一些小型的庫中,也有元類的身影。 首發(fā)于 我的博客 轉(zhuǎn)載請注明出處 接觸過 Django 的同學(xué)都應(yīng)該十分熟悉它的 ORM 系統(tǒng)。對于 python 新手而言,這是一項(xiàng)幾乎可以被稱作黑科技的特性:只要你在models.py中隨便定義一個(gè)Model的子類,Dj...
摘要:我可以明確告訴你這不是,但它可以用解釋器運(yùn)行。這種黑魔法,還要從說起。提案者設(shè)想使用一種特殊的文件首注釋,用于指定代碼的編碼。暴露了一個(gè)函數(shù),用于注冊自定義編碼。所謂的黑魔法其實(shí)并不神秘,照貓畫虎定義好相應(yīng)的接口即可。 首發(fā)于我的博客,轉(zhuǎn)載請注明出處 寫在前面 本文為科普文 本文中的例子在 Ubuntu 14.04 / Python 2.7.11 下運(yùn)行成功,Python 3+ 的接...
摘要:本文講述了各種針對的方案比如和,尤其是針對等科學(xué)計(jì)算庫的化的進(jìn)展與困擾。本文認(rèn)為科學(xué)計(jì)算的未來必定會大規(guī)模的引用以提升效率。上相關(guān)的討論見這里。英文版本見郵件訂閱精選閱讀 專題:Python的各種黑魔法 用各種generator/iterator/descriptor等黑魔法,加上各種函數(shù)編程方法的使用,Python總能使用很短的代碼完成很復(fù)雜的事情,下面集中放一些這方面的文章 知乎...
摘要:所以在第一遍閱讀官方文檔的時(shí)候,感覺完全是在夢游。通過或者等待另一個(gè)協(xié)程的結(jié)果或者異常,異常會被傳播。接口返回的結(jié)果指示已結(jié)束,并賦值。取消與取消不同。調(diào)用將會向被包裝的協(xié)程拋出。任務(wù)相關(guān)函數(shù)安排協(xié)程的執(zhí)行。負(fù)責(zé)切換線程保存恢復(fù)。 Tasks and coroutines 翻譯的python官方文檔 這個(gè)問題的惡心之處在于,如果你要理解coroutine,你應(yīng)該理解future和tas...
摘要:什么是元類剛才說了,元類就是創(chuàng)建類的類。類上面的屬性,相信愿意了解元類細(xì)節(jié)的盆友,都肯定見過這個(gè)東西,而且為之好奇。使用了這個(gè)魔法方法就意味著就會用指定的元類來創(chuàng)建類了。深刻理解中的元類 (一) python中的類 今天看到一篇好文,然后結(jié)合自己的情況總結(jié)一波。這里討論的python類,都基于python2.7x以及繼承于object的新式類進(jìn)行討論。 首先在python中,所有東西都...
閱讀 501·2021-11-22 12:05
閱讀 1578·2021-11-17 09:33
閱讀 3623·2021-11-11 16:54
閱讀 2734·2021-10-14 09:49
閱讀 4125·2021-09-06 15:01
閱讀 1854·2019-08-29 17:23
閱讀 727·2019-08-29 14:09
閱讀 749·2019-08-29 12:28