摘要:我們繼續(xù)看代碼的意思是這個(gè)是一段內(nèi)嵌匯編代碼。也就是在語言中使用匯編代碼。就是匯編版的比較并交換。就是保證在多線程情況下,不阻塞線程的填充和消費(fèi)。微觀上看匯編的是實(shí)現(xiàn)操作系統(tǒng)級(jí)別的原子操作的基石。
原文地址:https://www.xilidou.com/2018/02/01/java-cas/
CAS 是現(xiàn)代操作系統(tǒng),解決并發(fā)問題的一個(gè)重要手段,最近在看 eureka 的源碼的時(shí)候。遇到了很多 CAS 的操作。今天就系統(tǒng)的回顧一下 Java 中的CAS。
閱讀這篇文章你將會(huì)了解到:
什么是 CAS
CAS 實(shí)現(xiàn)原理是什么?
CAS 在現(xiàn)實(shí)中的應(yīng)用
自旋鎖
原子類型
限流器
CAS 的缺點(diǎn)
什么是 CASCAS: 全稱Compare and swap,字面意思:”比較并交換“,一個(gè) CAS 涉及到以下操作:
我們假設(shè)內(nèi)存中的原數(shù)據(jù)V,舊的預(yù)期值A(chǔ),需要修改的新值B。
比較 A 與 V 是否相等。(比較)
如果比較相等,將 B 寫入 V。(交換)
返回操作是否成功。
當(dāng)多個(gè)線程同時(shí)對(duì)某個(gè)資源進(jìn)行CAS操作,只能有一個(gè)線程操作成功,但是并不會(huì)阻塞其他線程,其他線程只會(huì)收到操作失敗的信號(hào)??梢?CAS 其實(shí)是一個(gè)樂觀鎖。
CAS 是怎么實(shí)現(xiàn)的跟隨AtomInteger的代碼我們一路往下,就能發(fā)現(xiàn)最終調(diào)用的是 sum.misc.Unsafe 這個(gè)類??疵Q Unsafe 就是一個(gè)不安全的類,這個(gè)類是利用了 Java 的類和包在可見性的的規(guī)則中的一個(gè)恰到好處處的漏洞。Unsafe 這個(gè)類為了速度,在Java的安全標(biāo)準(zhǔn)上做出了一定的妥協(xié)。
再往下尋找我們發(fā)現(xiàn) Unsafe的compareAndSwapInt 是 Native 的方法:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
也就是說,這幾個(gè) CAS 的方法應(yīng)該是使用了本地的方法。所以這幾個(gè)方法的具體實(shí)現(xiàn)需要我們自己去 jdk 的源碼中搜索。
于是我下載一個(gè) OpenJdk 的源碼繼續(xù)向下探索,我們發(fā)現(xiàn)在 /jdk9u/hotspot/src/share/vm/unsafe.cpp 中有這樣的代碼:
{CC "compareAndSetInt", CC "(" OBJ "J""I""I"")Z", FN_PTR(Unsafe_CompareAndSetInt)},
這個(gè)涉及到,JNI 的調(diào)用,感興趣的同學(xué)可以自行學(xué)習(xí)。我們搜索 Unsafe_CompareAndSetInt后發(fā)現(xiàn):
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) { oop p = JNIHandles::resolve(obj); jint* addr = (jint *)index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; } UNSAFE_END
最終我們終于看到了核心代碼 Atomic::cmpxchg。
繼續(xù)向底層探索,在文件java/jdk9u/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.hpp有這樣的代碼:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value, cmpxchg_memory_order order) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
我們通過文件名可以知道,針對(duì)不同的操作系統(tǒng),JVM 對(duì)于 Atomic::cmpxchg 應(yīng)該有不同的實(shí)現(xiàn)。由于我們服務(wù)基本都是使用的是64位linux,所以我們就看看linux_x86 的實(shí)現(xiàn)。
我們繼續(xù)看代碼:
__asm__ 的意思是這個(gè)是一段內(nèi)嵌匯編代碼。也就是在 C 語言中使用匯編代碼。
這里的 volatile和 JAVA 有一點(diǎn)類似,但不是為了內(nèi)存的可見性,而是告訴編譯器對(duì)訪問該變量的代碼就不再進(jìn)行優(yōu)化。
LOCK_IF_MP(%4) 的意思就比較簡單,就是如果操作系統(tǒng)是多核的,那就增加一個(gè) LOCK。
cmpxchgl 就是匯編版的“比較并交換”。但是我們知道比較并交換,有三個(gè)步驟,不是原子的。所以在多核情況下加一個(gè) LOCK,由CPU硬件保證他的原子性。
我們?cè)倏纯?LOCK 是怎么實(shí)現(xiàn)的呢?我們?nèi)ntel的官網(wǎng)上看看,可以知道LOCK在的早期實(shí)現(xiàn)是直接將 cup 的總線阻塞,這樣的實(shí)現(xiàn)可見效率是很低下的。后來優(yōu)化為X86 cpu 有鎖定一個(gè)特定內(nèi)存地址的能力,當(dāng)這個(gè)特定內(nèi)存地址被鎖定后,它就可以阻止其他的系統(tǒng)總線讀取或修改這個(gè)內(nèi)存地址。
關(guān)于 CAS 的底層探索我們就到此為止。我們總結(jié)一下 JAVA 的 cas 是怎么實(shí)現(xiàn)的:
java 的 cas 利用的的是 unsafe 這個(gè)類提供的 cas 操作。
unsafe 的cas 依賴了的是 jvm 針對(duì)不同的操作系統(tǒng)實(shí)現(xiàn)的 Atomic::cmpxchg
Atomic::cmpxchg 的實(shí)現(xiàn)使用了匯編的 cas 操作,并使用 cpu 硬件提供的 lock信號(hào)保證其原子性
CAS 的應(yīng)用了解了 CAS 的原理我們繼續(xù)就看看 CAS 的應(yīng)用:
自旋鎖public class SpinLock { private AtomicReferencesign =new AtomicReference<>(); public void lock(){ Thread current = Thread.currentThread(); while(!sign .compareAndSet(null, current)){ } } public void unlock (){ Thread current = Thread.currentThread(); sign .compareAndSet(current, null); } }
所謂自旋鎖,我覺得這個(gè)名字相當(dāng)?shù)男蜗?,在lock()的時(shí)候,一直while()循環(huán),直到 cas 操作成功為止。
AtomicInteger 的 incrementAndGet()public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
與自旋鎖有異曲同工之妙,就是一直while,直到操作成功為止。
令牌桶限流器所謂令牌桶限流器,就是系統(tǒng)以恒定的速度向桶內(nèi)增加令牌。每次請(qǐng)求前從令牌桶里面獲取令牌。如果獲取到令牌就才可以進(jìn)行訪問。當(dāng)令牌桶內(nèi)沒有令牌的時(shí)候,拒絕提供服務(wù)。我們來看看 eureka 的限流器是如何使用 CAS 來維護(hù)多線程環(huán)境下對(duì) token 的增加和分發(fā)的。
public class RateLimiter { private final long rateToMsConversion; private final AtomicInteger consumedTokens = new AtomicInteger(); private final AtomicLong lastRefillTime = new AtomicLong(0); @Deprecated public RateLimiter() { this(TimeUnit.SECONDS); } public RateLimiter(TimeUnit averageRateUnit) { switch (averageRateUnit) { case SECONDS: rateToMsConversion = 1000; break; case MINUTES: rateToMsConversion = 60 * 1000; break; default: throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported"); } } //提供給外界獲取 token 的方法 public boolean acquire(int burstSize, long averageRate) { return acquire(burstSize, averageRate, System.currentTimeMillis()); } public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) { if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go return true; } //添加token refillToken(burstSize, averageRate, currentTimeMillis); //消費(fèi)token return consumeToken(burstSize); } private void refillToken(int burstSize, long averageRate, long currentTimeMillis) { long refillTime = lastRefillTime.get(); long timeDelta = currentTimeMillis - refillTime; //根據(jù)頻率計(jì)算需要增加多少 token long newTokens = timeDelta * averageRate / rateToMsConversion; if (newTokens > 0) { long newRefillTime = refillTime == 0 ? currentTimeMillis : refillTime + newTokens * rateToMsConversion / averageRate; // CAS 保證有且僅有一個(gè)線程進(jìn)入填充 if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) { while (true) { int currentLevel = consumedTokens.get(); int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased int newLevel = (int) Math.max(0, adjustedLevel - newTokens); // while true 直到更新成功為止 if (consumedTokens.compareAndSet(currentLevel, newLevel)) { return; } } } } } private boolean consumeToken(int burstSize) { while (true) { int currentLevel = consumedTokens.get(); if (currentLevel >= burstSize) { return false; } // while true 直到?jīng)]有token 或者 獲取到為止 if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) { return true; } } } public void reset() { consumedTokens.set(0); lastRefillTime.set(0); } }
所以梳理一下 CAS 在令牌桶限流器的作用。就是保證在多線程情況下,不阻塞線程的填充token 和消費(fèi)token。
歸納通過上面的三個(gè)應(yīng)用我們歸納一下 CAS 的應(yīng)用場(chǎng)景:
CAS 的使用能夠避免線程的阻塞。
多數(shù)情況下我們使用的是 while true 直到成功為止。
CAS 缺點(diǎn)ABA 的問題,就是一個(gè)值從A變成了B又變成了A,使用CAS操作不能發(fā)現(xiàn)這個(gè)值發(fā)生變化了,處理方式是可以使用攜帶類似時(shí)間戳的版本AtomicStampedReference
性能問題,我們使用時(shí)大部分時(shí)間使用的是 while true 方式對(duì)數(shù)據(jù)的修改,直到成功為止。優(yōu)勢(shì)就是相應(yīng)極快,但當(dāng)線程數(shù)不停增加時(shí),性能下降明顯,因?yàn)槊總€(gè)線程都需要執(zhí)行,占用CPU時(shí)間。
總結(jié)CAS 是整個(gè)編程重要的思想之一。整個(gè)計(jì)算機(jī)的實(shí)現(xiàn)中都有CAS的身影。微觀上看匯編的 CAS 是實(shí)現(xiàn)操作系統(tǒng)級(jí)別的原子操作的基石。從編程語言角度來看 CAS 是實(shí)現(xiàn)多線程非阻塞操作的基石。宏觀上看,在分布式系統(tǒng)中,我們可以使用 CAS 的思想利用類似Redis的外部存儲(chǔ),也能實(shí)現(xiàn)一個(gè)分布式鎖。
從某個(gè)角度來說架構(gòu)就將微觀的實(shí)現(xiàn)放大,或者底層思想就是將宏觀的架構(gòu)進(jìn)行微縮。計(jì)算機(jī)的思想是想通的,所以說了解底層的實(shí)現(xiàn)可以提升架構(gòu)能力,提升架構(gòu)的能力同樣可加深對(duì)底層實(shí)現(xiàn)的理解。計(jì)算機(jī)知識(shí)浩如煙海,但是套路有限。抓住基礎(chǔ)的幾個(gè)套路突破,從思想和思維的角度學(xué)習(xí)計(jì)算機(jī)知識(shí)。不要將自己的精力花費(fèi)在不停的追求新技術(shù)的腳步上,跟隨‘start guide line’只能寫一個(gè)demo,所得也就是一個(gè)demo而已。
停下腳步,回顧基礎(chǔ)和經(jīng)典或許對(duì)于技術(shù)的提升更大一些。
希望這篇文章對(duì)大家有所幫助。
徒手?jǐn)]框架系列文章地址:
徒手?jǐn)]框架--高并發(fā)環(huán)境下的請(qǐng)求合并
徒手?jǐn)]框架--實(shí)現(xiàn)IoC
徒手?jǐn)]框架--實(shí)現(xiàn)Aop
歡迎關(guān)注我的微信公眾號(hào)
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68438.html
摘要:方法由兩個(gè)參數(shù),表示期望的值,表示要給設(shè)置的新值。操作包含三個(gè)操作數(shù)內(nèi)存位置預(yù)期原值和新值。如果處的值尚未同時(shí)更改,則操作成功。中就使用了這樣的操作。上面操作還有一點(diǎn)是將事務(wù)范圍縮小了,也提升了系統(tǒng)并發(fā)處理的性能。 這是java高并發(fā)系列第21篇文章。 本文主要內(nèi)容 從網(wǎng)站計(jì)數(shù)器實(shí)現(xiàn)中一步步引出CAS操作 介紹java中的CAS及CAS可能存在的問題 悲觀鎖和樂觀鎖的一些介紹及數(shù)據(jù)庫...
摘要:前言學(xué)習(xí)情況記錄時(shí)間子目標(biāo)多線程記錄在學(xué)習(xí)線程安全知識(shí)點(diǎn)中,關(guān)于的有關(guān)知識(shí)點(diǎn)。對(duì)于資源競(jìng)爭(zhēng)嚴(yán)重線程沖突嚴(yán)重的情況,自旋的概率會(huì)比較大,從而浪費(fèi)更多的資源,效率低于。 前言 學(xué)習(xí)情況記錄 時(shí)間:week 1 SMART子目標(biāo) :Java 多線程 記錄在學(xué)習(xí)線程安全知識(shí)點(diǎn)中,關(guān)于CAS的有關(guān)知識(shí)點(diǎn)。 線程安全是指:多個(gè)線程不管以何種方式訪問某個(gè)類,并且在主調(diào)代碼中不需要進(jìn)行同步,都能表...
摘要:該類將整數(shù)值與引用關(guān)聯(lián)起來,可用于原子的更數(shù)據(jù)和數(shù)據(jù)的版本號(hào)。 CAS的全稱為Compare And Swap,直譯就是比較交換。是一條CPU的原子指令,其作用是讓CPU先進(jìn)行比較兩個(gè)值是否相等,然后原子地更新某個(gè)位置的值,其實(shí)現(xiàn)方式是基于硬件平臺(tái)的匯編指令,在intel的CPU中,使用的是cmpxchg指令,就是說CAS是靠硬件實(shí)現(xiàn)的,從而在硬件層面提升效率。 CSA 原理 利用CP...
摘要:現(xiàn)在兩個(gè)核心同時(shí)執(zhí)行該條指令。至于這樣做的原因可以參考知乎的一個(gè)回答比較并交換。那么表示內(nèi)存地址為的內(nèi)存單元這一條指令的意思就是,將寄存器中的值與雙字內(nèi)存單元中的值進(jìn)行對(duì)比,如果相同,則將寄存器中的值存入內(nèi)存單元中。 1.簡介 CAS 全稱是 compare and swap,是一種用于在多線程環(huán)境下實(shí)現(xiàn)同步功能的機(jī)制。CAS 操作包含三個(gè)操作數(shù) -- 內(nèi)存位置、預(yù)期數(shù)值和新值。CAS...
摘要:算法算法會(huì)先對(duì)一個(gè)內(nèi)存變量位置和一個(gè)給定的值進(jìn)行比較,如果相等,則用一個(gè)新值去修改這個(gè)內(nèi)存變量位置。因?yàn)槭欠枪芥i,所以一上來就嘗試搶占鎖給定舊值并希望用新值去更新內(nèi)存變量。 本文翻譯和原創(chuàng)各占一半,所以還是厚顏無恥歸類到原創(chuàng)好了...https://howtodoinjava.com/jav...java 5 其中一個(gè)令人振奮的改進(jìn)是新增了支持原子操作的類型,例如 AtomicInt...
閱讀 893·2021-11-15 11:38
閱讀 2526·2021-09-08 09:45
閱讀 2828·2021-09-04 16:48
閱讀 2574·2019-08-30 15:54
閱讀 940·2019-08-30 13:57
閱讀 1629·2019-08-29 15:39
閱讀 506·2019-08-29 12:46
閱讀 3530·2019-08-26 13:39