成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

長文慎入-探索Java并發(fā)編程與高并發(fā)解決方案

SimpleTriangle / 2060人閱讀

摘要:所有示例代碼請見下載于基本概念并發(fā)同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內(nèi)存這些線程是同時存在的,每個線程都處于執(zhí)行過程中的某個狀態(tài),如果運行在多核處理器上此時,程序中的每個線程都

所有示例代碼,請見/下載于

https://github.com/Wasabi1234...



1 基本概念 1.1 并發(fā)

同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內(nèi)存,這些線程是同時“存在"的,每個線程都處于執(zhí)行過程中的某個狀態(tài),如果運行在多核處理器上,此時,程序中的每個線程都將分配到一個處理器核上,因此可以同時運行.

1.2 高并發(fā)( High Concurrency)

互聯(lián)網(wǎng)分布式系統(tǒng)架構設計中必須考慮的因素之一,通常是指,通過設計保證系統(tǒng)能夠同時并行處理很多請求.

1.3 區(qū)別與聯(lián)系

并發(fā): 多個線程操作相同的資源,保證線程安全,合理使用資源

高并發(fā):服務能同時處理很多請求,提高程序性能

2 CPU 2.1 CPU 多級緩存

為什么需要CPU cache

CPU的頻率太快了,快到主存跟不上
如此,在處理器時鐘周期內(nèi),CPU常常需要等待主存,浪費資源。所以cache的出現(xiàn),是為了緩解CPU和內(nèi)存之間速度的不匹配問題(結構:cpu-> cache-> memory ).

CPU cache的意義
1) 時間局部性
如果某個數(shù)據(jù)被訪問,那么在不久的將來它很可能被再次訪問
2) 空間局部性

如果某個數(shù)據(jù)被訪問,那么與它相鄰的數(shù)據(jù)很快也可能被訪問

2.2 緩存一致性(MESI)

用于保證多個 CPU cache 之間緩存共享數(shù)據(jù)的一致

M-modified被修改

該緩存行只被緩存在該 CPU 的緩存中,并且是被修改過的,與主存中數(shù)據(jù)是不一致的,需在未來某個時間點寫回主存,該時間是允許在其他CPU 讀取主存中相應的內(nèi)存之前,當這里的值被寫入主存之后,該緩存行狀態(tài)變?yōu)?E

E-exclusive獨享

緩存行只被緩存在該 CPU 的緩存中,未被修改過,與主存中數(shù)據(jù)一致
可在任何時刻當被其他 CPU讀取該內(nèi)存時變成 S 態(tài),被修改時變?yōu)?M態(tài)

S-shared共享

該緩存行可被多個 CPU 緩存,與主存中數(shù)據(jù)一致

I-invalid無效

亂序執(zhí)行優(yōu)化

處理器為提高運算速度而做出違背代碼原有順序的優(yōu)化

并發(fā)的優(yōu)勢與風險

3 項目準備 3.1 項目初始化



3.2 并發(fā)模擬-Jmeter壓測





3.3 并發(fā)模擬-代碼 CountDownLatch

Semaphore(信號量)


以上二者通常和線程池搭配

下面開始做并發(fā)模擬

package com.mmall.concurrency;

import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 * @date 18/4/1
 */
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {

    /**
     * 請求總數(shù)
     */
    public static int clientTotal = 5000;

    /**
     * 同時并發(fā)執(zhí)行的線程數(shù)
     */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量,給出允許并發(fā)的線程數(shù)目
        final Semaphore semaphore = new Semaphore(threadTotal);
        //統(tǒng)計計數(shù)結果
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        //將請求放入線程池
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    //信號量的獲取
                    semaphore.acquire();
                    add();
                    //釋放
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        //關閉線程池
        executorService.shutdown();
        log.info("count:{}", count);
    }

    /**
     * 統(tǒng)計方法
     */
    private static void add() {
        count++;
    }
}

運行發(fā)現(xiàn)結果隨機,所以非線程安全

4線程安全性 4.1 線程安全性

當多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式或者這些進程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行為,那么就稱這個類是線程安全的

4.2 原子性 4.2.1 Atomic 包

AtomicXXX:CAS,Unsafe.compareAndSwapInt

提供了互斥訪問,同一時刻只能有一個線程來對它進行操作

package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class AtomicExample2 {

    /**
     * 請求總數(shù)
     */
    public static int clientTotal = 5000;

    /**
     * 同時并發(fā)執(zhí)行的線程數(shù)
     */
    public static int threadTotal = 200;

    /**
     * 工作內(nèi)存
     */
    public static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println();
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        //主內(nèi)存
        log.info("count:{}", count.get());
    }
    
    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}
