摘要:自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫入線程寫入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。最終的隊(duì)列大小為,可見(jiàn)線程也是安全的。
前言
較長(zhǎng)一段時(shí)間以來(lái)我都發(fā)現(xiàn)不少開(kāi)發(fā)者對(duì) jdk 中的 J.U.C(java.util.concurrent)也就是 Java 并發(fā)包的使用甚少,更別談對(duì)它的理解了;但這卻也是我們進(jìn)階的必備關(guān)卡。
之前或多或少也分享過(guò)相關(guān)內(nèi)容,但都不成體系;于是便想整理一套與并發(fā)包相關(guān)的系列文章。
其中的內(nèi)容主要包含以下幾個(gè)部分:
根據(jù)定義自己實(shí)現(xiàn)一個(gè)并發(fā)工具。
JDK 的標(biāo)準(zhǔn)實(shí)現(xiàn)。
實(shí)踐案例。
基于這三點(diǎn)我相信大家對(duì)這部分內(nèi)容不至于一問(wèn)三不知。
既然開(kāi)了一個(gè)新坑,就不想做的太差;所以我打算將這個(gè)列表下的大部分類都講到。
所以本次重點(diǎn)討論 ArrayBlockingQueue。
自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn):
基本隊(duì)列特性:先進(jìn)先出。
寫入隊(duì)列空間不可用時(shí)會(huì)阻塞。
獲取隊(duì)列數(shù)據(jù)時(shí)當(dāng)隊(duì)列為空時(shí)將阻塞。
實(shí)現(xiàn)隊(duì)列的方式多種,總的來(lái)說(shuō)就是數(shù)組和鏈表;其實(shí)我們只需要搞清楚其中一個(gè)即可,不同的特性主要表現(xiàn)為數(shù)組和鏈表的區(qū)別。
這里的 ArrayBlockingQueue 看名字很明顯是由數(shù)組實(shí)現(xiàn)。
我們先根據(jù)它這三個(gè)特性嘗試自己實(shí)現(xiàn)試試。
初始化隊(duì)列我這里自定義了一個(gè)類:ArrayQueue,它的構(gòu)造函數(shù)如下:
public ArrayQueue(int size) { items = new Object[size]; }
很明顯這里的 items 就是存放數(shù)據(jù)的數(shù)組;在初始化時(shí)需要根據(jù)大小創(chuàng)建數(shù)組。
寫入隊(duì)列寫入隊(duì)列比較簡(jiǎn)單,只需要依次把數(shù)據(jù)存放到這個(gè)數(shù)組中即可,如下圖:
但還是有幾個(gè)需要注意的點(diǎn):
隊(duì)列滿的時(shí)候,寫入的線程需要被阻塞。
寫入過(guò)隊(duì)列的數(shù)量大于隊(duì)列大小時(shí)需要從第一個(gè)下標(biāo)開(kāi)始寫。
先看第一個(gè)隊(duì)列滿的時(shí)候,寫入的線程需要被阻塞,先來(lái)考慮下如何才能使一個(gè)線程被阻塞,看起來(lái)的表象線程卡住啥事也做不了。
有幾種方案可以實(shí)現(xiàn)這個(gè)效果:
Thread.sleep(timeout)線程休眠。
object.wait() 讓線程進(jìn)入 waiting 狀態(tài)。
當(dāng)然還有一些 join、LockSupport.part 等不在本次的討論范圍。
阻塞隊(duì)列還有一個(gè)非常重要的特性是:當(dāng)隊(duì)列空間可用時(shí)(取出隊(duì)列),寫入線程需要被喚醒讓數(shù)據(jù)可以寫入進(jìn)去。
所以很明顯Thread.sleep(timeout)不合適,它在到達(dá)超時(shí)時(shí)間之后便會(huì)繼續(xù)運(yùn)行;達(dá)不到空間可用時(shí)才喚醒繼續(xù)運(yùn)行這個(gè)特點(diǎn)。
其實(shí)這樣的一個(gè)特點(diǎn)很容易讓我們想到 Java 的等待通知機(jī)制來(lái)實(shí)現(xiàn)線程間通信;更多線程見(jiàn)通信的方案可以參考這里:深入理解線程通信
所以我這里的做法是,一旦隊(duì)列滿時(shí)就將寫入線程調(diào)用 object.wait() 進(jìn)入 waiting 狀態(tài),直到空間可用時(shí)再進(jìn)行喚醒。
/** * 隊(duì)列滿時(shí)的阻塞鎖 */ private Object full = new Object(); /** * 隊(duì)列空時(shí)的阻塞鎖 */ private Object empty = new Object();
所以這里聲明了兩個(gè)對(duì)象用于隊(duì)列滿、空情況下的互相通知作用。
在寫入數(shù)據(jù)成功后需要使用 empty.notify(),這樣的目的是當(dāng)獲取隊(duì)列為空時(shí),一旦寫入數(shù)據(jù)成功就可以把消費(fèi)隊(duì)列的線程喚醒。
這里的 wait 和 notify 操作都需要對(duì)各自的對(duì)象使用 synchronized 方法塊,這是因?yàn)?wait 和 notify 都需要獲取到各自的鎖。消費(fèi)隊(duì)列
上文也提到了:當(dāng)隊(duì)列為空時(shí),獲取隊(duì)列的線程需要被阻塞,直到隊(duì)列中有數(shù)據(jù)時(shí)才被喚醒。
代碼和寫入的非常類似,也很好理解;只是這里的等待、喚醒恰好是相反的,通過(guò)下面這張圖可以很好理解:
總的來(lái)說(shuō)就是:
寫入隊(duì)列滿時(shí)會(huì)阻塞直到獲取線程消費(fèi)了隊(duì)列數(shù)據(jù)后喚醒寫入線程。
消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫入線程寫入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。
測(cè)試先來(lái)一個(gè)基本的測(cè)試:?jiǎn)尉€程的寫入和消費(fèi)。
3 123 1234 12345
通過(guò)結(jié)果來(lái)看沒(méi)什么問(wèn)題。
當(dāng)寫入的數(shù)據(jù)超過(guò)隊(duì)列的大小時(shí),就只能消費(fèi)之后才能接著寫入。
2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0]123 2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size=3 2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456
從運(yùn)行結(jié)果也能看出只有當(dāng)消費(fèi)數(shù)據(jù)后才能接著往隊(duì)列里寫入數(shù)據(jù)。
而當(dāng)沒(méi)有消費(fèi)時(shí),再往隊(duì)列里寫數(shù)據(jù)則會(huì)導(dǎo)致寫入線程被阻塞。
并發(fā)測(cè)試三個(gè)線程并發(fā)寫入300條數(shù)據(jù),其中一個(gè)線程消費(fèi)一條。
=====0 299
最終的隊(duì)列大小為 299,可見(jiàn)線程也是安全的。
由于不管是寫入還是獲取方法里的操作都需要獲取鎖才能操作,所以整個(gè)隊(duì)列是線程安全的。ArrayBlockingQueue
下面來(lái)看看 JDK 標(biāo)準(zhǔn)的 ArrayBlockingQueue 的實(shí)現(xiàn),有了上面的基礎(chǔ)會(huì)更好理解。
初始化隊(duì)列看似要復(fù)雜些,但其實(shí)逐步拆分后也很好理解:
第一步其實(shí)和我們自己寫的一樣,初始化一個(gè)隊(duì)列大小的數(shù)組。
第二步初始化了一個(gè)重入鎖,這里其實(shí)就和我們之前使用的 synchronized 作用一致的;
只是這里在初始化重入鎖的時(shí)候默認(rèn)是非公平鎖,當(dāng)然也可以指定為 true 使用公平鎖;這樣就會(huì)按照隊(duì)列的順序進(jìn)行寫入和消費(fèi)。
更多關(guān)于 ReentrantLock 的使用和原理請(qǐng)參考這里:ReentrantLock 實(shí)現(xiàn)原理
三四兩步則是創(chuàng)建了 notEmpty notFull 這兩個(gè)條件,他的作用于用法和之前使用的 object.wait/notify 類似。
這就是整個(gè)初始化的內(nèi)容,其實(shí)和我們自己實(shí)現(xiàn)的非常類似。
寫入隊(duì)列其實(shí)會(huì)發(fā)現(xiàn)阻塞寫入的原理都是差不多的,只是這里使用的是 Lock 來(lái)顯式獲取和釋放鎖。
同時(shí)其中的 notFull.await();notEmpty.signal(); 和我們之前使用的 object.wait/notify 的用法和作用也是一樣的。
當(dāng)然它還是實(shí)現(xiàn)了超時(shí)阻塞的 API。
也是比較簡(jiǎn)單,使用了一個(gè)具有超時(shí)時(shí)間的等待方法。
消費(fèi)隊(duì)列再看消費(fèi)隊(duì)列:
也是差不多的,一看就懂。
而其中的超時(shí) API 也是使用了 notEmpty.awaitNanos(nanos) 來(lái)實(shí)現(xiàn)超時(shí)返回的,就不具體說(shuō)了。
實(shí)際案例說(shuō)了這么多,來(lái)看一個(gè)隊(duì)列的實(shí)際案例吧。
背景是這樣的:
有一個(gè)定時(shí)任務(wù)會(huì)按照一定的間隔時(shí)間從數(shù)據(jù)庫(kù)中讀取一批數(shù)據(jù),需要對(duì)這些數(shù)據(jù)做校驗(yàn)同時(shí)調(diào)用一個(gè)遠(yuǎn)程接口。
簡(jiǎn)單的做法就是由這個(gè)定時(shí)任務(wù)的線程去完成讀取數(shù)據(jù)、消息校驗(yàn)、調(diào)用接口等整個(gè)全流程;但這樣會(huì)有一個(gè)問(wèn)題:
假設(shè)調(diào)用外部接口出現(xiàn)了異常、網(wǎng)絡(luò)不穩(wěn)導(dǎo)致耗時(shí)增加就會(huì)造成整個(gè)任務(wù)的效率降低,因?yàn)樗际谴袝?huì)互相影響。
所以我們改進(jìn)了方案:
其實(shí)就是一個(gè)典型的生產(chǎn)者消費(fèi)者模型:
生產(chǎn)線程從數(shù)據(jù)庫(kù)中讀取消息丟到隊(duì)列里。
消費(fèi)線程從隊(duì)列里獲取數(shù)據(jù)做業(yè)務(wù)邏輯。
這樣兩個(gè)線程就可以通過(guò)這個(gè)隊(duì)列來(lái)進(jìn)行解耦,互相不影響,同時(shí)這個(gè)隊(duì)列也能起到緩沖的作用。
但在使用過(guò)程中也有一些小細(xì)節(jié)值得注意。
因?yàn)檫@個(gè)外部接口是支持批量執(zhí)行的,所以在消費(fèi)線程取出數(shù)據(jù)后會(huì)在內(nèi)存中做一個(gè)累加,一旦達(dá)到閾值或者是累計(jì)了一個(gè)時(shí)間段便將這批累計(jì)的數(shù)據(jù)處理掉。
但由于開(kāi)發(fā)者的大意,在消費(fèi)的時(shí)候使用的是 queue.take() 這個(gè)阻塞的 API;正常運(yùn)行沒(méi)啥問(wèn)題。
可一旦原始的數(shù)據(jù)源,也就是 DB 中沒(méi)數(shù)據(jù)了,導(dǎo)致隊(duì)列里的數(shù)據(jù)也被消費(fèi)完后這個(gè)消費(fèi)線程便會(huì)被阻塞。
這樣上一輪積累在內(nèi)存中的數(shù)據(jù)便一直沒(méi)機(jī)會(huì)使用,直到數(shù)據(jù)源又有數(shù)據(jù)了,一旦中間間隔較長(zhǎng)時(shí)便可能會(huì)導(dǎo)致嚴(yán)重的業(yè)務(wù)異常。
所以我們最好是使用 queue.poll(timeout) 這樣帶超時(shí)時(shí)間的 api,除非業(yè)務(wù)上有明確的要求需要阻塞。
這個(gè)習(xí)慣同樣適用于其他場(chǎng)景,比如調(diào)用 http、rpc 接口等都需要設(shè)置合理的超時(shí)時(shí)間。
總結(jié)關(guān)于 ArrayBlockingQueue 的相關(guān)分享便到此結(jié)束,接著會(huì)繼續(xù)更新其他并發(fā)容器及并發(fā)工具。
對(duì)本文有任何相關(guān)問(wèn)題都可以留言討論。
本文涉及到的所有源碼:
https://github.com/crossoverJ...
你的點(diǎn)贊與分享是對(duì)我最大的支持
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77511.html
摘要:所以也很容易想到可以利用等待通知機(jī)制來(lái)實(shí)現(xiàn),和上文的并發(fā)包入坑指北之阻塞隊(duì)列的類似。 showImg(https://segmentfault.com/img/remote/1460000019021474?w=2785&h=2785); 前言 在面試過(guò)程中聊到并發(fā)相關(guān)的內(nèi)容時(shí),不少面試官都喜歡問(wèn)這類問(wèn)題: 當(dāng) N 個(gè)線程同時(shí)完成某項(xiàng)任務(wù)時(shí),如何知道他們都已經(jīng)執(zhí)行完畢了。 這也是本次討...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見(jiàn)的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:最近業(yè)務(wù)需要抽離,抽離出來(lái)的應(yīng)用需要做成第三方包的形式,可以在任何也沒(méi)那么神奇,例如有些版本就沒(méi)測(cè)試版本項(xiàng)目中,直接安裝使用,所以這里還是需要發(fā)包到。第一次發(fā)包我是先發(fā)到環(huán)境,看下發(fā)包還是不是符合我的預(yù)期,畢竟很長(zhǎng)時(shí)間沒(méi)發(fā)過(guò)包。 最近業(yè)務(wù)需要抽離,抽離出來(lái)的應(yīng)用需要做成 Django 第三方包的形式,可以在任何 Django(也沒(méi)那么神奇,例如有些版本就沒(méi)測(cè)試)版本項(xiàng)目中,直接安裝使用...
摘要:一和并發(fā)包中的和主要解決的是線程的互斥和同步問(wèn)題,這兩者的配合使用,相當(dāng)于的使用。寫鎖與讀鎖之間互斥,一個(gè)線程在寫時(shí),不允許讀操作。的注意事項(xiàng)不支持重入,即不可反復(fù)獲取同一把鎖。沒(méi)有返回值,也就是說(shuō)無(wú)法獲取執(zhí)行結(jié)果。 一、Lock 和 Condition Java 并發(fā)包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問(wèn)題,這兩者的配合使用,相當(dāng)于 synchron...
閱讀 2022·2021-09-30 09:53
閱讀 1863·2021-09-24 09:48
閱讀 1769·2019-08-30 14:01
閱讀 2183·2019-08-29 18:35
閱讀 1260·2019-08-26 18:27
閱讀 2996·2019-08-26 12:12
閱讀 963·2019-08-23 17:16
閱讀 958·2019-08-23 15:31