成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Python 的并發(fā)編程

happen / 2659人閱讀

摘要:本文最先發(fā)布在博客這篇文章將講解并發(fā)編程的基本操作。并發(fā)是指能夠多任務(wù)處理,并行則是是能夠同時(shí)多任務(wù)處理。雖然自帶了很好的類庫支持多線程進(jìn)程編程,但眾所周知,因?yàn)榈拇嬖冢茈y做好真正的并行。

本文最先發(fā)布在博客:https://blog.ihypo.net/151628...

這篇文章將講解 Python 并發(fā)編程的基本操作。并發(fā)和并行是對孿生兄弟,概念經(jīng)?;煜?。并發(fā)是指能夠多任務(wù)處理,并行則是是能夠同時(shí)多任務(wù)處理。Erlang 之父 Joe Armstrong 有一張非常有趣的圖說明這兩個(gè)概念:

我個(gè)人更喜歡的一種說法是:并發(fā)是宏觀并行而微觀串行。

GIL

雖然 Python 自帶了很好的類庫支持多線程/進(jìn)程編程,但眾所周知,因?yàn)?GIL 的存在,Python 很難做好真正的并行。

GIL 指全局解釋器鎖,對于 GIL 的介紹:

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計(jì)算機(jī)程序設(shè)計(jì)語言解釋器用于同步線程的一種機(jī)制,它使得任何時(shí)刻僅有一個(gè)線程在執(zhí)行。

維基百科

其實(shí)與其說 GIL 是 Python 解釋器的限制,不如說是 CPython 的限制,因?yàn)?Python 為了保障性能,底層大多使用 C 實(shí)現(xiàn)的,而 CPython 的內(nèi)存管理并不是線程安全的,為了保障整體的線程安全,解釋器便禁止多線程的并行執(zhí)行。

因?yàn)?Python 社區(qū)認(rèn)為操作系統(tǒng)的線程調(diào)度已經(jīng)非常成熟了,沒有必要自己再實(shí)現(xiàn)一遍,因此 Python 的線程切換基本是依賴操作系統(tǒng),在實(shí)際的使用中,對于單核 CPU,GIL 并沒有太大的影響,但對于多核 CPU 卻引入了線程顛簸(thrashing)問題。

線程顛簸是指作為單一資源的 GIL 鎖,在被多核心競爭強(qiáng)占時(shí)資源額外消耗的現(xiàn)象。

比如下圖,線程1 在釋放 GIL 鎖后,操作系統(tǒng)喚醒了 線程2,并將 線程2 分配給 核心2 執(zhí)行,但是如果此時(shí) 線程2 卻沒有成功獲得 GIL 鎖,只能再次被掛起。此時(shí)切換線程、切換上下文的資源都將白白浪費(fèi)。

因此,Python 多線程程序在多核 CPU 機(jī)器下的性能不一定比單核高。那么如果是計(jì)算密集型的程序,一般還是考慮用 C 重寫關(guān)鍵部分,或者使用多進(jìn)程避開 GIL。

多線程

在 Python 中使用多線程,有 threadthreading 可供原則,thread 提供了低級別的、原始的線程以及一個(gè)簡單的鎖,因?yàn)?thread 過于簡陋,線程管理容易出現(xiàn)人為失誤,因此官方更建議使用 threading,而 threading 也不過是對 thread 的封裝和補(bǔ)充。(Python3 中 thread 被改名為 _thread)。

在 Python 中創(chuàng)建線程非常簡單:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創(chuàng)建 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執(zhí)行 task
        t.start()

    for t in tasks:
        # 等待 task 執(zhí)行完畢
        # 完畢前會(huì)阻塞住主線程
        t.join()
    print("Finish.")

直接創(chuàng)建線程簡單優(yōu)雅,如果邏輯復(fù)雜,也可以通過繼承 Thread 基類完成多線程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創(chuàng)建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執(zhí)行 task
        t.start()

    for t in tasks:
        # 等待 task 執(zhí)行完畢
        # 完畢前會(huì)阻塞住主線程
        t.join()
    print("Finish.")
多進(jìn)程

在 Python 中,可以使用 multiprocessing 庫來實(shí)現(xiàn)多進(jìn)程編程,和多線程一樣,有兩種方法可以使用多進(jìn)程編程。

