摘要:徘徊和行程所用的時間使用指數(shù)分布生成,我們將時間設為分鐘數(shù),以便顯示清楚。迭代表示各輛出租車的進程在各輛出租車上調(diào)用函數(shù),預激協(xié)程。
前兩篇我們已經(jīng)介紹了python 協(xié)程的使用和yield from 的原理,這一篇,我們用一個例子來揭示如何使用協(xié)程在單線程中管理并發(fā)活動。。
什么是離散事件仿真Wiki上的定義是:
離散事件仿真將系統(tǒng)隨時間的變化抽象成一系列的離散時間點上的事件,通過按照事件時間順序處理事件來演進,是一種事件驅(qū)動的仿真世界觀。離散事件仿真將系統(tǒng)的變化看做一個事件,因此系統(tǒng)任何的變化都只能是通過處理相應的事件來實現(xiàn),在兩個相鄰的事件之間,系統(tǒng)狀態(tài)維持前一個事件發(fā)生后的狀態(tài)不變。
人話說就是一種把系統(tǒng)建模成一系列事件的仿真系統(tǒng)。在離散事件仿真中,仿真“鐘”向前推進的量不是固定的,而是直接推進到下一個事件模型的模擬時間。
假設我們抽象模擬出租車的運營過程,其中一個事件是乘客上車,下一個事件則是乘客下車。不管乘客做了5分鐘還是50分鐘,一旦下車,仿真鐘就會更新,指向此次運營的結束時間。
事件?是不是想到了協(xié)程!
協(xié)程恰好為實現(xiàn)離散事件仿真提供了合理的抽象。
出租車對運營仿真第一門面向?qū)ο蟮恼Z音 Simula 引入?yún)f(xié)程這個概念就是為了支持仿真。
Simpy 是一個實現(xiàn)離散事件仿真的Python包,通過一個協(xié)程表示離散事件仿真系統(tǒng)的各個進程。
仿真程序會創(chuàng)建幾輛出租車,每輛出租車會拉幾個乘客,然后回家。出租車會首先駛離車庫,四處徘徊,尋找乘客;拉到乘客后,行程開始;乘客下車后,繼續(xù)四處徘徊。
徘徊和行程所用的時間使用指數(shù)分布生成,我們將時間設為分鐘數(shù),以便顯示清楚。
完整代碼如下:(taxi_sim.py)
#! -*- coding: utf-8 -*- import random import collections import queue import argparse DEFAULT_NUMBER_OF_TAXIS = 3 DEFAULT_END_TIME = 180 SEARCH_DURATION = 5 TRIP_DURATION = 20 DEPARTURE_INTERAVAL = 5 # time 是事件發(fā)生的仿真時間,proc 是出租車進程實例的編號,action是描述活動的字符串 Event = collections.namedtuple("Event", "time proc action") # 開始 出租車進程 # 每輛出租車調(diào)用一次taxi_process 函數(shù),創(chuàng)建一個生成器對象,表示各輛出租車的運營過程。 def taxi_process(ident, trips, start_time=0): """ 每次狀態(tài)變化時向創(chuàng)建事件,把控制權交給仿真器 :param ident: 出租車編號 :param trips: 出租車回家前的行程數(shù)量 :param start_time: 離開車庫的時間 :return: """ time = yield Event(start_time, ident, "leave garage") # 產(chǎn)出的第一個Event for i in range(trips): # 每次行程都會執(zhí)行一遍這個代碼塊 # 產(chǎn)出一個Event實例,表示拉到了乘客 協(xié)程在這里暫停 等待下一次send() 激活 time = yield Event(time, ident, "pick up passenger") # 產(chǎn)出一個Event實例,表示乘客下車 協(xié)程在這里暫停 等待下一次send() 激活 time = yield Event(time, ident, "drop off passenger") # 指定的行程數(shù)量完成后,for 循環(huán)結束,最后產(chǎn)出 "going home" 事件。協(xié)程最后一次暫停 yield Event(time, ident, "going home") # 協(xié)程執(zhí)行到最后 拋出StopIteration 異常 def compute_duration(previous_action): """使用指數(shù)分布計算操作的耗時""" if previous_action in ["leave garage", "drop off passenger"]: # 新狀態(tài)是四處徘徊 interval = SEARCH_DURATION elif previous_action == "pick up passenger": # 新狀態(tài)是開始行程 interval = TRIP_DURATION elif previous_action == "going home": interval = 1 else: raise ValueError("Unkonw previous_action: %s" % previous_action) return int(random.expovariate(1/interval)) + 1 # 開始仿真 class Simulator: def __init__(self, procs_map): self.events = queue.PriorityQueue() # 帶優(yōu)先級的隊列 會按時間正向排序 self.procs = dict(procs_map) # 從獲取的procs_map 參數(shù)中創(chuàng)建本地副本,為了不修改用戶傳入的值 def run(self, end_time): """ 調(diào)度并顯示事件,直到時間結束 :param end_time: 結束時間 只需要指定一個參數(shù) :return: """ # 調(diào)度各輛出租車的第一個事件 for iden, proc in sorted(self.procs.items()): first_event = next(proc) # 預激協(xié)程 并產(chǎn)出一個 Event 對象 self.events.put(first_event) # 把各個事件加到self.events 屬性表示的 PriorityQueue對象中 # 此次仿真的主循環(huán) sim_time = 0 # 把 sim_time 歸0 while sim_time < end_time: if self.events.empty(): # 事件全部完成后退出循環(huán) print("*** end of event ***") break current_event = self.events.get() # 獲取優(yōu)先級最高(time 屬性最小)的事件 sim_time, proc_id, previous_action = current_event # 更新 sim_time print("taxi:", proc_id, proc_id * " ", current_event) active_proc = self.procs[proc_id] # 從self.procs 字典中獲取表示當前活動的出租車協(xié)程 next_time = sim_time + compute_duration(previous_action) try: next_event = active_proc.send(next_time) # 把計算得到的時間發(fā)送給出租車協(xié)程。協(xié)程會產(chǎn)出下一個事件,或者拋出 StopIteration except StopIteration: del self.procs[proc_id] # 如果有異常 表示已經(jīng)退出, 刪除這個協(xié)程 else: self.events.put(next_event) # 如果沒有異常,把next_event 加入到隊列 else: # 如果超時 則走到這里 msg = "*** end of simulation time: {} event pendding ***" print(msg.format(self.events.qsize())) def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS, seed=None): """初始化隨機生成器,構建過程,運行仿真程序""" if seed is not None: random.seed(seed) # 獲取可復現(xiàn)的結果 # 構建taxis 字典。值是三個參數(shù)不同的生成器對象。 taxis = {i: taxi_process(i, (i + 1) * 2, i*DEPARTURE_INTERAVAL) for i in range(num_taxis)} sim = Simulator(taxis) sim.run(end_time) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Taxi fleet simulator.") parser.add_argument("-e", "--end-time", type=int, default=DEFAULT_END_TIME, help="simulation end time; default=%s" % DEFAULT_END_TIME) parser.add_argument("-t", "--taxis", type=int, default=DEFAULT_NUMBER_OF_TAXIS, help="number of taxis running; default = %s" % DEFAULT_NUMBER_OF_TAXIS) parser.add_argument("-s", "--seed", type=int, default=None, help="random generator seed (for testing)") args = parser.parse_args() main(args.end_time, args.taxis, args.seed)
運行程序,
# -s 3 參數(shù)設置隨機生成器的種子,以便調(diào)試的時候隨機數(shù)不變,輸出相同的結果 python taxi_sim.py -s 3
輸出結果如下圖
從結果我們可以看出,3輛出租車的行程是交叉進行的。不同顏色的箭頭代表不同出租車從乘客上車到乘客下車的跨度。
從結果可以看出:
出租車每5隔分鐘從車庫出發(fā)
0 號出租車2分鐘后拉到乘客(time=2),1號出租車3分鐘后拉到乘客(time=8),2號出租車5分鐘后拉到乘客(time=15)
0 號出租車拉了兩個乘客
1 號出租車拉了4個乘客
2 號出租車拉了6個乘客
在此次示中,所有排定的事件都在默認的仿真時間內(nèi)完成
我們先在控制臺中調(diào)用taxi_process 函數(shù),自己駕駛一輛出租車,示例如下:
In [1]: from taxi_sim import taxi_process # 創(chuàng)建一個生成器,表示一輛出租車 編號是13 從t=0 開始,有兩次行程 In [2]: taxi = taxi_process(ident=13, trips=2, start_time=0) In [3]: next(taxi) # 預激協(xié)程 Out[3]: Event(time=0, proc=13, action="leave garage") # 發(fā)送當前時間 在控制臺中,變量_綁定的是前一個結果 # _.time + 7 是 0 + 7 In [4]: taxi.send(_.time+7) Out[4]: Event(time=7, proc=13, action="pick up passenger") # 這個事件有for循環(huán)在第一個行程的開頭產(chǎn)出 # 發(fā)送_.time+12 表示這個乘客用時12分鐘 In [5]: taxi.send(_.time+12) Out[5]: Event(time=19, proc=13, action="drop off passenger") # 徘徊了29 分鐘 In [6]: taxi.send(_.time+29) Out[6]: Event(time=48, proc=13, action="pick up passenger") # 乘坐了50分鐘 In [7]: taxi.send(_.time+50) Out[7]: Event(time=98, proc=13, action="drop off passenger") # 兩次行程結束 for 循環(huán)結束產(chǎn)出"going home" In [8]: taxi.send(_.time+5) Out[8]: Event(time=103, proc=13, action="going home") # 再發(fā)送值,會執(zhí)行到末尾 協(xié)程返回后 拋出 StopIteration 異常 In [9]: taxi.send(_.time+10) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last)in () ----> 1 taxi.send(_.time+10) StopIteration:
在這個示例中,我們用控制臺模擬仿真主循環(huán)。從taxi協(xié)程中產(chǎn)出的Event實例中獲取 .time 屬性,隨意加一個數(shù),然后調(diào)用send()方法發(fā)送兩數(shù)之和,重新激活協(xié)程。
在taxi_sim.py 代碼中,出租車協(xié)程由 Simulator.run 方法中的主循環(huán)驅(qū)動。
Simulator 類的主要數(shù)據(jù)結構如下:
self.events
PriorityQueue 對象,保存Event實例。元素可以放進PriorityQueue對象中,然后按 item[0](對象的time 屬性)依序取出(按從小到大)。
self.procs
一個字典,把出租車的編號映射到仿真過程的進程(表示出租車生成器的對象)。這個屬性會綁定前面所示的taxis字典副本。
優(yōu)先隊列是離散事件仿真系統(tǒng)的基礎構件:創(chuàng)建事件的順序不定,放入這種隊列后,可以按各個事件排定的順序取出。
比如,我們把兩個事件放入隊列:
Event(time=14, proc=0, action="pick up passenger") Event(time=10, proc=1, action="pick up passenger")
這個意思是 0號出租車14分拉到一個乘客,1號出租車10分拉到一個乘客。但是主循環(huán)獲取的第一個事件將是
Event(time=10, proc=1, action="pick up passenger")
下面我們分析一下仿真系統(tǒng)的主算法--Simulator.run 方法。
迭代表示各輛出租車的進程
在各輛出租車上調(diào)用next()函數(shù),預激協(xié)程。
把各個事件放入Simulator類的self.events屬性中。
滿足 sim_time < end_time 條件是,運行仿真系統(tǒng)的主循環(huán)。
檢查self.events 屬性是否為空;如果為空,跳出循環(huán)
從self.events 中獲取當前事件
顯示獲取的Event對象
獲取curent_event 的time 屬性,更新仿真時間
把時間發(fā)送給current_event 的pro屬性標識的協(xié)程,產(chǎn)出下一個事件
把next_event 添加到self.events 隊列中,排定 next_event
我們代碼中 while 循環(huán)有一個else 語句,仿真系統(tǒng)到達結束時間后,代碼會執(zhí)行else中的語句。
這個示例主要是想說明如何在一個主循環(huán)中處理事件,以及如何通過發(fā)送數(shù)據(jù)驅(qū)動協(xié)程,同時解釋了如何使用生成器代替線程和回調(diào),實現(xiàn)并發(fā)。
并發(fā): 多個任務交替執(zhí)行
并行: 多個任務同時執(zhí)行
到這里 Python協(xié)程系列的三篇文章就結束了。
前兩篇文章我們會看到,協(xié)程做面向事件編程時,會不斷把控制權讓步給主循環(huán),激活并向前運行其他協(xié)程,從而執(zhí)行各個并發(fā)活動。
協(xié)程一種協(xié)作式多任務:協(xié)程顯式自主的把控制權讓步給中央調(diào)度程序。
多線程實現(xiàn)的是搶占式多任務。調(diào)度程序可以在任何時刻暫停線程,把控制權交給其他線程
python 協(xié)程1:協(xié)程10分鐘入門
python 協(xié)程2:yield from 從入門到精通
再次說明一下,這幾篇是《流暢的python》一書的讀書筆記,作者提供了大量的擴展閱讀,有興趣的可以看一下。
擴展閱讀Generator Tricks for Systems Programmers
A Curious Course on Coroutines and Concurrency
Generators: The Final Frontier
greedy algorithm with coroutines
BinaryTree類、一個簡單的XML解析器、和一個任務調(diào)度器Proposal for a yield from statement for Python
考慮用協(xié)程操作多個函數(shù)
最后,感謝女朋友支持。
>歡迎關注 | >請我喝芬達 |
---|---|
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/44436.html
摘要:仿真示例出租車進程。每次狀態(tài)變化時向仿真程序產(chǎn)出一個事件結束出租車進程出租車仿真程序主程序。 這個簡單的例子讓我們比較淺顯易懂的看到了事件驅(qū)動型框架的運作方式,即在單個線程中使用一個主循環(huán)驅(qū)動協(xié)程執(zhí)行并發(fā)活動。 使用協(xié)程做面向事件編程時,協(xié)程會不斷的把控制權讓步給主循環(huán),激活并向前運行其他協(xié)程,從而執(zhí)行各個并發(fā)活動。這是一種協(xié)作多任務:協(xié)程顯示的把控制權讓步給中央調(diào)度程序。 仿真示例 ...
摘要:于此同時,會阻塞,等待終止。子生成器返回之后,解釋器會拋出異常,并把返回值附加到異常對象上,只是委派生成器恢復。實例運行完畢后,返回的值綁定到上。這一部分處理調(diào)用方通過方法傳入的異常。之外的異常會向上冒泡。 上一篇python協(xié)程1:yield的使用介紹了: 生成器作為協(xié)程使用時的行為和狀態(tài) 使用裝飾器預激協(xié)程 調(diào)用方如何使用生成器對象的 .throw(...) 和 .close()...
摘要:協(xié)程,又稱微線程,纖程。最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。生產(chǎn)者產(chǎn)出第條數(shù)據(jù)返回更新值更新消費者正在調(diào)用第條數(shù)據(jù)查看當前進行的線程函數(shù)中有,返回值為生成器庫實現(xiàn)協(xié)程通過提供了對協(xié)程的基本支持,但是不完全。 協(xié)程,又稱微線程,纖程。英文名Coroutine協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉而執(zhí)行別的子程序,在適當?shù)臅r候再返回來接著執(zhí)行。 最大的優(yōu)勢就是協(xié)程極高...
摘要:新語法表達式語句可以被用在賦值表達式的右側在這種情況下,它就是表達式。表達式必須始終用括號括起來,除非它是作為頂級表達式而出現(xiàn)在賦值表達式的右側。 showImg(https://segmentfault.com/img/bVbnQsb?w=4344&h=2418);PEP原文 : https://www.python.org/dev/pe... PEP標題: Coroutines v...
閱讀 3623·2021-11-24 10:19
閱讀 3754·2021-09-30 09:47
閱讀 1319·2019-08-30 15:56
閱讀 822·2019-08-29 15:11
閱讀 929·2019-08-29 13:43
閱讀 3610·2019-08-28 18:25
閱讀 2180·2019-08-26 13:27
閱讀 1456·2019-08-26 11:44