摘要:實(shí)現(xiàn)阻塞隊(duì)列在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫(xiě)入線(xiàn)程寫(xiě)入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線(xiàn)程。
實(shí)現(xiàn)Java 阻塞隊(duì)列
在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn):
基本隊(duì)列特性:先進(jìn)先出。
寫(xiě)入隊(duì)列空間不可用時(shí)會(huì)阻塞。
獲取隊(duì)列數(shù)據(jù)時(shí)當(dāng)隊(duì)列為空時(shí)將阻塞。
實(shí)現(xiàn)隊(duì)列的方式多種,總的來(lái)說(shuō)就是數(shù)組和鏈表;其實(shí)我們只需要搞清楚其中一個(gè)即可,不同的特性主要表現(xiàn)為數(shù)組和鏈表的區(qū)別。
這里的 ArrayBlockingQueue 看名字很明顯是由數(shù)組實(shí)現(xiàn)。
我們先根據(jù)它這三個(gè)特性嘗試自己實(shí)現(xiàn)試試。
初始化隊(duì)列我這里自定義了一個(gè)類(lèi):ArrayBlockQueue,它的構(gòu)造函數(shù)如下:
//隊(duì)列參數(shù) public int size; private volatile Object[] items; private volatile int pollPoint = 0; private volatile int addPoint = 0; private volatile int count = 0; //初始化隊(duì)列容量 public ArrayBlockQueue(int size) { this.size = size; this.items = new Object[size]; }寫(xiě)入操作
有幾個(gè)需要注意的點(diǎn):
隊(duì)列滿(mǎn)的時(shí)候,寫(xiě)入的線(xiàn)程需要被阻塞。
寫(xiě)入過(guò)隊(duì)列的數(shù)量大于隊(duì)列大小時(shí)需要從第一個(gè)下標(biāo)開(kāi)始寫(xiě)。
先看第一個(gè)隊(duì)列滿(mǎn)的時(shí)候,寫(xiě)入的線(xiàn)程需要被阻塞,先來(lái)考慮下如何才能使一個(gè)線(xiàn)程被阻塞,看起來(lái)的表象線(xiàn)程卡住啥事也做不了。
其實(shí)這樣的一個(gè)特點(diǎn)很容易讓我們想到 Java 的等待通知機(jī)制來(lái)實(shí)現(xiàn)線(xiàn)程間通信。初始化 兩個(gè)鎖
所以我這里的做法是,一旦隊(duì)列滿(mǎn)時(shí)就將寫(xiě)入線(xiàn)程調(diào)用 addLock.wait() 進(jìn)入 waiting 狀態(tài),直到空間可用時(shí)再進(jìn)行喚醒。
//初始化兩個(gè)鎖 private Object addLock = new Object(); private Object pollLock = new Object();寫(xiě)入操作代碼
public void add(Object item) { synchronized (addLock) { //隊(duì)列滿(mǎn)則阻塞 while (count >= size) { try { addLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } items[addPoint] = item; addPoint = (addPoint + 1) % size; count++; //當(dāng)隊(duì)列從0個(gè)元素添加到1個(gè)元素,則說(shuō)明從空狀態(tài)轉(zhuǎn)非空狀態(tài)可以通知一次取元素線(xiàn)程 if (count == 1) synchronized (pollLock) { pollLock.notifyAll(); } } }
所以這里聲明了兩個(gè)對(duì)象用于隊(duì)列滿(mǎn)、空情況下的互相通知作用。
在寫(xiě)入數(shù)據(jù)成功后需要使用 pollLock.notifyAll(),這樣的目的是當(dāng)獲取隊(duì)列為空時(shí),一旦寫(xiě)入數(shù)據(jù)成功就可以把消費(fèi)隊(duì)列的線(xiàn)程喚醒。
這里的 wait 和 notify 操作都需要對(duì)各自的對(duì)象使用 synchronized 方法塊,這是因?yàn)?wait 和 notifyAll 都需要獲取到各自的鎖。消費(fèi)隊(duì)列
上文也提到了:當(dāng)隊(duì)列為空時(shí),獲取隊(duì)列的線(xiàn)程需要被阻塞,直到隊(duì)列中有數(shù)據(jù)時(shí)才被喚醒。
代碼和寫(xiě)入的非常類(lèi)似,也很好理解;只是這里的等待、喚醒恰好是相反的。
總的來(lái)說(shuō)就是:
寫(xiě)入隊(duì)列滿(mǎn)時(shí)會(huì)阻塞直到獲取線(xiàn)程消費(fèi)了隊(duì)列數(shù)據(jù)后喚醒寫(xiě)入線(xiàn)程。
消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫(xiě)入線(xiàn)程寫(xiě)入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線(xiàn)程。
public Object poll() { synchronized (pollLock) { //隊(duì)列空則阻塞 while (count == 0) { try { pollLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object value = items[pollPoint]; pollPoint = (pollPoint + 1) % size; count--; //當(dāng)隊(duì)列從滿(mǎn)到非滿(mǎn),可以通知一次增添元素的線(xiàn)程 if (count == (size - 1)) synchronized (addLock) { addLock.notifyAll(); } return value; } }測(cè)試
每一秒出隊(duì)1個(gè)
//消費(fèi)者 //每一秒出隊(duì)1個(gè) class Counsum extends Thread { private ArrayBlockQueue queue; public Counsum(ArrayBlockQueue queue) { this.queue = queue; } @Override public void run() { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 出隊(duì)一個(gè)數(shù)據(jù): " + queue.poll()); } } }
每5秒入隊(duì)2個(gè),可見(jiàn)生產(chǎn)明顯慢于消費(fèi)
//生產(chǎn)者 //每5秒入隊(duì)2個(gè) class Product extends Thread { private ArrayBlockQueue queue; public Product(ArrayBlockQueue queue) { this.queue = queue; } @Override public void run() { int i = 0; while (true) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 0; j < 2; j++, i++) { queue.add(i); System.out.println(Thread.currentThread().getName() + "【入隊(duì)一個(gè)數(shù)據(jù): " + i+"】"); } } } }main
public class Test extends Object { public static void main(String[] args) throws CloneNotSupportedException { ArrayBlockQueue queue = new ArrayBlockQueue(5); Thread counsum = new Counsum(queue); Thread counsum1 = new Counsum(queue); Thread counsum2 = new Counsum(queue); Thread product = new Product(queue); Thread product1 = new Product(queue); Thread product2 = new Product(queue); counsum.start(); product.start(); counsum1.start(); product1.start(); counsum2.start(); product2.start(); } } //output 消費(fèi)者線(xiàn)程-1出隊(duì)一個(gè)數(shù)據(jù): 0 生成者線(xiàn)程-3入隊(duì)一個(gè)數(shù)據(jù): 0 消費(fèi)者線(xiàn)程-3出隊(duì)一個(gè)數(shù)據(jù): 0 消費(fèi)者線(xiàn)程-2出隊(duì)一個(gè)數(shù)據(jù): 0 生成者線(xiàn)程-2入隊(duì)一個(gè)數(shù)據(jù): 0 生成者線(xiàn)程-1入隊(duì)一個(gè)數(shù)據(jù): 0 生成者線(xiàn)程-2入隊(duì)一個(gè)數(shù)據(jù): 1 生成者線(xiàn)程-3入隊(duì)一個(gè)數(shù)據(jù): 1 生成者線(xiàn)程-1入隊(duì)一個(gè)數(shù)據(jù): 1 消費(fèi)者線(xiàn)程-3出隊(duì)一個(gè)數(shù)據(jù): 1 消費(fèi)者線(xiàn)程-1出隊(duì)一個(gè)數(shù)據(jù): 1 消費(fèi)者線(xiàn)程-2出隊(duì)一個(gè)數(shù)據(jù): 1 消費(fèi)者線(xiàn)程-2出隊(duì)一個(gè)數(shù)據(jù): 2 生成者線(xiàn)程-2入隊(duì)一個(gè)數(shù)據(jù): 2 消費(fèi)者線(xiàn)程-1出隊(duì)一個(gè)數(shù)據(jù): 2 生成者線(xiàn)程-3入隊(duì)一個(gè)數(shù)據(jù): 2 生成者線(xiàn)程-1入隊(duì)一個(gè)數(shù)據(jù): 2 消費(fèi)者線(xiàn)程-3出隊(duì)一個(gè)數(shù)據(jù): 2 生成者線(xiàn)程-1入隊(duì)一個(gè)數(shù)據(jù): 3 生成者線(xiàn)程-3入隊(duì)一個(gè)數(shù)據(jù): 3 生成者線(xiàn)程-2入隊(duì)一個(gè)數(shù)據(jù): 3 消費(fèi)者線(xiàn)程-1出隊(duì)一個(gè)數(shù)據(jù): 3 消費(fèi)者線(xiàn)程-3出隊(duì)一個(gè)數(shù)據(jù): 3 消費(fèi)者線(xiàn)程-2出隊(duì)一個(gè)數(shù)據(jù): 3
引用
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75935.html
摘要:什么是阻塞隊(duì)列阻塞隊(duì)列是一個(gè)在隊(duì)列基礎(chǔ)上又支持了兩個(gè)附加操作的隊(duì)列。阻塞隊(duì)列的應(yīng)用場(chǎng)景阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是向隊(duì)列里添加元素的線(xiàn)程,消費(fèi)者是從隊(duì)列里取元素的線(xiàn)程。由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。 什么是阻塞隊(duì)列? 阻塞隊(duì)列是一個(gè)在隊(duì)列基礎(chǔ)上又支持了兩個(gè)附加操作的隊(duì)列。 2個(gè)附加操作: 支持阻塞的插入方法:隊(duì)列滿(mǎn)時(shí),隊(duì)列會(huì)阻塞插入元素的線(xiàn)程,直到隊(duì)列不滿(mǎn)。 支持阻塞的...
摘要:是基于鏈接節(jié)點(diǎn)的線(xiàn)程安全的隊(duì)列。通過(guò)這些高效并且線(xiàn)程安全的隊(duì)列類(lèi),為我們快速搭建高質(zhì)量的多線(xiàn)程程序帶來(lái)極大的便利。隊(duì)列內(nèi)部?jī)H允許容納一個(gè)元素。該隊(duì)列的頭部是延遲期滿(mǎn)后保存時(shí)間最長(zhǎng)的元素。 隊(duì)列簡(jiǎn)述 Queue: 基本上,一個(gè)隊(duì)列就是一個(gè)先入先出(FIFO)的數(shù)據(jù)結(jié)構(gòu)Queue接口與List、Set同一級(jí)別,都是繼承了Collection接口。LinkedList實(shí)現(xiàn)了Deque接 口。...
摘要:知識(shí)點(diǎn)總結(jié)容器知識(shí)點(diǎn)總結(jié)容器接口與是在同一級(jí)別,都是繼承了接口。另一種隊(duì)列則是雙端隊(duì)列,支持在頭尾兩端插入和移除元素,主要包括。一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。是一個(gè)阻塞的線(xiàn)程安全的隊(duì)列,底層實(shí)現(xiàn)也是使用鏈?zhǔn)浇Y(jié)構(gòu)。 Java知識(shí)點(diǎn)總結(jié)(Java容器-Queue) @(Java知識(shí)點(diǎn)總結(jié))[Java, Java容器] Queue Queue接口與List、Set是在同一級(jí)別,都是繼承了...
摘要:盡管中已經(jīng)包含了阻塞隊(duì)列的官方實(shí)現(xiàn),但是熟悉其背后的原理還是很有幫助的。阻塞隊(duì)列的實(shí)現(xiàn)阻塞隊(duì)列的實(shí)現(xiàn)類(lèi)似于帶上限的的實(shí)現(xiàn)。下面是阻塞隊(duì)列的一個(gè)簡(jiǎn)單實(shí)現(xiàn)必須注意到,在和方法內(nèi)部,只有隊(duì)列的大小等于上限或者下限時(shí),才調(diào)用方法。 阻塞隊(duì)列與普通隊(duì)列的區(qū)別在于,當(dāng)隊(duì)列是空的時(shí),從隊(duì)列中獲取元素的操作將會(huì)被阻塞,或者當(dāng)隊(duì)列是滿(mǎn)時(shí),往隊(duì)列里添加元素的操作會(huì)被阻塞。試圖從空的阻塞隊(duì)列中獲取元素的線(xiàn)程...
摘要:隊(duì)列中有元素時(shí),就說(shuō)明有過(guò)期了,線(xiàn)程繼續(xù)執(zhí)行,然后元素出隊(duì),根據(jù)相應(yīng)的移除緩存。所以嚴(yán)格來(lái)說(shuō),雖然實(shí)現(xiàn)了隊(duì)列接口,但是它的目的卻并不是隊(duì)列,而是將生產(chǎn)者消費(fèi)者線(xiàn)程配對(duì)。轉(zhuǎn)移隊(duì)列鏈?zhǔn)睫D(zhuǎn)移隊(duì)列。 引言 本周在編寫(xiě)短信驗(yàn)證碼頻率限制切面的時(shí)候,經(jīng)潘老師給的實(shí)現(xiàn)思路,使用隊(duì)列進(jìn)行實(shí)現(xiàn)。 看了看java.util包下的Queue接口,發(fā)現(xiàn)還從來(lái)沒(méi)用過(guò)呢! Collection集合類(lèi)接口,由它派生...
摘要:并發(fā)編程實(shí)戰(zhàn)水平很高,然而并不是本好書(shū)。一是多線(xiàn)程的控制,二是并發(fā)同步的管理。最后,使用和來(lái)關(guān)閉線(xiàn)程池,停止其中的線(xiàn)程。當(dāng)線(xiàn)程調(diào)用或等阻塞時(shí),對(duì)這個(gè)線(xiàn)程調(diào)用會(huì)使線(xiàn)程醒來(lái),并受到,且線(xiàn)程的中斷標(biāo)記被設(shè)置。 《Java并發(fā)編程實(shí)戰(zhàn)》水平很高,然而并不是本好書(shū)。組織混亂、長(zhǎng)篇大論、難以消化,中文翻譯也較死板。這里是一篇批評(píng)此書(shū)的帖子,很是貼切。俗話(huà)說(shuō):看到有這么多人罵你,我就放心了。 然而知...
閱讀 3229·2021-11-08 13:21
閱讀 1209·2021-08-12 13:28
閱讀 1419·2019-08-30 14:23
閱讀 1939·2019-08-30 11:09
閱讀 852·2019-08-29 13:22
閱讀 2699·2019-08-29 13:12
閱讀 2560·2019-08-26 17:04
閱讀 2270·2019-08-26 13:22