摘要:我們將使用單個(gè)線程管理任務(wù)放入隊(duì)列的操作以及從隊(duì)列中取出的操作。同時(shí)這個(gè)線程會(huì)持續(xù)的管理隊(duì)列。另一個(gè)線程將用來(lái)創(chuàng)建,它將一直運(yùn)行知道服務(wù)器終止。此線程永遠(yuǎn)不會(huì)過(guò)期,有助于實(shí)現(xiàn)持續(xù)監(jiān)控。這些請(qǐng)求將會(huì)自動(dòng)的被獲取,并在線程中繼續(xù)處理。
在Java中,BlockingQueue接口位于java.util.concurrent包下。阻塞隊(duì)列主要用來(lái)線程安全的實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型。他們可以使用于多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者的場(chǎng)景中。
我們可以在各種論壇和文章中找到BlockingQueue的范例。在這篇文章中,我們將介紹如何持續(xù)管理隊(duì)列中的請(qǐng)求,以及如何在請(qǐng)求進(jìn)入隊(duì)列后立刻處理。
我們將使用單個(gè)線程管理任務(wù)放入隊(duì)列的操作以及從隊(duì)列中取出的操作。同時(shí)這個(gè)線程會(huì)持續(xù)的管理隊(duì)列。另一個(gè)線程將用來(lái)創(chuàng)建BlockingQueue,它將一直運(yùn)行知道服務(wù)器終止。
阻塞隊(duì)列的大小可以在對(duì)象初始化的時(shí)候設(shè)置。它的大小應(yīng)該基于系統(tǒng)堆的大小。
現(xiàn)在,讓我們回顧創(chuàng)建阻塞隊(duì)列的步驟以及如何持續(xù)的管理和處理請(qǐng)求。
Step 1: EventData新建一個(gè)EventData的POJO類(lèi),它會(huì)存儲(chǔ)生產(chǎn)者產(chǎn)生的事件數(shù)據(jù)并輸入到隊(duì)列中 - 同時(shí)它會(huì)被消費(fèi)者從隊(duì)列中取出e并處理。
package com.dzone.blockingqueue.example; class EventData { private String eventID; private String eventName; private String eventDate; private String eventType; private String eventLocation; public String getEventID() { return eventID; } public void setEventID(String eventID) { this.eventID = eventID; } public String getEventName() { return eventName; } public void setEventName(String eventName) { this.eventName = eventName; } public String getEventDate() { return eventDate; } public void setEventDate(String eventDate) { this.eventDate = eventDate; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public String getEventLocation() { return eventLocation; } public void setEventLocation(String eventLocation) { this.eventLocation = eventLocation; } }Step 2: QueueService
創(chuàng)建一個(gè)QueueService單例類(lèi),用來(lái)將請(qǐng)求放入隊(duì)列中,以及從隊(duì)列中提取請(qǐng)求并處理。
package com.dzone.blockingqueue.example; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class QueueService { private static QueueService instance = null; private static BlockingQueue < EventData > eventQueue = null; private QueueService() {} public static QueueService getInstance() { if (instance == null) { instance = new QueueService(); } return instance; } private void initialize() { if (eventQueue == null) { eventQueue = new LinkedBlockingQueue(); EventProcessor eventProcessor = new EventProcessor(); eventProcessor.start(); } } public void putEventInQueue(EventData eventData) { try { initialize(); eventQueue.put(eventData); } catch (InterruptedException ex) { ex.printStackTrace(); } } class EventProcessor extends Thread { @Override public void run() { for (;;) { EventData eventData = null; try { eventData = eventQueue.take(); System.out.println("Process Event Data : Type : " + eventData.getEventType() + " / Name : " + eventData.getEventName()); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } }
我們新建了一個(gè)靜態(tài)的BlockingQueue變量。它在初始化時(shí)會(huì)比初始化為ArrayBlockingQueue或是LinkedBlockingQueue,這取決于需求。在此之后,這個(gè)對(duì)象會(huì)被用來(lái)放入或是提取請(qǐng)求。
我們還新建了一個(gè)繼承了Thread的EventProcessor私有類(lèi)。它在BlockingQueue初始化的時(shí)候啟動(dòng)。在EventProcessor中使用了一個(gè)for循環(huán)來(lái)管理隊(duì)列。BlockingQueue的優(yōu)點(diǎn)在于它會(huì)在沒(méi)有元素的時(shí)候進(jìn)入等待模式。當(dāng)隊(duì)列為空時(shí),for循環(huán)不會(huì)繼續(xù)遍歷。當(dāng)請(qǐng)求進(jìn)入隊(duì)列后,BlockingQueue會(huì)繼續(xù)運(yùn)行并處理請(qǐng)求。
單個(gè)EventProcessor線程將處理特定隊(duì)列中的所有請(qǐng)求。此線程永遠(yuǎn)不會(huì)過(guò)期,有助于實(shí)現(xiàn)持續(xù)監(jiān)控。
我們還在QueueService中創(chuàng)建了一個(gè)公有的putEventInQueue方法,它會(huì)幫助我們將請(qǐng)求放入由getInstance方法獲取的隊(duì)列中。在這個(gè)方法里,請(qǐng)求被放入BlockingQueue。這些請(qǐng)求將會(huì)自動(dòng)的被BlockingQueue獲取,并在EventProcessor線程中繼續(xù)處理。
Step 3: EventService現(xiàn)在讓我們向隊(duì)列中加載數(shù)據(jù)。我們已經(jīng)實(shí)現(xiàn)了一個(gè)EventService類(lèi)。它會(huì)將幾個(gè)請(qǐng)求寫(xiě)入BlockingQueue中。在QueueService中,我們會(huì)看到請(qǐng)求是如何被取出并處理的。
package com.dzone.blockingqueue.example; public class EventService { public static void main(String arg[]) { try { EventData event = null; for (int i = 0; i < 100; i++) { event = new EventData(); event.setEventType("EventType " + i); event.setEventName("EventName " + i); QueueService.getInstance().putEventInQueue(event); Thread.sleep(100); } } catch (InterruptedException e) { e.printStackTrace(); } } }Step 4: EventProcessor Output
輸出結(jié)果如下:
Process Event Data : Type : EventType 0 / Name : EventName 0 Process Event Data : Type : EventType 1 / Name : EventName 1 Process Event Data : Type : EventType 2 / Name : EventName 2 Process Event Data : Type : EventType 3 / Name : EventName 3 Process Event Data : Type : EventType 4 / Name : EventName 4
想要了解更多開(kāi)發(fā)技術(shù),面試教程以及互聯(lián)網(wǎng)公司內(nèi)推,歡迎關(guān)注我的微信公眾號(hào)!將會(huì)不定期的發(fā)放福利哦~
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68732.html
摘要:如果我們的容器使用,文件如下在這個(gè)例子中,我們可以重復(fù)創(chuàng)建和銷(xiāo)毀,同一個(gè)持久存儲(chǔ)會(huì)被提供給新的,無(wú)論容器位于哪個(gè)節(jié)點(diǎn)上。 前言 臨時(shí)性存儲(chǔ)是容器的一個(gè)很大的買(mǎi)點(diǎn)。根據(jù)一個(gè)鏡像啟動(dòng)容器,隨意變更,然后停止變更重啟一個(gè)容器。你看,一個(gè)全新的文件系統(tǒng)又誕生了。 在docker的語(yǔ)境下: # docker run -it centos [root@d42876f95c6a /]# echo H...
摘要:如果我們的容器使用,文件如下在這個(gè)例子中,我們可以重復(fù)創(chuàng)建和銷(xiāo)毀,同一個(gè)持久存儲(chǔ)會(huì)被提供給新的,無(wú)論容器位于哪個(gè)節(jié)點(diǎn)上。 前言 臨時(shí)性存儲(chǔ)是容器的一個(gè)很大的買(mǎi)點(diǎn)。根據(jù)一個(gè)鏡像啟動(dòng)容器,隨意變更,然后停止變更重啟一個(gè)容器。你看,一個(gè)全新的文件系統(tǒng)又誕生了。 在docker的語(yǔ)境下: # docker run -it centos [root@d42876f95c6a /]# echo H...
摘要:讀取出數(shù)據(jù)時(shí),將此版本號(hào)一同讀出,之后更新時(shí),對(duì)此版本號(hào)加一。此時(shí),將提交數(shù)據(jù)的版本數(shù)據(jù)與數(shù)據(jù)庫(kù)表對(duì)應(yīng)記錄的當(dāng)前版本信息進(jìn)行比對(duì),如果提交的數(shù)據(jù)版本號(hào)大于數(shù)據(jù)庫(kù)表當(dāng)前版本號(hào),則予以更新,否則認(rèn)為是過(guò)期數(shù)據(jù)。 前言 很多人都在討論數(shù)據(jù)的指數(shù)型增長(zhǎng),以及我們將會(huì)有比想象的還要大的數(shù)據(jù)量。但是,很少有人從數(shù)據(jù)庫(kù)的角度談?wù)撨@個(gè)問(wèn)題。隨著數(shù)據(jù)量的暴漲,數(shù)據(jù)庫(kù)也需要隨之升級(jí)。這也是為什么既要了解如...
摘要:根本上來(lái)說(shuō),這意味著不僅要將整個(gè)應(yīng)用程序分解,而且要將任何一個(gè)服務(wù)器中的各個(gè)部分分解為多個(gè)模塊化容器,這些容器易于參數(shù)化和重復(fù)使用。在中,這種模塊化容器服務(wù)的實(shí)施者是。一個(gè)是指一組共享文件系統(tǒng),內(nèi)核命名空間和地址的一組容器。 過(guò)去幾年容器逐漸成為了打包和部署代碼的流行的方式。容器鏡像解決很多現(xiàn)有的打包和部署工具所帶來(lái)的問(wèn)題,初次以外,還為我們提供了構(gòu)建分布式應(yīng)用的全新的思路。就如SOA...
閱讀 1141·2021-11-16 11:45
閱讀 3152·2021-10-13 09:40
閱讀 747·2019-08-26 13:45
閱讀 1245·2019-08-26 13:32
閱讀 2198·2019-08-26 13:23
閱讀 943·2019-08-26 12:16
閱讀 2847·2019-08-26 11:37
閱讀 1781·2019-08-26 10:32