成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Python 模塊源碼分析:queue 隊(duì)列

goji / 2082人閱讀

摘要:因?yàn)樗蔷€程安全的,所以多個(gè)線程很輕松地使用同一個(gè)實(shí)例。后進(jìn)先出隊(duì)列使用后進(jìn)先出順序,與棧結(jié)構(gòu)相似這就是全部代碼了,這正是設(shè)計(jì)很棒的一個(gè)原因,它將底層的數(shù)據(jù)操作抽象成四個(gè)操作函數(shù),本身來(lái)處理線程安全的問(wèn)題,使得其子類只需關(guān)注底層的操作。

起步

queue 模塊提供適用于多線程編程的先進(jìn)先出(FIFO)數(shù)據(jù)結(jié)構(gòu)。因?yàn)樗蔷€程安全的,所以多個(gè)線程很輕松地使用同一個(gè)實(shí)例。

源碼分析

先從初始化的函數(shù)來(lái)看:

class Queue:
    def __init__(self, maxsize=0):
        # 設(shè)置隊(duì)列的最大容量
        self.maxsize = maxsize
        self._init(maxsize)

        # 線程鎖,互斥變量
        self.mutex = threading.Lock()
        # 由鎖衍生出三個(gè)條件變量
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)

        self.unfinished_tasks = 0

    def _init(self, maxsize):
        # 初始化底層數(shù)據(jù)結(jié)構(gòu)
        self.queue = deque()

從這初始化函數(shù)能得到哪些信息呢?首先,隊(duì)列是可以設(shè)置其容量大小的,并且具體的底層存放元素的它使用了 collections.deque() 雙端列表的數(shù)據(jù)結(jié)構(gòu),這使得能很方便的做先進(jìn)先出操作。這里還特地抽象為 _init 函數(shù)是為了方便其子類進(jìn)行覆蓋,允許子類使用其他結(jié)構(gòu)來(lái)存放元素(比如優(yōu)先隊(duì)列使用了 list)。

然后就是線程鎖 self.mutex ,對(duì)于底層數(shù)據(jù)結(jié)構(gòu) self.queue 的操作都要先獲得這把鎖;再往下是三個(gè)條件變量,這三個(gè) Condition 都以 self.mutex 作為參數(shù),也就是說(shuō)它們共用一把鎖;從這可以知道諸如 with self.mutexwith self.not_empty 等都是互斥的。

基于這些鎖而做的一些簡(jiǎn)單的操作:

class Queue:
    ...
    def qsize(self):
        # 返回隊(duì)列中的元素?cái)?shù)
        with self.mutex:
            return self._qsize()

    def empty(self):
        # 隊(duì)列是否為空
        with self.mutex:
            return not self._qsize()

    def full(self):
        # 隊(duì)列是否已滿
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def _qsize(self):
        return len(self.queue)

這個(gè)代碼片段挺好理解的,無(wú)需分析。

作為隊(duì)列,主要得完成入隊(duì)與出隊(duì)的操作,首先是入隊(duì):

class Queue:
    ...
    def put(self, item, block=True, timeout=None):
        with self.not_full: # 獲取條件變量not_full
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full # 如果 block 是 False,并且隊(duì)列已滿,那么拋出 Full 異常
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait() # 阻塞直到由剩余空間
                elif timeout < 0: # 不合格的參數(shù)值,拋出ValueError
                    raise ValueError(""timeout" must be a non-negative number")
                else:
                    endtime = time() + timeout  # 計(jì)算等待的結(jié)束時(shí)間
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full # 等待期間一直沒(méi)空間,拋出 Full 異常
                        self.not_full.wait(remaining)
            self._put(item) # 往底層數(shù)據(jù)結(jié)構(gòu)中加入一個(gè)元素
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def _put(self, item):
        self.queue.append(item)

盡管只有二十幾行的代碼,但這里的邏輯還是比較復(fù)雜的。它要處理超時(shí)與隊(duì)列剩余空間不足的情況,具體幾種情況如下:

如果 block 是 False,忽略timeout參數(shù)

若此時(shí)隊(duì)列已滿,則拋出 Full 異常;

若此時(shí)隊(duì)列未滿,則立即把元素保存到底層數(shù)據(jù)結(jié)構(gòu)中;

如果 block 是 True

timeoutNone 時(shí),那么put操作可能會(huì)阻塞,直到隊(duì)列中有空閑的空間(默認(rèn));

