摘要:所有示例代碼請見下載于基本概念并發(fā)同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內(nèi)存這些線程是同時存在的,每個線程都處于執(zhí)行過程中的某個狀態(tài),如果運行在多核處理器上此時,程序中的每個線程都
所有示例代碼,請見/下載于
https://github.com/Wasabi1234...
同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內(nèi)存,這些線程是同時“存在"的,每個線程都處于執(zhí)行過程中的某個狀態(tài),如果運行在多核處理器上,此時,程序中的每個線程都將分配到一個處理器核上,因此可以同時運行.
1.2 高并發(fā)( High Concurrency)互聯(lián)網(wǎng)分布式系統(tǒng)架構設計中必須考慮的因素之一,通常是指,通過設計保證系統(tǒng)能夠同時并行處理很多請求.
1.3 區(qū)別與聯(lián)系并發(fā): 多個線程操作相同的資源,保證線程安全,合理使用資源
高并發(fā):服務能同時處理很多請求,提高程序性能
2 CPU 2.1 CPU 多級緩存為什么需要CPU cache
CPU的頻率太快了,快到主存跟不上
如此,在處理器時鐘周期內(nèi),CPU常常需要等待主存,浪費資源。所以cache的出現(xiàn),是為了緩解CPU和內(nèi)存之間速度的不匹配問題(結構:cpu-> cache-> memory ).
CPU cache的意義
1) 時間局部性
如果某個數(shù)據(jù)被訪問,那么在不久的將來它很可能被再次訪問
2) 空間局部性
如果某個數(shù)據(jù)被訪問,那么與它相鄰的數(shù)據(jù)很快也可能被訪問
2.2 緩存一致性(MESI)用于保證多個 CPU cache 之間緩存共享數(shù)據(jù)的一致
M-modified被修改
該緩存行只被緩存在該 CPU 的緩存中,并且是被修改過的,與主存中數(shù)據(jù)是不一致的,需在未來某個時間點寫回主存,該時間是允許在其他CPU 讀取主存中相應的內(nèi)存之前,當這里的值被寫入主存之后,該緩存行狀態(tài)變?yōu)?E
E-exclusive獨享
緩存行只被緩存在該 CPU 的緩存中,未被修改過,與主存中數(shù)據(jù)一致
可在任何時刻當被其他 CPU讀取該內(nèi)存時變成 S 態(tài),被修改時變?yōu)?M態(tài)
S-shared共享
該緩存行可被多個 CPU 緩存,與主存中數(shù)據(jù)一致
I-invalid無效
亂序執(zhí)行優(yōu)化
處理器為提高運算速度而做出違背代碼原有順序的優(yōu)化
并發(fā)的優(yōu)勢與風險 3 項目準備 3.1 項目初始化
以上二者通常和線程池搭配
下面開始做并發(fā)模擬
package com.mmall.concurrency; import com.mmall.concurrency.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng * @date 18/4/1 */ @Slf4j @NotThreadSafe public class ConcurrencyTest { /** * 請求總數(shù) */ public static int clientTotal = 5000; /** * 同時并發(fā)執(zhí)行的線程數(shù) */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量,給出允許并發(fā)的線程數(shù)目 final Semaphore semaphore = new Semaphore(threadTotal); //統(tǒng)計計數(shù)結果 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); //將請求放入線程池 for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { //信號量的獲取 semaphore.acquire(); add(); //釋放 semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); //關閉線程池 executorService.shutdown(); log.info("count:{}", count); } /** * 統(tǒng)計方法 */ private static void add() { count++; } }
運行發(fā)現(xiàn)結果隨機,所以非線程安全
4線程安全性 4.1 線程安全性當多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式或者這些進程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行為,那么就稱這個類是線程安全的
4.2 原子性 4.2.1 Atomic 包AtomicXXX:CAS,Unsafe.compareAndSwapInt
提供了互斥訪問,同一時刻只能有一個線程來對它進行操作
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; /** * @author shishusheng */ @Slf4j @ThreadSafe public class AtomicExample2 { /** * 請求總數(shù) */ public static int clientTotal = 5000; /** * 同時并發(fā)執(zhí)行的線程數(shù) */ public static int threadTotal = 200; /** * 工作內(nèi)存 */ public static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { System.out.println(); semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); //主內(nèi)存 log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; /** * @author shishusheng * @date 18/4/3 */ @Slf4j @ThreadSafe public class AtomicExample4 { private static AtomicReferencecount = new AtomicReference<>(0); public static void main(String[] args) { // 2 count.compareAndSet(0, 2); // no count.compareAndSet(0, 1); // no count.compareAndSet(1, 3); // 4 count.compareAndSet(2, 4); // no count.compareAndSet(3, 5); log.info("count:{}", count.get()); } }
AtomicReference,AtomicReferenceFieldUpdater
AtomicBoolean
AtomicStampReference : CAS的 ABA 問題
4.2.2 鎖synchronized:依賴 JVM
修飾代碼塊:大括號括起來的代碼,作用于調(diào)用的對象
修飾方法: 整個方法,作用于調(diào)用的對象
修飾靜態(tài)方法:整個靜態(tài)方法,作用于所有對象
package com.mmall.concurrency.example.count; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j @ThreadSafe public class CountExample3 { /** * 請求總數(shù) */ public static int clientTotal = 5000; /** * 同時并發(fā)執(zhí)行的線程數(shù) */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private synchronized static void add() { count++; } }
synchronized 修正計數(shù)類方法
修飾類:括號括起來的部分,作用于所有對象
子類繼承父類的被 synchronized 修飾方法時,是沒有 synchronized 修飾的!!!
Lock: 依賴特殊的 CPU 指令,代碼實現(xiàn)
4.2.3 對比synchronized: 不可中斷鎖,適合競爭不激烈,可讀性好
Lock: 可中斷鎖,多樣化同步,競爭激烈時能維持常態(tài)
Atomic: 競爭激烈時能維持常態(tài),比Lock性能好; 只能同步一
個值
4.3 可見性一個線程對主內(nèi)存的修改可以及時的被其他線程觀察到
4.3.1 導致共享變量在線程間不可見的原因線程交叉執(zhí)行
重排序結合線程交叉執(zhí)行
共享變量更新后的值沒有在工作內(nèi)存與主存間及時更新
4.3.2 可見性之synchronizedJMM關于synchronized的規(guī)定
線程解鎖前,必須把共享變量的最新值刷新到主內(nèi)存
線程加鎖時,將清空工作內(nèi)存中共享變量的值,從而使
用共享變量時需要從主內(nèi)存中重新讀取最新的值(加鎖與解鎖是同一把鎖)
4.3.3 可見性之volatile通過加入內(nèi)存屏障和禁止重排序優(yōu)化來實現(xiàn)
對volatile變量寫操作時,會在寫操作后加入一條store
屏障指令,將本地內(nèi)存中的共享變量值刷新到主內(nèi)存
對volatile變量讀操作時,會在讀操作前加入一條load
屏障指令,從主內(nèi)存中讀取共享變量
volatile使用
volatile boolean inited = false; //線程1: context = loadContext(); inited= true; // 線程2: while( !inited ){ sleep(); } doSomethingWithConfig(context)4.4 有序性
一個線程觀察其他線程中的指令執(zhí)行順序,由于指令重排序的存在,該觀察結果一般雜亂無序
JMM允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單線程程序的執(zhí)行,卻會影響到多線程并發(fā)執(zhí)行的正確性
4.4.1 happens-before 規(guī)則 5發(fā)布對象package com.mmall.concurrency.example.singleton; import com.mmall.concurrency.annoations.NotThreadSafe; /** * 懶漢模式 -》 雙重同步鎖單例模式 * 單例實例在第一次使用時進行創(chuàng)建 * @author shishusheng */ @NotThreadSafe public class SingletonExample4 { /** * 私有構造函數(shù) */ private SingletonExample4() { } // 1、memory = allocate() 分配對象的內(nèi)存空間 // 2、ctorInstance() 初始化對象 // 3、instance = memory 設置instance指向剛分配的內(nèi)存 // JVM和cpu優(yōu)化,發(fā)生了指令重排 // 1、memory = allocate() 分配對象的內(nèi)存空間 // 3、instance = memory 設置instance指向剛分配的內(nèi)存 // 2、ctorInstance() 初始化對象 /** * 單例對象 */ private static SingletonExample4 instance = null; /** * 靜態(tài)的工廠方法 * * @return */ public static SingletonExample4 getInstance() { // 雙重檢測機制 // B if (instance == null) { // 同步鎖 synchronized (SingletonExample4.class) { if (instance == null) { // A - 3 instance = new SingletonExample4(); } } } return instance; } }
使用Node實現(xiàn)FIFO隊列,可以用于構建鎖或者其他同步裝置的基礎框架
利用了一個int類型表示狀態(tài)
使用方法是繼承
子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài){acquire 和release} 的方法操縱狀態(tài)
可以同時實現(xiàn)排它鎖和共享鎖模式(獨占、共享)
同步組件
CountDownLatchpackage com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 指定時間內(nèi)處理任務 * * @author shishusheng * */ @Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }Semaphore用法
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { // 嘗試獲取一個許可 if (semaphore.tryAcquire()) { test(threadNum); // 釋放一個許可 semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }9 線程池 9.1 newCachedThreadPool 9.2 newFixedThreadPool 9.3 newSingleThreadExecutor
看出是順序執(zhí)行的
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72545.html
摘要:筆記來源并發(fā)編程與高并發(fā)解決方案并發(fā)基礎綜述多級緩存緩存一致性亂序執(zhí)行優(yōu)化內(nèi)存模型規(guī)定抽象結構同步八種操作及規(guī)則并發(fā)的優(yōu)勢與風險并發(fā)與高并發(fā)基本概念基本概念并發(fā)同時擁有兩個或者多個線程,如果程序在單核處理器上運行,多個線程將交替地換入或者換 筆記來源:【IMOOC】Java并發(fā)編程與高并發(fā)解決方案 并發(fā)基礎 綜述: CPU多級緩存:緩存一致性、亂序執(zhí)行優(yōu)化 Java內(nèi)存模型:JM...
摘要:資源獲取方式根據(jù)下面的索引,大家可以選擇自己需要的資源,然后在松哥公眾號牧碼小子后臺回復對應的口令,就可以獲取到資源的百度云盤下載地址。公眾號二維碼如下另外本文會定期更新,松哥有新資源的時候會及時分享給大家,歡迎各位小伙伴保持關注。 沒有一條路是容易的,特別是轉(zhuǎn)行計算機這條路。 松哥接觸過很多轉(zhuǎn)行做開發(fā)的小伙伴,我了解到很多轉(zhuǎn)行人的不容易,記得松哥大二時剛剛決定轉(zhuǎn)行計算機,完全不知道這...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術專家我看過哪些技術類書籍。 大家好,我是...
摘要:并發(fā)表示在一段時間內(nèi)有多個動作存在。并發(fā)帶來的問題在享受并發(fā)編程帶來的高性能高吞吐量的同時,也會因為并發(fā)編程帶來一些意想不到弊端。并發(fā)過程中多線程之間的切換調(diào)度,上下文的保存恢復等都會帶來額外的線程切換開銷。 0x01 什么是并發(fā) 要理解并發(fā)首選我們來區(qū)分下并發(fā)和并行的概念。 并發(fā):表示在一段時間內(nèi)有多個動作存在。 并行:表示在同一時間點有多個動作同時存在。 例如:此刻我正在寫博客,但...
摘要:另一個是使用鎖的機制來處理線程之間的原子性。依賴于去實現(xiàn)鎖,因此在這個關鍵字作用對象的作用范圍內(nèi),都是同一時刻只能有一個線程對其進行操作的。 線程安全性 定義:當多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行為,那么就稱這個類是線程安全的。 線程安全性主要體現(xiàn)在三個方面:原子性、可見性...
閱讀 1216·2019-08-30 15:55
閱讀 964·2019-08-30 15:55
閱讀 2161·2019-08-30 15:44
閱讀 2895·2019-08-29 14:17
閱讀 1140·2019-08-29 12:45
閱讀 3316·2019-08-26 10:48
閱讀 3142·2019-08-23 18:18
閱讀 2613·2019-08-23 16:47