package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author shishusheng
 * @date 18/4/3
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference count = new AtomicReference<>(0);

    public static void main(String[] args) {
        // 2
        count.compareAndSet(0, 2);
        // no
        count.compareAndSet(0, 1);
        // no
        count.compareAndSet(1, 3);
        // 4
        count.compareAndSet(2, 4);
        // no
        count.compareAndSet(3, 5); 
        log.info("count:{}", count.get());
    }
}

AtomicReference,AtomicReferenceFieldUpdater

AtomicBoolean

AtomicStampReference : CAS的 ABA 問題

4.2.2 鎖

synchronized:依賴 JVM

修飾代碼塊:大括號括起來的代碼,作用于調(diào)用的對象

修飾方法: 整個方法,作用于調(diào)用的對象

修飾靜態(tài)方法:整個靜態(tài)方法,作用于所有對象

package com.mmall.concurrency.example.count;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class CountExample3 {

    /**
     * 請求總數(shù)
     */
    public static int clientTotal = 5000;

    /**
     * 同時并發(fā)執(zhí)行的線程數(shù)
     */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private synchronized static void add() {
        count++;
    }
}

synchronized 修正計數(shù)類方法

修飾類:括號括起來的部分,作用于所有對象

子類繼承父類的被 synchronized 修飾方法時,是沒有 synchronized 修飾的!!!

Lock: 依賴特殊的 CPU 指令,代碼實現(xiàn)

4.2.3 對比

synchronized: 不可中斷鎖,適合競爭不激烈,可讀性好

Lock: 可中斷鎖,多樣化同步,競爭激烈時能維持常態(tài)

Atomic: 競爭激烈時能維持常態(tài),比Lock性能好; 只能同步一

個值

4.3 可見性

一個線程對主內(nèi)存的修改可以及時的被其他線程觀察到

4.3.1 導致共享變量在線程間不可見的原因

線程交叉執(zhí)行

重排序結合線程交叉執(zhí)行

共享變量更新后的值沒有在工作內(nèi)存與主存間及時更新

4.3.2 可見性之synchronized

JMM關于synchronized的規(guī)定

線程解鎖前,必須把共享變量的最新值刷新到主內(nèi)存

線程加鎖時,將清空工作內(nèi)存中共享變量的值,從而使

用共享變量時需要從主內(nèi)存中重新讀取最新的值(加鎖與解鎖是同一把鎖)

4.3.3 可見性之volatile

通過加入內(nèi)存屏障和禁止重排序優(yōu)化來實現(xiàn)

對volatile變量寫操作時,會在寫操作后加入一條store

屏障指令,將本地內(nèi)存中的共享變量值刷新到主內(nèi)存

對volatile變量讀操作時,會在讀操作前加入一條load

屏障指令,從主內(nèi)存中讀取共享變量


volatile使用

volatile boolean inited = false;

//線程1:
context = loadContext();
inited= true;

// 線程2:
while( !inited ){
    sleep();
}
doSomethingWithConfig(context)
4.4 有序性

一個線程觀察其他線程中的指令執(zhí)行順序,由于指令重排序的存在,該觀察結果一般雜亂無序

JMM允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單線程程序的執(zhí)行,卻會影響到多線程并發(fā)執(zhí)行的正確性

4.4.1 happens-before 規(guī)則 5發(fā)布對象



5.1 安全發(fā)布對象




package com.mmall.concurrency.example.singleton;

import com.mmall.concurrency.annoations.NotThreadSafe;

/**
 * 懶漢模式 -》 雙重同步鎖單例模式
 * 單例實例在第一次使用時進行創(chuàng)建
 * @author shishusheng
 */
@NotThreadSafe
public class SingletonExample4 {

    /**
     * 私有構造函數(shù)
     */
    private SingletonExample4() {

    }

    // 1、memory = allocate() 分配對象的內(nèi)存空間
    // 2、ctorInstance() 初始化對象
    // 3、instance = memory 設置instance指向剛分配的內(nèi)存

    // JVM和cpu優(yōu)化,發(fā)生了指令重排

    // 1、memory = allocate() 分配對象的內(nèi)存空間
    // 3、instance = memory 設置instance指向剛分配的內(nèi)存
    // 2、ctorInstance() 初始化對象

    /**
     * 單例對象
     */
    private static SingletonExample4 instance = null;

    /**
     * 靜態(tài)的工廠方法
     *
     * @return
     */
    public static SingletonExample4 getInstance() {
        // 雙重檢測機制 // B
        if (instance == null) {        
            // 同步鎖
            synchronized (SingletonExample4.class) { 
                if (instance == null) {
                    // A - 3
                    instance = new SingletonExample4(); 
                }
            }
        }
        return instance;
    }
}