timeout 是非負(fù)數(shù),則會(huì)阻塞相應(yīng)時(shí)間直到隊(duì)列中有剩余空間,在這個(gè)期間,如果隊(duì)列中一直沒(méi)有空間,拋出 Full 異常;

處理好參數(shù)邏輯后,,將元素保存到底層數(shù)據(jù)結(jié)構(gòu)中,并遞增unfinished_tasks,同時(shí)通知 not_empty ,喚醒在其中等待數(shù)據(jù)的線程。

出隊(duì)操作:

class Queue:
    ...
    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError(""timeout" must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def _get(self):     
        return self.queue.popleft()

get() 操作是 put() 相反的操作,代碼塊也及其相似,get() 是從隊(duì)列中移除最先插入的元素并將其返回。

如果 block 是 False,忽略timeout參數(shù)

若此時(shí)隊(duì)列沒(méi)有元素,則拋出 Empty 異常;

若此時(shí)隊(duì)列由元素,則立即把元素保存到底層數(shù)據(jù)結(jié)構(gòu)中;

如果 block 是 True

timeoutNone 時(shí),那么get操作可能會(huì)阻塞,直到隊(duì)列中有元素(默認(rèn));

timeout 是非負(fù)數(shù),則會(huì)阻塞相應(yīng)時(shí)間直到隊(duì)列中有元素,在這個(gè)期間,如果隊(duì)列中一直沒(méi)有元素,則拋出 Empty 異常;

最后,通過(guò) self.queue.popleft() 將最早放入隊(duì)列的元素移除,并通知 not_full ,喚醒在其中等待數(shù)據(jù)的線程。

這里有個(gè)值得注意的地方,在 put() 操作中遞增了 self.unfinished_tasks ,而 get() 中卻沒(méi)有遞減,這是為什么?

這其實(shí)是為了留給用戶一個(gè)消費(fèi)元素的時(shí)間,get() 僅僅是獲取元素,并不代表消費(fèi)者線程處理的該元素,用戶需要調(diào)用 task_done() 來(lái)通知隊(duì)列該任務(wù)處理完成了:

class Queue:
    ...
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0: # 也就是成功調(diào)用put()的次數(shù)小于調(diào)用task_done()的次數(shù)時(shí),會(huì)拋出異常
                    raise ValueError("task_done() called too many times")
                self.all_tasks_done.notify_all() # 當(dāng)unfinished為0時(shí),會(huì)通知all_tasks_done
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks: # 如果有未完成的任務(wù),將調(diào)用wait()方法等待
                self.all_tasks_done.wait()

由于 task_done() 使用方調(diào)用的,當(dāng) task_done() 次數(shù)大于 put() 次數(shù)時(shí)會(huì)拋出異常。

task_done() 操作的作用是喚醒正在阻塞的 join() 操作。join() 方法會(huì)一直阻塞,直到隊(duì)列中所有的元素都被取出,并被處理了(和線程的join方法類似)。也就是說(shuō) join() 方法必須配合 task_done() 來(lái)使用才行。

LIFO 后進(jìn)先出隊(duì)列

LifoQueue使用后進(jìn)先出順序,與棧結(jié)構(gòu)相似:

class LifoQueue(Queue):
    """Variant of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

這就是 LifoQueue 全部代碼了,這正是 Queue 設(shè)計(jì)很棒的一個(gè)原因,它將底層的數(shù)據(jù)操作抽象成四個(gè)操作函數(shù),本身來(lái)處理線程安全的問(wèn)題,使得其子類只需關(guān)注底層的操作。

LifoQueue 底層數(shù)據(jù)結(jié)構(gòu)改用 list 來(lái)存放,通過(guò) self.queue.pop() 就能將 list 中最后一個(gè)元素移除,無(wú)需重置索引。

PriorityQueue 優(yōu)先隊(duì)列
from heapq import heappush, heappop

class PriorityQueue(Queue):
    """Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    """

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)

優(yōu)先隊(duì)列使用了 heapq 模塊的結(jié)構(gòu),也就是最小堆的結(jié)構(gòu)。優(yōu)先隊(duì)列更為常用,隊(duì)列中項(xiàng)目的處理順序需要基于這些項(xiàng)目的特征,一個(gè)簡(jiǎn)單的例子:

import queue

class A:
    def __init__(self, priority, value):
        self.priority = priority
        self.value = value

    def __lt__(self, other):
        return self.priority < other.priority


q = queue.PriorityQueue()

q.put(A(1, "a"))
q.put(A(0, "b"))
q.put(A(1, "c"))

print(q.get().value)  # "b"

使用優(yōu)先隊(duì)列的時(shí)候,需要定義 __lt__ 魔術(shù)方法,來(lái)定義它們之間如何比較大小。若元素的 priority 相同,依然使用先進(jìn)先出的順序。

參考

https://pymotw.com/3/queue/in...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/43547.html

相關(guān)文章

  • python threading模塊使用 以及python多線程操作的實(shí)踐(使用Queue隊(duì)列模塊)

    摘要:介紹今天花了近乎一天的時(shí)間研究關(guān)于多線程的問(wèn)題,查看了大量源碼自己也實(shí)踐了一個(gè)生產(chǎn)消費(fèi)者模型,所以把一天的收獲總結(jié)一下。提供了兩個(gè)模塊和來(lái)支持的多線程操作。使用來(lái)阻塞線程。 介紹 今天花了近乎一天的時(shí)間研究python關(guān)于多線程的問(wèn)題,查看了大量源碼 自己也實(shí)踐了一個(gè)生產(chǎn)消費(fèi)者模型,所以把一天的收獲總結(jié)一下。 由于GIL(Global Interpreter Lock)鎖的關(guān)系,純的p...

    shusen 評(píng)論0 收藏0
  • Swoole 源碼分析——基礎(chǔ)模塊Queue隊(duì)列

    摘要:消息隊(duì)列的接受消息隊(duì)列的接受是利用函數(shù),其中是消息的類型,該參數(shù)會(huì)取出指定類型的消息,如果設(shè)定的是爭(zhēng)搶模式,該值會(huì)統(tǒng)一為,否則該值就是消息發(fā)送目的的。環(huán)形隊(duì)列的消息入隊(duì)發(fā)送消息首先要確定環(huán)形隊(duì)列的隊(duì)尾。取模操作可以優(yōu)化 前言 swoole 的底層隊(duì)列有兩種:進(jìn)程間通信 IPC 的消息隊(duì)列 swMsgQueue,與環(huán)形隊(duì)列 swRingQueue。IPC 的消息隊(duì)列用于 task_wor...

    jollywing 評(píng)論0 收藏0
  • Python線程池源碼分析

    摘要:對(duì)線程池的研究是之前對(duì)分析的附加工作。在之前對(duì)源碼分析的文章中,寫(xiě)到調(diào)度器將任務(wù)放入線程池的函數(shù)這里分析的線程池類是,也就是上述代碼中所使用的類。 對(duì)Python線程池的研究是之前對(duì)Apshceduler分析的附加工作。 在之前對(duì)Apshceduler源碼分析的文章中,寫(xiě)到調(diào)度器將任務(wù)放入線程池的函數(shù) def _do_submit_job(self, job, run_time...

    ephererid 評(píng)論0 收藏0
  • Python -- Queue模塊

    摘要:默認(rèn)值為,指定為時(shí)代表可以阻塞,若同時(shí)指定,在超時(shí)時(shí)返回。當(dāng)消費(fèi)者線程調(diào)用意味著有消費(fèi)者取得任務(wù)并完成任務(wù),未完成的任務(wù)數(shù)就會(huì)減少。當(dāng)未完成的任務(wù)數(shù)降到,解除阻塞。 學(xué)習(xí)契機(jī) 最近的一個(gè)項(xiàng)目中在使用grpc時(shí)遇到一個(gè)問(wèn)題,由于client端可多達(dá)200,每個(gè)端口每10s向grpc server發(fā)送一次請(qǐng)求,server端接受client的請(qǐng)求后根據(jù)request信息更新數(shù)據(jù)庫(kù),再將數(shù)據(jù)...

    rubyshen 評(píng)論0 收藏0
  • Python保住“設(shè)計(jì)大哥“的頭發(fā),直接甩給他10000張參考圖,爬蟲(chóng)采集【稿定設(shè)計(jì)】平面模板素材

    摘要:最近稿定設(shè)計(jì)這個(gè)站點(diǎn)挺火,設(shè)計(jì)組的大哥一直在提,啊,這個(gè)好,這個(gè)好。目的是給設(shè)計(jì)組大哥提供素材參考,畢竟做設(shè)計(jì)的可不能抄襲哦思路枯竭的時(shí)候,借鑒一下還湊合??戳艘谎墼O(shè)計(jì)大哥的頭發(fā),我覺(jué)得夠他用一年了。 ...

    iliyaku 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<