摘要:所謂生產(chǎn)者消費者模式,即個線程進行生產(chǎn),同時個線程進行消費,兩種角色通過內(nèi)存緩沖區(qū)進行通信圖片來源下面我們通過四種方式,來實現(xiàn)生產(chǎn)者消費者模式。通過生產(chǎn)者調(diào)用,減少數(shù)目可以消費的數(shù)量。
所謂生產(chǎn)者消費者模式,即N個線程進行生產(chǎn),同時N個線程進行消費,兩種角色通過內(nèi)存緩沖區(qū)進行通信
圖片來源https://www.cnblogs.com/chent...
下面我們通過四種方式,來實現(xiàn)生產(chǎn)者消費者模式。
首先是最原始的synchronized方式
定義庫存類(即圖中緩存區(qū))
class Stock { private String name; // 標(biāo)記庫存是否有內(nèi)容 private boolean hasComputer = false; public synchronized void putOne(String name) { // 若庫存中已有內(nèi)容,則生產(chǎn)線程阻塞等待 while (hasComputer) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name = name; System.out.println("生產(chǎn)者...生產(chǎn)了 " + name); // 更新標(biāo)記 this.hasComputer = true; // 這里用notify的話,假設(shè)p0執(zhí)行完畢,此時c0,c1都在wait, 同時喚醒另一個provider:p1, // p1判斷標(biāo)記后休眠,造成所有線程都wait的局面,即死鎖; // 因此使用notifyAll解決死鎖問題 this.notifyAll(); } public synchronized void takeOne() { // 若庫存中沒有內(nèi)容,則消費線程阻塞等待生產(chǎn)完畢后繼續(xù) while (!hasComputer) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消費者...消費了 " + name); this.hasComputer = false; this.notifyAll(); } }
定義生產(chǎn)者和消費者(為了節(jié)省空間和方便閱讀,這里將生產(chǎn)者和消費者定義成了匿名內(nèi)部類)
public static void main(String[] args) { // 用于通信的庫存類 Stock computer = new Stock(); // 定義兩個生產(chǎn)者和兩個消費者 Thread p1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.putOne("Dell"); } } }); Thread p2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.putOne("Mac"); } } }); Thread c1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.takeOne(); } } }); Thread c2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.takeOne(); } } }); p1.start(); p2.start(); c1.start(); c2.start(); }
運行結(jié)果圖
第二種方式:Lock
Jdk1.5之后加入了Lock接口,一個lock對象可以有多個Condition類,Condition類負責(zé)對lock對象進行wait,notify,notifyall操作
定義庫存類
class LockStock { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); // 加入庫存概念,可批量生產(chǎn)和消費 // 定義最大庫存為10 final String[] stock = new String[10]; // 寫入標(biāo)記、讀取標(biāo)記、已有商品數(shù)量 int putptr, takeptr, count; public void put(String computer) { // lock代替synchronized lock.lock(); try { // 若庫存已滿則生產(chǎn)者線程阻塞 while (count == stock.length) notFull.await(); // 庫存中加入商品 stock[putptr] = computer; // 庫存已滿,指針置零,方便下次重新寫入 if (++putptr == stock.length) putptr = 0; ++count; System.out.println(computer + " 正在生產(chǎn)數(shù)據(jù): -- 庫存剩余:" + count); notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public String take(String consumerName) { lock.lock(); try { while (count == 0) notEmpty.await(); // 從庫存中獲取商品 String computer = stock[takeptr]; if (++takeptr == stock.length) takeptr = 0; --count; System.out.println(consumerName + " 正在消費數(shù)據(jù):" + computer + " -- 庫存剩余:" + count); notFull.signal(); return computer; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } // 無邏輯作用,放慢速度 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return ""; } }
以上部分代碼摘自java7 API中Condition接口的官方示例
接著還是定義生產(chǎn)者和消費者
public static void main(String[] args) { LockStock computer = new LockStock(); Thread p1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.put("Dell"); } } }); Thread p2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.put("Mac"); } } }); Thread c1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.take("zhangsan"); } } }); Thread c2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.take("李四"); } } }); // 兩個生產(chǎn)者兩個消費者同時運行 p1.start(); p2.start(); c1.start(); c2.start(); }
運行結(jié)果圖:
第三種方式:Semaphore
首先依舊是庫存類:
class Stock { Liststock = new LinkedList(); // 互斥量,控制共享數(shù)據(jù)的互斥訪問 private Semaphore mutex = new Semaphore(1); // canProduceCount可以生產(chǎn)的總數(shù)量。 通過生產(chǎn)者調(diào)用acquire,減少permit數(shù)目 private Semaphore canProduceCount = new Semaphore(10); // canConsumerCount可以消費的數(shù)量。通過生產(chǎn)者調(diào)用release,增加permit數(shù)目 private Semaphore canConsumerCount = new Semaphore(0); public void put(String computer) { try { // 可生產(chǎn)數(shù)量 -1 canProduceCount.acquire(); mutex.acquire(); // 生產(chǎn)一臺電腦 stock.add(computer); System.out.println(computer + " 正在生產(chǎn)數(shù)據(jù)" + " -- 庫存剩余:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放互斥鎖 mutex.release(); // 釋放canConsumerCount,增加可以消費的數(shù)量 canConsumerCount.release(); } // 無邏輯作用,放慢速度 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } public void get(String consumerName) { try { // 可消費數(shù)量 -1 canConsumerCount.acquire(); mutex.acquire(); // 從庫存消費一臺電腦 String removedVal = stock.remove(0); System.out.println(consumerName + " 正在消費數(shù)據(jù):" + removedVal + " -- 庫存剩余:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); // 消費后釋放canProduceCount,增加可以生產(chǎn)的數(shù)量 canProduceCount.release(); } } }
還是生產(chǎn)消費者:
public class SemaphoreTest { public static void main(String[] args) { // 用于多線程操作的庫存變量 final Stock stock = new Stock(); // 定義兩個生產(chǎn)者和兩個消費者 Thread dellProducer = new Thread(new Runnable() { @Override public void run() { while (true) { stock.put("Del"); } } }); Thread macProducer = new Thread(new Runnable() { @Override public void run() { while (true) { stock.put("Mac"); } } }); Thread consumer1 = new Thread(new Runnable() { @Override public void run() { while (true) { stock.get("zhangsan"); } } }); Thread consumer2 = new Thread(new Runnable() { @Override public void run() { while (true) { stock.get("李四"); } } }); dellProducer.start(); macProducer.start(); consumer1.start(); consumer2.start(); } }
運行結(jié)果圖:
第四種方式:BlockingQueue
BlockingQueue的put和take底層實現(xiàn)其實也是使用了第二種方式中的ReentrantLock+Condition,并且?guī)臀覀儗崿F(xiàn)了庫存隊列,方便簡潔
1、定義生產(chǎn)者
class Producer implements Runnable { // 庫存隊列 private BlockingQueuestock; // 生產(chǎn)/消費延遲 private int timeOut; private String name; public Producer(BlockingQueue stock, int timeout, String name) { this.stock = stock; this.timeOut = timeout; this.name = name; } @Override public void run() { while (true) { try { stock.put(name); System.out.println(name + " 正在生產(chǎn)數(shù)據(jù)" + " -- 庫存剩余:" + stock.size()); TimeUnit.MILLISECONDS.sleep(timeOut); } catch (InterruptedException e) { e.printStackTrace(); } } } }
2、定義消費者
class Consumer implements Runnable { // 庫存隊列 private BlockingQueuestock; private String consumerName; public Consumer(BlockingQueue stock, String name) { this.stock = stock; this.consumerName = name; } @Override public void run() { while (true) { try { // 從庫存消費一臺電腦 String takeName = stock.take(); System.out.println(consumerName + " 正在消費數(shù)據(jù):" + takeName + " -- 庫存剩余:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
3、定義庫存并運行
public static void main(String[] args) { // 定義最大庫存為10 BlockingQueuestock = new ArrayBlockingQueue<>(10); Thread p1 = new Thread(new Producer(stock, 500, "Mac")); Thread p2 = new Thread(new Producer(stock, 500, "Dell")); Thread c1 = new Thread(new Consumer(stock,"zhangsan")); Thread c2 = new Thread(new Consumer(stock, "李四")); p1.start(); p2.start(); c1.start(); c2.start(); }
運行結(jié)果圖:
感謝閱讀~歡迎指正和補充~~~
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76979.html
摘要:基本元素機制需要幾個元素來配合,分別是臨界區(qū)對象及鎖條件變量以及定義在對象上的,操作。這個外部條件在機制中稱為條件變量。提供的機制,其實是,等元素合作形成的,甚至說外部的條件變量也是個組成部分。 monitor的概念 管程,英文是 Monitor,也常被翻譯為監(jiān)視器,monitor 不管是翻譯為管程還是監(jiān)視器,都是比較晦澀的,通過翻譯后的中文,并無法對 monitor 達到一個直觀的描...
摘要:下面是線程相關(guān)的熱門面試題,你可以用它來好好準備面試。線程安全問題都是由全局變量及靜態(tài)變量引起的。持有自旋鎖的線程在之前應(yīng)該釋放自旋鎖以便其它線程可以獲得自旋鎖。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語言一個重要的特點就是內(nèi)置了對并發(fā)的支持,讓Java大受企業(yè)和程序員...
摘要:但是單核我們還是要應(yīng)用多線程,就是為了防止阻塞。多線程可以防止這個問題,多條線程同時運行,哪怕一條線程的代碼執(zhí)行讀取數(shù)據(jù)阻塞,也不會影響其它任務(wù)的執(zhí)行。 1、多線程有什么用?一個可能在很多人看來很扯淡的一個問題:我會用多線程就好了,還管它有什么用?在我看來,這個回答更扯淡。所謂知其然知其所以然,會用只是知其然,為什么用才是知其所以然,只有達到知其然知其所以然的程度才可以說是把一個知識點...
摘要:方法可以將當(dāng)前線程放入等待集合中,并釋放當(dāng)前線程持有的鎖。此后,該線程不會接收到的調(diào)度,并進入休眠狀態(tài)。該線程會喚醒,并嘗試恢復(fù)之前的狀態(tài)。 并發(fā) 最近重新復(fù)習(xí)了一邊并發(fā)的知識,發(fā)現(xiàn)自己之前對于并發(fā)的了解只是皮毛。這里總結(jié)以下Java并發(fā)需要掌握的點。 使用并發(fā)的一個重要原因是提高執(zhí)行效率。由于I/O等情況阻塞,單個任務(wù)并不能充分利用CPU時間。所以在單處理器的機器上也應(yīng)該使用并發(fā)。為...
閱讀 3306·2021-09-02 15:41
閱讀 2839·2021-09-02 09:48
閱讀 1379·2019-08-29 13:27
閱讀 1169·2019-08-26 13:37
閱讀 844·2019-08-26 11:56
閱讀 2490·2019-08-26 10:24
閱讀 1651·2019-08-23 18:07
閱讀 2625·2019-08-23 15:16