摘要:目前開發(fā)中有遇到進(jìn)程間需要共享數(shù)據(jù)的情況所以研究了下主要會以為例子說明下進(jìn)程間共享同一個父進(jìn)程使用說明創(chuàng)建一個對象創(chuàng)建一個創(chuàng)建一個測試程序創(chuàng)建進(jìn)程池進(jìn)行測試簡單的源碼分析這時我們再看一個例子創(chuàng)建一個對象創(chuàng)建一個創(chuàng)建一個測試程序創(chuàng)建進(jìn)程池進(jìn)行
目前開發(fā)中有遇到進(jìn)程間需要共享數(shù)據(jù)的情況. 所以研究了下multiprocessing.Manager, 主要會以dict為例子, 說明下進(jìn)程間共享(同一個父進(jìn)程).dict使用說明
import multiprocessing # 1. 創(chuàng)建一個Manger對象 manager = multiprocessing.Manager() # 2. 創(chuàng)建一個dict temp_dict = manager.dict() # 3. 創(chuàng)建一個測試程序 def test(idx, test_dict): test_dict[idx] = idx # 4. 創(chuàng)建進(jìn)程池進(jìn)行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
too simple.
簡單的源碼分析這時我們再看一個例子
import multiprocessing # 1. 創(chuàng)建一個Manger對象 manager = multiprocessing.Manager() # 2. 創(chuàng)建一個dict temp_dict = manager.dict() temp_dict["test"] = {} # 3. 創(chuàng)建一個測試程序 def test(idx, test_dict): test_dict["test"][idx] = idx # 4. 創(chuàng)建進(jìn)程池進(jìn)行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
可以看到輸出結(jié)果是奇怪的{"test": {}}
如果我們簡單修改一下代碼
import multiprocessing # 1. 創(chuàng)建一個Manger對象 manager = multiprocessing.Manager() # 2. 創(chuàng)建一個dict temp_dict = manager.dict() temp_dict["test"] = {} # 3. 創(chuàng)建一個測試程序 def test(idx, test_dict): row = test_dict["test"] row[idx] = idx test_dict["test"] = row # 4. 創(chuàng)建進(jìn)程池進(jìn)行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
這時輸出結(jié)果就符合預(yù)期了.
為了了解這個現(xiàn)象背后的原因, 我簡單去讀了一下源碼, 主要有以下幾段代碼很關(guān)鍵.
def Manager(): """ Returns a manager associated with a running server process The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects. """ from multiprocessing.managers import SyncManager m = SyncManager() m.start() return m ... def start(self, initializer=None, initargs=()): """ Spawn a server process for this manager object """ assert self._state.value == State.INITIAL if initializer is not None and not hasattr(initializer, "__call__"): raise TypeError("initializer must be a callable") # pipe over which we will retrieve address of server reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server self._process = Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), ) ident = ":".join(str(i) for i in self._process._identity) self._process.name = type(self).__name__ + "-" + ident self._process.start() ...
上面代碼可以看出, 當(dāng)我們聲明了一個Manager對象的時候, 程序?qū)嶋H在其他進(jìn)程啟動了一個server服務(wù), 這個server是阻塞的, 以此來實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)安全.
我的理解就是不同進(jìn)程之間操作都是互斥的, 一個進(jìn)程向server請求到這部分?jǐn)?shù)據(jù), 再把這部分?jǐn)?shù)據(jù)修改, 返回給server, 之后server再去處理其他進(jìn)程的請求.
回到上面的奇怪現(xiàn)象上, 這個操作test_dict["test"][idx] = idx實(shí)際上在拉取到server上的數(shù)據(jù)后進(jìn)行了修改, 但并沒有返回給server, 所以temp_dict的數(shù)據(jù)根本沒有變化. 在第二段正常代碼, 就相當(dāng)于先向服務(wù)器請求數(shù)據(jù), 再向服務(wù)器傳送修改后的數(shù)據(jù). 這樣就可以解釋這個現(xiàn)象了.
進(jìn)程間數(shù)據(jù)安全這個時候如果出現(xiàn)一種情況, 兩個進(jìn)程同時請求了一份相同的數(shù)據(jù), 分別進(jìn)行修改, 再提交到server上會怎么樣呢? 那當(dāng)然是數(shù)據(jù)產(chǎn)生異常. 基于此, 我們需要Manager的另一個對象, Lock(). 這個對象也不難理解, Manager本身就是一個server, dict跟lock都來自于這個server, 所以當(dāng)你lock住的時候, 其他進(jìn)程是不能取到數(shù)據(jù), 自然也不會出現(xiàn)上面那種異常情況.
代碼示例:
import multiprocessing # 1. 創(chuàng)建一個Manger對象 manager = multiprocessing.Manager() # 2. 創(chuàng)建一個dict temp_dict = manager.dict() lock = manager.Lock() temp_dict["test"] = {} # 3. 創(chuàng)建一個測試程序 def test(idx, test_dict, lock): lock.acquire() row = test_dict["test"] row[idx] = idx test_dict["test"] = row lock.release() # 4. 創(chuàng)建進(jìn)程池進(jìn)行測試 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict, lock)) pool.close() pool.join() print(temp_dict)
切忌不要進(jìn)程里自己新建lock對象, 要使用統(tǒng)一的lock對象.
終わり。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/43443.html
摘要:很簡單,這個模塊實(shí)現(xiàn)了開辟一塊共享內(nèi)存空間,就好比中的方法一樣,有興趣的同學(xué)可以去查閱。查了下資料,返回的對象控制了一個進(jìn)程,可用于多進(jìn)程之間的安全通信,其支持的類型有和等。 有關(guān)于 multiprocessing 中共享變量的問題 現(xiàn)在的cpu都很強(qiáng)大,比方我用的至強(qiáng)2620有24核可以同時工作,并行執(zhí)行進(jìn)程程序。這在計算密集型的程序是很需要的,如沙漠中的綠洲,令人重獲新生。那么,問...
摘要:連接帶遠(yuǎn)程管理器對象,該對象的地址在構(gòu)造函數(shù)中支出。在當(dāng)前進(jìn)程中運(yùn)行管理器服務(wù)器。啟動一個單的子進(jìn)程,并在該子進(jìn)程中啟動管理器服務(wù)器。如果無法序列號對象將引發(fā)異常。 上一篇文章:Python進(jìn)程專題6:共享數(shù)據(jù)與同步下一篇文章:Python進(jìn)程專題8:分布集群的消息傳遞 進(jìn)程不支持共享對象,上面描述的創(chuàng)建共享值和數(shù)組,但都是指定的特殊類型,對高級的Python對象(如:字典、列表、用...
摘要:效率高當(dāng)然,對于爬蟲這種密集型任務(wù)來說,多線程和多進(jìn)程影響差別并不大。對于計算密集型任務(wù)來說,的多進(jìn)程相比多線程,其多核運(yùn)行效率會有成倍的提升。 一、進(jìn)程介紹 進(jìn)程...
摘要:如果某線程并未使用很多操作,它會在自己的時間片內(nèi)一直占用處理器和。在中使用線程在和等大多數(shù)類系統(tǒng)上運(yùn)行時,支持多線程編程。守護(hù)線程另一個避免使用模塊的原因是,它不支持守護(hù)線程。 這一篇是Python并發(fā)的第四篇,主要介紹進(jìn)程和線程的定義,Python線程和全局解釋器鎖以及Python如何使用thread模塊處理并發(fā) 引言&動機(jī) 考慮一下這個場景,我們有10000條數(shù)據(jù)需要處理,處理每條...
摘要:本文最先發(fā)布在博客這篇文章將講解并發(fā)編程的基本操作。并發(fā)是指能夠多任務(wù)處理,并行則是是能夠同時多任務(wù)處理。雖然自帶了很好的類庫支持多線程進(jìn)程編程,但眾所周知,因?yàn)榈拇嬖?,很難做好真正的并行。 本文最先發(fā)布在博客:https://blog.ihypo.net/151628... 這篇文章將講解 Python 并發(fā)編程的基本操作。并發(fā)和并行是對孿生兄弟,概念經(jīng)?;煜?。并發(fā)是指能夠多任務(wù)處...
閱讀 1323·2023-04-25 19:33
閱讀 1203·2021-10-21 09:39
閱讀 3689·2021-09-09 09:32
閱讀 2656·2019-08-30 10:58
閱讀 1677·2019-08-29 16:17
閱讀 905·2019-08-29 15:29
閱讀 2927·2019-08-26 11:55
閱讀 2689·2019-08-26 10:33