直接創(chuàng)建進(jìn)程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創(chuàng)建 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執(zhí)行 task
        t.start()

    for t in tasks:
        # 等待 task 執(zhí)行完畢
        # 完畢前會(huì)阻塞住主線程
        t.join()
    print("Finish.")

繼承進(jìn)程父類:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創(chuàng)建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執(zhí)行 task
        t.start()

    for t in tasks:
        # 等待 task 執(zhí)行完畢
        # 完畢前會(huì)阻塞住主線程
        t.join()
    print("Finish.")

multiprocessing 除了常用的多進(jìn)程編程外,我認(rèn)為它最大的意義在于提供了一套規(guī)范,在該庫下有一個(gè) dummy 模塊,即 multiprocessing.dummy,里面對 threading 進(jìn)行封裝,提供了和 multiprocessing 相同 API 的線程實(shí)現(xiàn),換句話說,class::multiprocessing.Process 提供的是進(jìn)程任務(wù)類,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的講一個(gè)多進(jìn)程程序改為多線程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創(chuàng)建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執(zhí)行 task
        t.start()

    for t in tasks:
        # 等待 task 執(zhí)行完畢
        # 完畢前會(huì)阻塞住主線程
        t.join()
    print("Finish.")

無論是多線程還是多進(jìn)程編程,這也是我一般會(huì)選擇 multiprocessing 的原因。

除了直接創(chuàng)建進(jìn)程,還可以用進(jìn)程池(或者 multiprocessing.dummy 里的進(jìn)程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     創(chuàng)建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

線程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     創(chuàng)建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

這里示例有個(gè)問題,pool 在 join 前需要 close 掉,否則就會(huì)拋出異常,不過 Python 之禪的作者 Tim Peters 給出解釋:

As to Pool.close(), you should call that when - and only when - you"re never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It"s also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there"s often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you"d otherwise never see.

同步原語

在多進(jìn)程編程中,因?yàn)檫M(jìn)程間的資源隔離,不需要考慮內(nèi)存的線程安全問題,而在多線程編程中便需要同步原語來保存線程安全,因?yàn)?Python 是一門簡單的語言,很多操作都是封裝的操作系統(tǒng) API,因此支持的同步原語蠻全,但這里只寫兩種常見的同步原語:鎖和信號量。

通過使用鎖可以用來保護(hù)一段內(nèi)存空間,而信號量可以被多個(gè)線程共享。

threading 中可以看到 Lock 鎖和 RLock 重用鎖兩種鎖,區(qū)別如名。這兩種鎖都只能被一個(gè)線程擁有,第一種鎖只能被獲得一次,而重用鎖可以被多次獲得,但也需要同樣次數(shù)的釋放才能真正的釋放。

當(dāng)多個(gè)線程對同一塊內(nèi)存空間同時(shí)進(jìn)行修改的時(shí)候,經(jīng)常遇到奇怪的問題:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非線程安全導(dǎo)致 count 沒有達(dá)到預(yù)期的效果。而通過鎖便可以控制某一段代碼,或者說某段內(nèi)存空間的訪問:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

當(dāng)然,上述例子非常暴力,直接強(qiáng)行把并發(fā)改為串行。

對于信號量常見于有限資源強(qiáng)占的場景,可以定義固定大小的信號量供多個(gè)線程獲取或者釋放,從而控制線程的任務(wù)執(zhí)行,比如下面的例子,控制最多有 5 個(gè)任務(wù)在執(zhí)行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")
Queue 和 Pipe

因?yàn)槎噙M(jìn)程的內(nèi)存隔離,不會(huì)存在內(nèi)存競爭的問題。但同時(shí),多個(gè)進(jìn)程間的數(shù)據(jù)共享成為了新的問題,而進(jìn)程間通信常見:隊(duì)列,管道,信號。

這里只講解隊(duì)列和管道。

隊(duì)列常見于雙進(jìn)程模型,一般用作生產(chǎn)者-消費(fèi)者模式,由生產(chǎn)者進(jìn)程向隊(duì)列中發(fā)布任務(wù),并由消費(fèi)者從隊(duì)列首部拿出任務(wù)進(jìn)行執(zhí)行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理論上每個(gè)進(jìn)程都可以向隊(duì)列里的讀或者寫,可以認(rèn)為隊(duì)列是半雙工路線。但是往往只有特定的讀進(jìn)程(比如消費(fèi)者)和寫進(jìn)程(比如生產(chǎn)者),盡管這些進(jìn)程只是開發(fā)者自己定義的。

