摘要:生產(chǎn)者消費(fèi)者模式是一個(gè)經(jīng)典的多線(xiàn)程設(shè)計(jì)模式,它為多線(xiàn)程的協(xié)作提供了良好的解決方案。生產(chǎn)者消費(fèi)者模式中的內(nèi)存緩沖區(qū)的主要功能是數(shù)據(jù)在多線(xiàn)程間的共享。
生產(chǎn)者-消費(fèi)者模式是一個(gè)經(jīng)典的多線(xiàn)程設(shè)計(jì)模式,它為多線(xiàn)程的協(xié)作提供了良好的解決方案。在生產(chǎn)者-消費(fèi)者模式中,通常有兩類(lèi)線(xiàn)程,即若干個(gè)生產(chǎn)者線(xiàn)程和若干個(gè)消費(fèi)者線(xiàn)程。生產(chǎn)者線(xiàn)程負(fù)責(zé)提交用戶(hù)請(qǐng)求,消費(fèi)者線(xiàn)程負(fù)責(zé)處理用戶(hù)請(qǐng)求。生產(chǎn)者和消費(fèi)者之間通過(guò)共享內(nèi)存緩沖區(qū)進(jìn)行通信。
生產(chǎn)者-消費(fèi)者模式中的內(nèi)存緩沖區(qū)的主要功能是數(shù)據(jù)在多線(xiàn)程間的共享。
1.創(chuàng)建一個(gè)被消費(fèi)的對(duì)象
public final class Data{ private String id; private String name; //getter/setter(),toString()省略,構(gòu)造方法省略 }
2.創(chuàng)建一個(gè)生產(chǎn)者
public class Provider implements Runnable{ //共享緩存區(qū) private BlockingQueue queue; //多線(xiàn)程間是否啟動(dòng)變量,有強(qiáng)制從主內(nèi)存中刷新的功能。即時(shí)返回線(xiàn)程的狀態(tài) private volatile boolean isRunning = true; //id生成器 private static AtomicInteger count = new AtomicInteger(); //隨機(jī)對(duì)象 private static Random r = new Random(); public Provider(BlockingQueue queue){ this.queue = queue; } @Override public void run() { while(isRunning){ try { //隨機(jī)休眠0 - 1000 毫秒 表示獲取數(shù)據(jù)(產(chǎn)生數(shù)據(jù)的耗時(shí)) Thread.sleep(r.nextInt(1000)); //獲取的數(shù)據(jù)進(jìn)行累計(jì)... int id = count.incrementAndGet(); //比如通過(guò)一個(gè)getData方法獲取了 Data data = new Data(Integer.toString(id), "數(shù)據(jù)" + id); System.out.println("當(dāng)前線(xiàn)程:" + Thread.currentThread().getName() + ", 獲取了數(shù)據(jù),id為:" + id + ", 進(jìn)行裝載到公共緩沖區(qū)中..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ System.out.println("提交緩沖區(qū)數(shù)據(jù)失敗...."); //do something... 比如重新提交 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop(){ this.isRunning = false; } }
3.添加一個(gè)消費(fèi)者
public class Consumer implements Runnable{ private BlockingQueue queue; public Consumer(BlockingQueue queue){ this.queue = queue; } //隨機(jī)對(duì)象 private static Random r = new Random(); @Override public void run() { while(true){ try { //獲取數(shù)據(jù) Data data = this.queue.take(); //進(jìn)行數(shù)據(jù)處理。休眠0 - 1000毫秒模擬耗時(shí) Thread.sleep(r.nextInt(1000)); System.out.println("當(dāng)前消費(fèi)線(xiàn)程:" + Thread.currentThread().getName() + ", 消費(fèi)成功,消費(fèi)數(shù)據(jù)為id: " + data.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
4.定義一個(gè)測(cè)試類(lèi)
public class Main{ public static void main(String[] args) throws Exception { //內(nèi)存緩沖區(qū) BlockingQueue queue = new LinkedBlockingQueue(10); //生產(chǎn)者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); //消費(fèi)者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //創(chuàng)建線(xiàn)程池運(yùn)行,這是一個(gè)緩存的線(xiàn)程池,可以創(chuàng)建無(wú)窮大的線(xiàn)程, //沒(méi)有任務(wù)的時(shí)候不創(chuàng)建線(xiàn)程。空閑線(xiàn)程存活時(shí)間為60s(默認(rèn)值) ExecutorService cachePool = Executors.newCachedThreadPool(); cachePool.execute(p1); cachePool.execute(p2); cachePool.execute(p3); cachePool.execute(c1); cachePool.execute(c2); cachePool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // cachePool.shutdown(); // cachePool.shutdownNow(); } }
運(yùn)行結(jié)果如下所示
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70455.html
摘要:模式是常用的并行計(jì)算模式,它的核心思想是系統(tǒng)是由兩類(lèi)進(jìn)程協(xié)助工作。負(fù)責(zé)接收和分配任務(wù),負(fù)責(zé)處理子任務(wù)。當(dāng)各個(gè)子進(jìn)程處理完成后,會(huì)返回結(jié)果給,由做歸納和總結(jié)。其好處是能將一個(gè)大任務(wù)分解成若干個(gè)小任務(wù),并行執(zhí)行,從而提高系統(tǒng)的吞吐量。 Master-worker模式是常用的并行計(jì)算模式,它的核心思想是系統(tǒng)是由兩類(lèi)進(jìn)程協(xié)助工作。Master負(fù)責(zé)接收和分配任務(wù),worker負(fù)責(zé)處理子任務(wù)。當(dāng)各...
摘要:模式類(lèi)似于用戶(hù)提交商品訂單,下單成功以后后臺(tái)異步的執(zhí)行耗時(shí)的業(yè)務(wù)在包中接口是線(xiàn)程模式的實(shí)現(xiàn),可以來(lái)進(jìn)行異步計(jì)算。 Future模式類(lèi)似于用戶(hù)提交商品訂單,下單成功以后后臺(tái)異步的執(zhí)行耗時(shí)的業(yè)務(wù)在java.util.concurrent包中.Future接口是Java線(xiàn)程Future模式的實(shí)現(xiàn),可以來(lái)進(jìn)行異步計(jì)算。 showImg(https://segmentfault.com/img/...
摘要:導(dǎo)讀閱讀本文需要有足夠的時(shí)間,筆者會(huì)由淺到深帶你一步一步了解一個(gè)資深架構(gòu)師所要掌握的各類(lèi)知識(shí)點(diǎn),你也可以按照文章中所列的知識(shí)體系對(duì)比自身,對(duì)自己進(jìn)行查漏補(bǔ)缺,覺(jué)得本文對(duì)你有幫助的話(huà),可以點(diǎn)贊關(guān)注一下。目錄一基礎(chǔ)篇二進(jìn)階篇三高級(jí)篇四架構(gòu)篇五擴(kuò) 導(dǎo)讀:閱讀本文需要有足夠的時(shí)間,筆者會(huì)由淺到深帶你一步一步了解一個(gè)資深架構(gòu)師所要掌握的各類(lèi)知識(shí)點(diǎn),你也可以按照文章中所列的知識(shí)體系對(duì)比自身,對(duì)自己...
摘要:程序執(zhí)行時(shí),至少會(huì)有一個(gè)線(xiàn)程在運(yùn)行,這個(gè)運(yùn)行的線(xiàn)程被稱(chēng)為主線(xiàn)程。程序的終止是指除守護(hù)線(xiàn)程以外的線(xiàn)程全部終止。多線(xiàn)程程序由多個(gè)線(xiàn)程組成的程序稱(chēng)為多線(xiàn)程程序。線(xiàn)程休眠期間可以被中斷,中斷將會(huì)拋出異常。 線(xiàn)程 我們?cè)陂喿x程序時(shí),表面看來(lái)是在跟蹤程序的處理流程,實(shí)際上跟蹤的是線(xiàn)程的執(zhí)行。 單線(xiàn)程程序 在單線(xiàn)程程序中,在某個(gè)時(shí)間點(diǎn)執(zhí)行的處理只有一個(gè)。 Java 程序執(zhí)行時(shí),至少會(huì)有一個(gè)線(xiàn)程在運(yùn)行...
摘要:并發(fā)與并行并發(fā)與并行的概念并行多個(gè)實(shí)例或者多臺(tái)機(jī)器同時(shí)執(zhí)行一段處理邏輯,是真正的同時(shí)。并發(fā)通過(guò)調(diào)度算法,讓用戶(hù)看上去同時(shí)執(zhí)行,實(shí)際上從操作層面不是真正的同時(shí)。并行與并發(fā)的異同點(diǎn)相似性都是為了合理且最大化利用系統(tǒng)的資源。 并發(fā)(concurrency)與并行(parallellism) 并發(fā)與并行的概念 ??并行:多個(gè)cpu實(shí)例或者多臺(tái)機(jī)器同時(shí)執(zhí)行一段處理邏輯,是真正的同時(shí)。 ...
閱讀 1535·2021-11-23 09:51
閱讀 3649·2021-09-26 09:46
閱讀 2139·2021-09-22 10:02
閱讀 1864·2019-08-30 15:56
閱讀 3335·2019-08-30 12:51
閱讀 2240·2019-08-30 11:12
閱讀 2070·2019-08-29 13:23
閱讀 2332·2019-08-29 13:16