7 AQS 7.1 介紹

使用Node實現(xiàn)FIFO隊列,可以用于構建鎖或者其他同步裝置的基礎框架

利用了一個int類型表示狀態(tài)

使用方法是繼承

子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài){acquire 和release} 的方法操縱狀態(tài)

可以同時實現(xiàn)排它鎖和共享鎖模式(獨占、共享)

同步組件

CountDownLatch
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* 指定時間內(nèi)處理任務
* 
* @author shishusheng 
* 
*/
@Slf4j
public class CountDownLatchExample2 {

   private final static int threadCount = 200;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   test(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               } finally {
                   countDownLatch.countDown();
               }
           });
       }
       countDownLatch.await(10, TimeUnit.MILLISECONDS);
       log.info("finish");
       exec.shutdown();
   }

   private static void test(int threadNum) throws Exception {
       Thread.sleep(100);
       log.info("{}", threadNum);
   }
}
Semaphore用法



CycliBarrier
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author shishusheng
 */
@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // 嘗試獲取一個許可
                    if (semaphore.tryAcquire()) {
                        test(threadNum);
                        // 釋放一個許可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }


}
9 線程池 9.1 newCachedThreadPool

9.2 newFixedThreadPool

9.3 newSingleThreadExecutor

看出是順序執(zhí)行的

9.4 newScheduledThreadPool


10 死鎖


文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72545.html

相關文章

  • Java并發(fā)Java并發(fā)編程與高并發(fā)基礎概念

    摘要:筆記來源并發(fā)編程與高并發(fā)解決方案并發(fā)基礎綜述多級緩存緩存一致性亂序執(zhí)行優(yōu)化內(nèi)存模型規(guī)定抽象結構同步八種操作及規(guī)則并發(fā)的優(yōu)勢與風險并發(fā)與高并發(fā)基本概念基本概念并發(fā)同時擁有兩個或者多個線程,如果程序在單核處理器上運行,多個線程將交替地換入或者換 筆記來源:【IMOOC】Java并發(fā)編程與高并發(fā)解決方案 并發(fā)基礎 綜述: CPU多級緩存:緩存一致性、亂序執(zhí)行優(yōu)化 Java內(nèi)存模型:JM...

    stackfing 評論0 收藏0
  • 做IT這幾年,我整理了這些干貨想要送給你!

    摘要:資源獲取方式根據(jù)下面的索引,大家可以選擇自己需要的資源,然后在松哥公眾號牧碼小子后臺回復對應的口令,就可以獲取到資源的百度云盤下載地址。公眾號二維碼如下另外本文會定期更新,松哥有新資源的時候會及時分享給大家,歡迎各位小伙伴保持關注。 沒有一條路是容易的,特別是轉(zhuǎn)行計算機這條路。 松哥接觸過很多轉(zhuǎn)行做開發(fā)的小伙伴,我了解到很多轉(zhuǎn)行人的不容易,記得松哥大二時剛剛決定轉(zhuǎn)行計算機,完全不知道這...

    王晗 評論0 收藏0
  • 從小白程序員一路晉升為大廠高級技術專家我看過哪些書籍?(建議收藏)

    摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術專家我看過哪些技術類書籍。 大家好,我是...

    sf_wangchong 評論0 收藏0
  • 并發(fā)編程 - 探索

    摘要:并發(fā)表示在一段時間內(nèi)有多個動作存在。并發(fā)帶來的問題在享受并發(fā)編程帶來的高性能高吞吐量的同時,也會因為并發(fā)編程帶來一些意想不到弊端。并發(fā)過程中多線程之間的切換調(diào)度,上下文的保存恢復等都會帶來額外的線程切換開銷。 0x01 什么是并發(fā) 要理解并發(fā)首選我們來區(qū)分下并發(fā)和并行的概念。 并發(fā):表示在一段時間內(nèi)有多個動作存在。 并行:表示在同一時間點有多個動作同時存在。 例如:此刻我正在寫博客,但...

    pcChao 評論0 收藏0
  • Java并發(fā)】線程安全性

    摘要:另一個是使用鎖的機制來處理線程之間的原子性。依賴于去實現(xiàn)鎖,因此在這個關鍵字作用對象的作用范圍內(nèi),都是同一時刻只能有一個線程對其進行操作的。 線程安全性 定義:當多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行為,那么就稱這個類是線程安全的。 線程安全性主要體現(xiàn)在三個方面:原子性、可見性...

    劉玉平 評論0 收藏0

發(fā)表評論

0條評論

SimpleTriangle

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<