而 Pipe 更像一個(gè)全工路線:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介紹的 threadingmultiprocessing 兩個(gè)庫外,還有一個(gè)好用的令人發(fā)指的庫 concurrent.futures。和前面兩個(gè)庫不同,這個(gè)庫是更高等級的抽象,隱藏了很多底層的東西,但也因此非常好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

該庫中自帶了進(jìn)程池和線程池,可以通過上下文管理器來管理,而且對于異步任務(wù)執(zhí)行完后,結(jié)果的獲得也非常簡單。再拿一個(gè)官方的多進(jìn)程計(jì)算的例子作為結(jié)束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print("%d is prime: %s" % (number, prime))

if __name__ == "__main__":
    main()

歡迎關(guān)注個(gè)人公眾號:CS實(shí)驗(yàn)室

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/41698.html

相關(guān)文章

  • 使用 Python 進(jìn)行并發(fā)編程系列 - 收藏集 - 掘金

    摘要:使用進(jìn)行并發(fā)編程篇三掘金這是使用進(jìn)行并發(fā)編程系列的最后一篇。所以我考慮啟用一個(gè)本地使用進(jìn)行并發(fā)編程篇二掘金我們今天繼續(xù)深入學(xué)習(xí)。 使用 Python 進(jìn)行并發(fā)編程 - asyncio 篇 (三) - 掘金 這是「使用Python進(jìn)行并發(fā)編程」系列的最后一篇。我特意地把它安排在了16年最后一天。 重新實(shí)驗(yàn)上篇的效率對比的實(shí)現(xiàn) 在第一篇我們曾經(jīng)對比并發(fā)執(zhí)行的效率,但是請求的是httpb...

    MorePainMoreGain 評論0 收藏0
  • Python基礎(chǔ)之使用期物處理并發(fā)

    摘要:本文重點(diǎn)掌握異步編程的相關(guān)概念了解期物的概念意義和使用方法了解中的阻塞型函數(shù)釋放的特點(diǎn)。一異步編程相關(guān)概念阻塞程序未得到所需計(jì)算資源時(shí)被掛起的狀態(tài)。 導(dǎo)語:本文章記錄了本人在學(xué)習(xí)Python基礎(chǔ)之控制流程篇的重點(diǎn)知識(shí)及個(gè)人心得,打算入門Python的朋友們可以來一起學(xué)習(xí)并交流。 本文重點(diǎn): 1、掌握異步編程的相關(guān)概念;2、了解期物future的概念、意義和使用方法;3、了解Python...

    asoren 評論0 收藏0
  • 從小白程序員一路晉升為大廠高級技術(shù)專家我看過哪些書籍?(建議收藏)

    摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報(bào)率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術(shù)專家我看過哪些技術(shù)類書籍。 大家好,我是...

    sf_wangchong 評論0 收藏0
  • python初學(xué)——網(wǎng)絡(luò)編程之FTP服務(wù)器支持多并發(fā)版本

    摘要:擴(kuò)展支持多用戶并發(fā)訪問與線程池。項(xiàng)目請見初學(xué)網(wǎng)絡(luò)編程之服務(wù)器。不允許超過磁盤配額。該文件是一個(gè)使用模塊編寫的線程池類。這一步就做到了線程池的作用。 對MYFTP項(xiàng)目進(jìn)行升級。擴(kuò)展支持多用戶并發(fā)訪問與線程池。MYFTP項(xiàng)目請見python初學(xué)——網(wǎng)絡(luò)編程之FTP服務(wù)器。 擴(kuò)展需求 1.在之前開發(fā)的FTP基礎(chǔ)上,開發(fā)支持多并發(fā)的功能2.不能使用SocketServer模塊,必須自己實(shí)現(xiàn)多線...

    oysun 評論0 收藏0
  • 史上最詳細(xì)Python學(xué)習(xí)路線-從入門到精通,只需90天

    摘要:針對的初學(xué)者,從無到有的語言如何入門,主要包括了的簡介,如何下載,如何安裝,如何使用終端,等各種開發(fā)環(huán)境進(jìn)行開發(fā),中的語法和基本知識(shí)概念和邏輯,以及繼續(xù)深入學(xué)習(xí)的方法。 ...

    gghyoo 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<