摘要:執(zhí)行將異常后的刪除。根據(jù)元素個(gè)數(shù)上限進(jìn)行清理的策略思路在新緩存值的時(shí)候比對(duì)下是否緩存容量元素個(gè)數(shù)已經(jīng)達(dá)到上限,如果達(dá)到上限按照算法進(jìn)行淘汰元素。在讀寫完成后會(huì)進(jìn)行通知源碼會(huì)回調(diào)進(jìn)行緩存元素刪除后置處理。
Google Guava LocalLoadingCache 前言
在我們編程的過(guò)程中會(huì)遇到一些在程序中需要重試使用的數(shù)據(jù),在這種情況下我們就可以考慮利用緩存(內(nèi)存)的優(yōu)勢(shì)來(lái)提供程序訪問(wèn)這些數(shù)據(jù)的一個(gè)性能了。利用了緩存可以在一定程度上緩解很大的性能消耗:
網(wǎng)絡(luò)傳輸開(kāi)銷
數(shù)據(jù)序列化反序列話
數(shù)據(jù)庫(kù)、文件系統(tǒng)數(shù)據(jù)訪問(wèn)慢
緩存器是利用內(nèi)存進(jìn)行數(shù)據(jù)存儲(chǔ)的,在存儲(chǔ)容量上有一定的限制,所以我們?cè)谖覀兪褂镁彺娴臅r(shí)候也分兩種場(chǎng)景:
全量數(shù)據(jù)緩存
緩存熱數(shù)據(jù),這也是基于緩存容量的一個(gè)考慮
好了本篇我們就來(lái)聊聊寫程序過(guò)程中常能用到的本地緩存的方式。
JDK提供的數(shù)據(jù)結(jié)構(gòu)(Map)緩存數(shù)據(jù)的存儲(chǔ)格式一般都是以Key-Value的方式,那這里我們主要來(lái)討論下Map的實(shí)現(xiàn)ConcurrentHashMap實(shí)現(xiàn)的緩存。
String key = StringUtils.EMPTY; ConcurrentMaplocalCache = new ConcurrentHashMap(); if(StringUtils.isEmpty(localCache.get(key))) { String value = queryFromDB(key); localCache.put(key,value); return value; } return localCache.get(key);
這樣就能構(gòu)造一個(gè)非常簡(jiǎn)單的緩存。
注意:這個(gè)緩存還是有非常多的問(wèn)題
沒(méi)有一個(gè)清除緩存的策略,最終所有被訪問(wèn)過(guò)得數(shù)據(jù)都會(huì)全量給緩存起來(lái),直到顯式清除。
同時(shí)緩存沒(méi)命中的情況下需要應(yīng)用顯式去加載(queryFromDB )。
LocalLoadingCache好了主角要登場(chǎng)了,先簡(jiǎn)單介紹下這個(gè)cache的一些用法,這個(gè)cache比較好的解決了我上面提到通過(guò)Map用作緩存的兩個(gè)缺陷。
用法LoadingCachegraphs = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .removalListener(MY_LISTENER) .build( new CacheLoader () { public Graph load(Key key) throws AnyException { return createExpensiveGraph(key); } });
通過(guò)這種方式一個(gè)緩存就已經(jīng)創(chuàng)建好了,上面定義的load函數(shù)在緩存中不存在key對(duì)應(yīng)的value的時(shí)候會(huì)去執(zhí)行將數(shù)據(jù)load放到緩存中。
其底層存儲(chǔ)采用基于數(shù)組的java.util.concurrent.atomic.AtomicReferenceArray進(jìn)行緩存元素的存取。
load如何被加載先分析下load函數(shù)是怎么被執(zhí)行的:graphs.getUnchecked(new Key());從緩存中獲取數(shù)據(jù),如果沒(méi)有進(jìn)行put操作,首次get的時(shí)候緩存中沒(méi)有其緩存值,這個(gè)時(shí)候必然要觸發(fā)load函數(shù)進(jìn)行value load了,那我們就從get函數(shù)進(jìn)行深入分析(分析源碼基于16.0.1)。
com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader super K,V>) V get(K key, int hash, CacheLoader super K, V> loader) throws ExecutionException { checkNotNull(key); checkNotNull(loader); try { if (count != 0) { // read-volatile // don"t call getLiveEntry, which would ignore loading values ReferenceEntrye = getEntry(key, hash); if (e != null) { long now = map.ticker.read(); V value = getLiveValue(e, now); if (value != null) { recordRead(e, now); statsCounter.recordHits(1); return scheduleRefresh(e, key, hash, value, now, loader); } ValueReference valueReference = e.getValueReference(); if (valueReference.isLoading()) { return waitForLoadingValue(e, key, valueReference); } } } // at this point e is either null or expired; return lockedGetOrLoad(key, hash, loader); } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof Error) { throw new ExecutionError((Error) cause); } else if (cause instanceof RuntimeException) { throw new UncheckedExecutionException(cause); } throw ee; } finally { postReadCleanup(); } }
首次調(diào)用會(huì)執(zhí)行l(wèi)ockedGetOrLoad函數(shù)
V lockedGetOrLoad(K key, int hash, CacheLoader super K, V> loader) throws ExecutionException { ReferenceEntrye; ValueReference valueReference = null; LoadingValueReference loadingValueReference = null; boolean createNewEntry = true; lock(); try { // re-read ticker once inside the lock long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count - 1; AtomicReferenceArray > table = this.table; int index = hash & (table.length() - 1); ReferenceEntry first = table.get(index); for (e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { valueReference = e.getValueReference(); if (valueReference.isLoading()) { createNewEntry = false; } else { V value = valueReference.get(); if (value == null) { enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED); } else if (map.isExpired(e, now)) { // This is a duplicate check, as preWriteCleanup already purged expired // entries, but let"s accomodate an incorrect expiration queue. enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED); } else { recordLockedRead(e, now); statsCounter.recordHits(1); // we were concurrent with loading; don"t consider refresh return value; } // immediately reuse invalid entries writeQueue.remove(e); accessQueue.remove(e); this.count = newCount; // write-volatile } break; } } if (createNewEntry) { loadingValueReference = new LoadingValueReference (); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); } }
最后調(diào)用loadSync(key, hash, loadingValueReference, loader);進(jìn)行進(jìn)行數(shù)據(jù)load。
public ListenableFutureloadFuture(K key, CacheLoader super K, V> loader) { stopwatch.start(); V previousValue = oldValue.get(); try { if (previousValue == null) { V newValue = loader.load(key); return set(newValue) ? futureValue : Futures.immediateFuture(newValue); } ListenableFuture newValue = loader.reload(key, previousValue); if (newValue == null) { return Futures.immediateFuture(null); } // To avoid a race, make sure the refreshed value is set into loadingValueReference // *before* returning newValue from the cache query. return Futures.transform(newValue, new Function () { @Override public V apply(V newValue) { LoadingValueReference.this.set(newValue); return newValue; } }); } catch (Throwable t) { if (t instanceof InterruptedException) { Thread.currentThread().interrupt(); } return setException(t) ? futureValue : fullyFailedFuture(t); } }
執(zhí)行loader.load將數(shù)據(jù)load進(jìn)緩存,可能你會(huì)想如果這個(gè)時(shí)候從DB或其他非內(nèi)存存儲(chǔ)中也沒(méi)找到數(shù)據(jù),這個(gè)時(shí)候LocalLoadingCache是怎么處理的呢?其實(shí)在這種情況下只需要throw異常信息就好,這樣LocalLoadingCache會(huì)放棄緩存。
但是讀源代碼細(xì)心的你可能會(huì)發(fā)現(xiàn)在lockedGetOrLoad中會(huì)先newEntry后面才load
if (createNewEntry) { loadingValueReference = new LoadingValueReference(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); }
其實(shí)實(shí)現(xiàn)很簡(jiǎn)單他在cache到異常信息后又會(huì)對(duì)緩存中的entry進(jìn)行remove操作,當(dāng)時(shí)找這段異常被cache的代碼也是找了很久時(shí)間了。
com.google.common.cache.LocalCache.Segment#getAndRecordStats V getAndRecordStats(K key, int hash, LoadingValueReferenceloadingValueReference, ListenableFuture newValue) throws ExecutionException { V value = null; try { value = getUninterruptibly(newValue); if (value == null) { throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + "."); } statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos()); storeLoadedValue(key, hash, loadingValueReference, value); return value; } finally { if (value == null) { statsCounter.recordLoadException(loadingValueReference.elapsedNanos()); removeLoadingValue(key, hash, loadingValueReference); } } }
執(zhí)行removeLoadingValue將load異常后的key刪除。
緩存策略從用法那小結(jié)可以看到我們?cè)趧?chuàng)建緩存的時(shí)候除了load還有一些其他特性如下:
maximumSize(10000) expireAfterWrite(10, TimeUnit.MINUTES)
這又是什么意思呢?這其實(shí)就是LocalLoadingCache提供的緩存策略。
maximumSize(10000) 設(shè)置緩存能保存的最多元素?cái)?shù)量。
expireAfterWrite(10, TimeUnit.MINUTES) 設(shè)置元素在寫后多久進(jìn)行銷毀。
其實(shí)還有maximumWeight、expireAfterAccess兩種元素過(guò)期策略。
maximumSize是maximumWeight的一種特殊形式,將所有的元素設(shè)置weight為1,也即就轉(zhuǎn)化為能存儲(chǔ)元素個(gè)數(shù)的上限值了。
expireAfterAccess和expireAfterWrite基本就一個(gè)意思,只是內(nèi)部用了兩種不同的計(jì)數(shù)方式(通過(guò)不同的queue進(jìn)行管理,被訪問(wèn)/修改進(jìn)行入隊(duì)操作)進(jìn)行訪問(wèn)、寫操作的記錄。
不多說(shuō)讓源碼說(shuō)話。
根據(jù)過(guò)期時(shí)間進(jìn)行緩存的淘汰策略思路:在進(jìn)行g(shù)et/put操作完成后對(duì)隊(duì)列(每次對(duì)緩存的操作頭會(huì)被其記錄下來(lái))進(jìn)行一次遍歷,然后按照過(guò)期時(shí)間淘汰過(guò)期的元素。
根據(jù)元素個(gè)數(shù)上限進(jìn)行清理的策略思路:在load新緩存值的時(shí)候比對(duì)下是否緩存容量(元素個(gè)數(shù))已經(jīng)達(dá)到上限,如果達(dá)到上限按照LRU算法進(jìn)行淘汰元素。
過(guò)期時(shí)間淘汰策略
從分析load那小結(jié)我們已經(jīng)展示過(guò)get的代碼,其中最后finally中有段postReadCleanup();方法,深入下去方法體就不然看出:
@GuardedBy("Segment.this") void expireEntries(long now) { drainRecencyQueue(); ReferenceEntrye; while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } }
進(jìn)行過(guò)期key清除策略,從這段代碼也能看出我為什么說(shuō)expireAfterAccess和expireAfterWrite基本就一個(gè)意思了吧。
其實(shí)還有一種清除緩存的策略:基于引用的回收但是還沒(méi)研究清除不便多說(shuō),這個(gè)策略清除的時(shí)機(jī)和過(guò)期時(shí)間策略一樣。
@GuardedBy("Segment.this") void drainReferenceQueues() { if (map.usesKeyReferences()) { drainKeyReferenceQueue(); } if (map.usesValueReferences()) { drainValueReferenceQueue(); } }
容量回收策略
在新key對(duì)應(yīng)的value load完后需要將value存放到緩存中去,插入完成后會(huì)進(jìn)行容量的check如果超過(guò)容量限制會(huì)執(zhí)行淘汰策略。對(duì)應(yīng)源碼:
com.google.common.cache.LocalCache.Segment#storeLoadedValue boolean storeLoadedValue(K key, int hash, LoadingValueReferenceoldValueReference, V newValue) { lock(); try { long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count + 1; if (newCount > this.threshold) { // ensure capacity expand(); newCount = this.count + 1; } AtomicReferenceArray > table = this.table; int index = hash & (table.length() - 1); ReferenceEntry first = table.get(index); for (ReferenceEntry e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { ValueReference valueReference = e.getValueReference(); V entryValue = valueReference.get(); // replace the old LoadingValueReference if it"s live, otherwise // perform a putIfAbsent if (oldValueReference == valueReference || (entryValue == null && valueReference != UNSET)) { ++modCount; if (oldValueReference.isActive()) { RemovalCause cause = (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED; enqueueNotification(key, hash, oldValueReference, cause); newCount--; } setValue(e, key, newValue, now); this.count = newCount; // write-volatile evictEntries(); return true; } // the loaded value was already clobbered valueReference = new WeightedStrongValueReference (newValue, 0); enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED); return false; } } ++modCount; ReferenceEntry newEntry = newEntry(key, hash, first); setValue(newEntry, key, newValue, now); table.set(index, newEntry); this.count = newCount; // write-volatile evictEntries(); return true; } finally { unlock(); postWriteCleanup(); } }
上面的存儲(chǔ)操作最終在進(jìn)行setValue后會(huì)執(zhí)行:
com.google.common.cache.LocalCache.Segment#evictEntries @GuardedBy("Segment.this") void evictEntries() { if (!map.evictsBySize()) { return; } drainRecencyQueue(); while (totalWeight > maxSegmentWeight) { ReferenceEntrye = getNextEvictable(); if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) { throw new AssertionError(); } } } // TODO(fry): instead implement this with an eviction head ReferenceEntry getNextEvictable() { for (ReferenceEntry e : accessQueue) { int weight = e.getValueReference().getWeight(); if (weight > 0) { return e; } } throw new AssertionError(); }
這里最終會(huì)根據(jù)LRU從緩存中將最近沒(méi)有使用過(guò)的元素進(jìn)行剔除操作。
最后說(shuō)下removalListener在LocalLoadingCache中提供了在元素被移除的時(shí)候供應(yīng)用進(jìn)行回調(diào)的函數(shù),這個(gè)函數(shù)通過(guò)removalListener進(jìn)行注冊(cè),當(dāng)有元素從緩存中淘汰后就會(huì)觸發(fā)其進(jìn)行調(diào)用。
接著上面移除元素進(jìn)行分析函數(shù)removeEntry
@GuardedBy("Segment.this") boolean removeEntry(ReferenceEntryentry, int hash, RemovalCause cause) { int newCount = this.count - 1; AtomicReferenceArray > table = this.table; int index = hash & (table.length() - 1); ReferenceEntry first = table.get(index); for (ReferenceEntry e = first; e != null; e = e.getNext()) { if (e == entry) { ++modCount; ReferenceEntry newFirst = removeValueFromChain( first, e, e.getKey(), hash, e.getValueReference(), cause); newCount = this.count - 1; table.set(index, newFirst); this.count = newCount; // write-volatile return true; } } return false; }
最終會(huì)調(diào)用
@GuardedBy("Segment.this") void enqueueNotification(@Nullable K key, int hash, ValueReferencevalueReference, RemovalCause cause) { totalWeight -= valueReference.getWeight(); if (cause.wasEvicted()) { statsCounter.recordEviction(); } if (map.removalNotificationQueue != DISCARDING_QUEUE) { V value = valueReference.get(); RemovalNotification notification = new RemovalNotification (key, value, cause); map.removalNotificationQueue.offer(notification); } }
將建立一個(gè)RemovalNotification隊(duì)列進(jìn)行保存刪除元素。
在讀/寫完成后會(huì)進(jìn)行通知
com.google.common.cache.LocalCache.Segment#postWriteCleanup /** * Performs routine cleanup following a write. */ void postWriteCleanup() { runUnlockedCleanup(); } void cleanUp() { long now = map.ticker.read(); runLockedCleanup(now); runUnlockedCleanup(); }
runUnlockedCleanup源碼會(huì)回調(diào)com.google.common.cache.RemovalListener#onRemoval進(jìn)行緩存元素刪除后置處理。
void processPendingNotifications() { RemovalNotification最后類圖一張notification; while ((notification = removalNotificationQueue.poll()) != null) { try { removalListener.onRemoval(notification); } catch (Throwable e) { logger.log(Level.WARNING, "Exception thrown by removal listener", e); } } }
覺(jué)得圖不夠清晰可以點(diǎn)擊查看大圖。
本篇也主要是對(duì)LocalLoadingCache從運(yùn)用這個(gè)層次更向前走了一步,對(duì)我們使用過(guò)程其邏輯背后的實(shí)現(xiàn)進(jìn)行了一定深入分析。我在初次看到這個(gè)方式也是很疑惑其底層到底是如何實(shí)現(xiàn)的,于是有了這篇文章,通過(guò)源碼進(jìn)行跟蹤分析其背后的實(shí)現(xiàn)邏輯。
后面還會(huì)分析org.springframework.cache.guava.GuavaCacheManager如何將GuavaCache進(jìn)行管理的,通過(guò)和spring更好的結(jié)合而消除顯式調(diào)用cache get/put的方式。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/61890.html
摘要:執(zhí)行將異常后的刪除。根據(jù)元素個(gè)數(shù)上限進(jìn)行清理的策略思路在新緩存值的時(shí)候比對(duì)下是否緩存容量元素個(gè)數(shù)已經(jīng)達(dá)到上限,如果達(dá)到上限按照算法進(jìn)行淘汰元素。在讀寫完成后會(huì)進(jìn)行通知源碼會(huì)回調(diào)進(jìn)行緩存元素刪除后置處理。 Google Guava LocalLoadingCache 前言 在我們編程的過(guò)程中會(huì)遇到一些在程序中需要重試使用的數(shù)據(jù),在這種情況下我們就可以考慮利用緩存(內(nèi)存)的優(yōu)勢(shì)來(lái)提供程序訪...
摘要:并且添加了監(jiān)聽(tīng)器,當(dāng)數(shù)據(jù)被刪除后會(huì)打印日志。六總結(jié)回顧緩存加載顯示插入緩存回收,定時(shí),,軟弱引用,顯示刪除接口方法,監(jiān)聽(tīng)器清理緩存時(shí)間只有在獲取數(shù)據(jù)時(shí)才或清理緩存,使用者可以單起線程采用方法主動(dòng)清理。 摘要: 學(xué)習(xí)Google內(nèi)部使用的工具包Guava,在Java項(xiàng)目中輕松地增加緩存,提高程序獲取數(shù)據(jù)的效率。 一、什么是緩存? 根據(jù)科普中國(guó)的定義,緩存就是數(shù)據(jù)交換的緩沖區(qū)(稱作Cach...
摘要:并且添加了監(jiān)聽(tīng)器,當(dāng)數(shù)據(jù)被刪除后會(huì)打印日志。六總結(jié)回顧緩存加載顯示插入緩存回收,定時(shí),,軟弱引用,顯示刪除接口方法,監(jiān)聽(tīng)器清理緩存時(shí)間只有在獲取數(shù)據(jù)時(shí)才或清理緩存,使用者可以單起線程采用方法主動(dòng)清理。 摘要: 學(xué)習(xí)Google內(nèi)部使用的工具包Guava,在Java項(xiàng)目中輕松地增加緩存,提高程序獲取數(shù)據(jù)的效率。一、什么是緩存?根據(jù)科普中國(guó)的定義,緩存就是數(shù)據(jù)交換的緩沖區(qū)(稱作Cache)...
摘要:依賴這里使用配置配置文件配置配置文件配置使用 maven依賴 org.springframework.boot spring-boot-starter-cache com.google.guava guava 19...
摘要:的配置文件,使用前綴的屬性進(jìn)行配置。在方法的調(diào)用前并不會(huì)檢查緩存,方法始終都會(huì)被調(diào)用。手動(dòng)使用在實(shí)際開(kāi)發(fā)過(guò)程中,存在不使用注解,需要自己添加緩存的情況。如果該屬性值為,則表示對(duì)象可以無(wú)限期地存在于緩存中。 SpringBoot在annotation的層面實(shí)現(xiàn)了數(shù)據(jù)緩存的功能,基于Spring的AOP技術(shù)。所有的緩存配置只是在annotation層面配置,像聲明式事務(wù)一樣。 Spring...
閱讀 1880·2021-11-25 09:43
閱讀 2155·2021-11-19 09:40
閱讀 3435·2021-11-18 13:12
閱讀 1748·2021-09-29 09:35
閱讀 671·2021-08-24 10:00
閱讀 2516·2019-08-30 15:55
閱讀 1720·2019-08-30 12:56
閱讀 1826·2019-08-28 17:59