在本系列的上一篇文章中,我們了解了 Resilience4j 以及如何使用其 Retry 模塊?,F(xiàn)在讓我們了解 RateLimiter - 它是什么,何時(shí)以及如何使用它,以及在實(shí)施速率限制(或者也稱為“節(jié)流”)時(shí)要注意什么。
代碼示例
本文附有GitHub 上的工作代碼示例。
什么是 Resilience4j?
請(qǐng)參閱上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理。
什么是限速?
我們可以從兩個(gè)角度來看待速率限制——作為服務(wù)提供者和作為服務(wù)消費(fèi)者。
服務(wù)端限速
作為服務(wù)提供商,我們實(shí)施速率限制以保護(hù)我們的資源免受過載和拒絕服務(wù) (DoS) 攻-擊。
為了滿足我們與所有消費(fèi)者的服務(wù)水平協(xié)議 (SLA),我們希望確保一個(gè)導(dǎo)致流量激增的消費(fèi)者不會(huì)影響我們對(duì)他人的服務(wù)質(zhì)量。
我們通過設(shè)置在給定時(shí)間單位內(nèi)允許消費(fèi)者發(fā)出多少請(qǐng)求的限制來做到這一點(diǎn)。我們通過適當(dāng)?shù)捻憫?yīng)拒絕任何超出限制的請(qǐng)求,例如 HTTP 狀態(tài) 429(請(qǐng)求過多)。這稱為服務(wù)器端速率限制。
速率限制以每秒請(qǐng)求數(shù) (rps)、每分鐘請(qǐng)求數(shù) (rpm) 或類似形式指定。某些服務(wù)在不同的持續(xù)時(shí)間(例如 50 rpm 且不超過 2500 rph)和一天中的不同時(shí)間(例如,白天 100 rps 和晚上 150 rps)有多個(gè)速率限制。該限制可能適用于單個(gè)用戶(由用戶 ID、IP 地址、API 訪問密鑰等標(biāo)識(shí))或多租戶應(yīng)用程序中的租戶。
客戶端限速
作為服務(wù)的消費(fèi)者,我們希望確保我們不會(huì)使服務(wù)提供者過載。此外,我們不想招致意外的成本——無論是金錢上的還是服務(wù)質(zhì)量方面的。
如果我們消費(fèi)的服務(wù)是有彈性的,就會(huì)發(fā)生這種情況。服務(wù)提供商可能不會(huì)限制我們的請(qǐng)求,而是會(huì)因額外負(fù)載而向我們收取額外費(fèi)用。有些甚至在短時(shí)間內(nèi)禁止行為不端的客戶。消費(fèi)者為防止此類問題而實(shí)施的速率限制稱為客戶端速率限制。
何時(shí)使用 RateLimiter?
resilience4j-ratelimiter 用于客戶端速率限制。
服務(wù)器端速率限制需要諸如緩存和多個(gè)服務(wù)器實(shí)例之間的協(xié)調(diào)之類的東西,這是 resilience4j 不支持的。對(duì)于服務(wù)器端的速率限制,有 API 網(wǎng)關(guān)和 API 過濾器,例如 Kong API Gateway 和 Repose API Filter。Resilience4j 的 RateLimiter 模塊并不打算取代它們。
Resilience4j RateLimiter 概念
想要調(diào)用遠(yuǎn)程服務(wù)的線程首先向 RateLimiter 請(qǐng)求許可。如果 RateLimiter 允許,則線程繼續(xù)。 否則,RateLimiter 會(huì)停放線程或?qū)⑵渲糜诘却隣顟B(tài)。
RateLimiter 定期創(chuàng)建新權(quán)限。當(dāng)權(quán)限可用時(shí),線程會(huì)收到通知,然后可以繼續(xù)。
一段時(shí)間內(nèi)允許的調(diào)用次數(shù)稱為 limitForPeriod
。RateLimiter 刷新權(quán)限的頻率由 limitRefreshPeriod
指定。timeoutDuration
指定線程可以等待多長(zhǎng)時(shí)間獲取權(quán)限。如果在等待時(shí)間結(jié)束時(shí)沒有可用的權(quán)限,RateLimiter 將拋出 RequestNotPermitted
運(yùn)行時(shí)異常。
使用Resilience4j RateLimiter 模塊
RateLimiterRegistry
、RateLimiterConfig
和 RateLimiter
是 resilience4j-ratelimiter 的主要抽象。
RateLimiterRegistry
是一個(gè)用于創(chuàng)建和管理 RateLimiter
對(duì)象的工廠。
RateLimiterConfig
封裝了 limitForPeriod
、limitRefreshPeriod
和 timeoutDuration
配置。每個(gè) RateLimiter
對(duì)象都與一個(gè) RateLimiterConfig
相關(guān)聯(lián)。
RateLimiter
提供輔助方法來為包含遠(yuǎn)程調(diào)用的函數(shù)式接口或 lambda 表達(dá)式創(chuàng)建裝飾器。
讓我們看看如何使用 RateLimiter 模塊中可用的各種功能。假設(shè)我們正在為一家航空公司建立一個(gè)網(wǎng)站,以允許其客戶搜索和預(yù)訂航班。我們的服務(wù)與 FlightSearchService
類封裝的遠(yuǎn)程服務(wù)對(duì)話。
基本示例
第一步是創(chuàng)建一個(gè) RateLimiterConfig
:
RateLimiterConfig config = RateLimiterConfig.ofDefaults();
這將創(chuàng)建一個(gè) RateLimiterConfig
,其默認(rèn)值為 limitForPeriod
(50)、limitRefreshPeriod
(500ns) 和 timeoutDuration
(5s)。
假設(shè)我們與航空公司服務(wù)的合同規(guī)定我們可以以 1 rps 調(diào)用他們的搜索 API。然后我們將像這樣創(chuàng)建 RateLimiterConfig
:
RateLimiterConfig config = RateLimiterConfig.custom() .limitForPeriod(1) .limitRefreshPeriod(Duration.ofSeconds(1)) .timeoutDuration(Duration.ofSeconds(1)) .build();
如果線程無法在指定的 1 秒 timeoutDuration
內(nèi)獲取權(quán)限,則會(huì)出錯(cuò)。
然后我們創(chuàng)建一個(gè) RateLimiter
并裝飾 searchFlights()
調(diào)用:
RateLimiterRegistry registry = RateLimiterRegistry.of(config);RateLimiter limiter = registry.rateLimiter("flightSearchService");// FlightSearchService and SearchRequest creation omittedSupplier> flightsSupplier = RateLimiter.decorateSupplier(limiter, () -> service.searchFlights(request));
最后,我們多次使用裝飾過的 Supplier<List<Flight>>
:
for (int i=0; i<3; i++) { System.out.println(flightsSupplier.get());}
示例輸出中的時(shí)間戳顯示每秒發(fā)出一個(gè)請(qǐng)求:
Searching for flights; current time = 15:29:40 786...[Flight{flightNumber=XY 765, ... }, ... ]Searching for flights; current time = 15:29:41 791...[Flight{flightNumber=XY 765, ... }, ... ]
如果超出限制,我們會(huì)收到 RequestNotPermitted
異常:
Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter flightSearchService does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43) at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)... other lines omitted ...
裝飾方法拋出已檢異常
假設(shè)我們正在調(diào)用FlightSearchService.searchFlightsThrowingException()
,它可以拋出一個(gè)已檢 Exception
。那么我們就不能使用RateLimiter.decorateSupplier()
。我們將使用RateLimiter.decorateCheckedSupplier()
代替:
CheckedFunction0> flights = RateLimiter.decorateCheckedSupplier(limiter, () -> service.searchFlightsThrowingException(request));try { System.out.println(flights.apply());} catch (...) { // exception handling}
RateLimiter.decorateCheckedSupplier()
返回一個(gè) CheckedFunction0
,它表示一個(gè)沒有參數(shù)的函數(shù)。請(qǐng)注意對(duì) CheckedFunction0
對(duì)象的 apply()
調(diào)用以調(diào)用遠(yuǎn)程操作。
如果我們不想使用 Suppliers
,RateLimiter
提供了更多的輔助裝飾器方法,如 decorateFunction()
、decorateCheckedFunction()
、decorateRunnable()
、decorateCallable()
等,以與其他語言結(jié)構(gòu)一起使用。decorateChecked*
方法用于裝飾拋出已檢查異常的方法。
應(yīng)用多個(gè)速率限制
假設(shè)航空公司的航班搜索有多個(gè)速率限制:2 rps 和 40 rpm。 我們可以通過創(chuàng)建多個(gè) RateLimiters
在客戶端應(yīng)用多個(gè)限制:
RateLimiterConfig rpsConfig = RateLimiterConfig.custom(). limitForPeriod(2). limitRefreshPeriod(Duration.ofSeconds(1)). timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterConfig rpmConfig = RateLimiterConfig.custom(). limitForPeriod(40). limitRefreshPeriod(Duration.ofMinutes(1)). timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);RateLimiter rpsLimiter = registry.rateLimiter("flightSearchService_rps", rpsConfig);RateLimiter rpmLimiter = registry.rateLimiter("flightSearchService_rpm", rpmConfig); 然后我們使用兩個(gè) RateLimiters 裝飾 searchFlights() 方法:Supplier> rpsLimitedSupplier = RateLimiter.decorateSupplier(rpsLimiter, () -> service.searchFlights(request));Supplier> flightsSupplier = RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);
示例輸出顯示每秒發(fā)出 2 個(gè)請(qǐng)求,并且限制為 40 個(gè)請(qǐng)求:
Searching for flights; current time = 15:13:21 246...Searching for flights; current time = 15:13:21 249...Searching for flights; current time = 15:13:22 212...Searching for flights; current time = 15:13:40 215...Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:RateLimiter flightSearchService_rpm does not permit further callsat io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)
在運(yùn)行時(shí)更改限制
如果需要,我們可以在運(yùn)行時(shí)更改 limitForPeriod
和 timeoutDuration
的值:
limiter.changeLimitForPeriod(2);limiter.changeTimeoutDuration(Duration.ofSeconds(2));
例如,如果我們的速率限制根據(jù)一天中的時(shí)間而變化,則此功能很有用 - 我們可以有一個(gè)計(jì)劃線程來更改這些值。新值不會(huì)影響當(dāng)前正在等待權(quán)限的線程。
RateLimiter和 Retry一起使用
假設(shè)我們想在收到 RequestNotPermitted
異常時(shí)重試,因?yàn)樗且粋€(gè)暫時(shí)性錯(cuò)誤。我們會(huì)像往常一樣創(chuàng)建 RateLimiter
和 Retry
對(duì)象。然后我們裝飾一個(gè) Supplier
的供應(yīng)商并用 Retry
包裝它:
Supplier> rateLimitedFlightsSupplier = RateLimiter.decorateSupplier(rateLimiter, () -> service.searchFlights(request));Supplier> retryingFlightsSupplier = Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);
示例輸出顯示為 RequestNotPermitted
異常重試請(qǐng)求:
Searching for flights; current time = 15:29:39 847Flight search successful[Flight{flightNumber=XY 765, ... }, ... ]Searching for flights; current time = 17:10:09 218...[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]2020-07-27T17:10:09.484: Retry rateLimitedFlightSearch, waiting PT1S until attempt 1. Last attempt failed with exception io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter flightSearchService does not permit further calls.Searching for flights; current time = 17:10:10 492...2020-07-27T17:10:10.494: Retry rateLimitedFlightSearch recorded a successful retry attempt...[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]
我們創(chuàng)建裝飾器的順序很重要。如果我們將 Retry
與 RateLimiter
包裝在一起,它將不起作用。
RateLimiter 事件
RateLimiter
有一個(gè) EventPublisher
,它在調(diào)用遠(yuǎn)程操作時(shí)生成 RateLimiterOnSuccessEvent
和 RateLimiterOnFailureEvent
類型的事件,以指示獲取權(quán)限是否成功。我們可以監(jiān)聽這些事件并記錄它們,例如:
RateLimiter limiter = registry.rateLimiter("flightSearchService");limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));
日志輸出示例如下:
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName=flightSearchService, creationTime=2020-07-21T19:14:33.127+05:30}... other lines omitted ...RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName=flightSearchService, creationTime=2020-07-21T19:14:33.186+05:30}
RateLimiter 指標(biāo)
假設(shè)在實(shí)施客戶端節(jié)流后,我們發(fā)現(xiàn) API 的響應(yīng)時(shí)間增加了。這是可能的 - 正如我們所見,如果在線程調(diào)用遠(yuǎn)程操作時(shí)權(quán)限不可用,RateLimiter
會(huì)將線程置于等待狀態(tài)。
如果我們的請(qǐng)求處理線程經(jīng)常等待獲得許可,則可能意味著我們的 limitForPeriod
太低。也許我們需要與我們的服務(wù)提供商合作并首先獲得額外的配額。
監(jiān)控 RateLimiter
指標(biāo)可幫助我們識(shí)別此類容量問題,并確保我們?cè)?RateLimiterConfig
上設(shè)置的值運(yùn)行良好。
RateLimiter
跟蹤兩個(gè)指標(biāo):可用權(quán)限的數(shù)量(resilience4j.ratelimiter.available.permissions
)和等待權(quán)限的線程數(shù)量(resilience4j.ratelimiter.waiting.threads
)。
首先,我們像往常一樣創(chuàng)建 RateLimiterConfig
、RateLimiterRegistry
和 RateLimiter
。然后,我們創(chuàng)建一個(gè) MeterRegistry
并將 RateLimiterRegistry
綁定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry) .bindTo(meterRegistry);
運(yùn)行幾次限速操作后,我們顯示捕獲的指標(biāo):
Consumer meterConsumer = meter -> { String desc = meter.getId().getDescription(); String metricName = meter.getId().getName(); Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false) .filter(m -> m.getStatistic().name().equals("VALUE")) .findFirst() .map(m -> m.getValue()) .orElse(0.0); System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);
這是一些示例輸出:
The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0
resilience4j.ratelimiter.available.permissions
的負(fù)值顯示為請(qǐng)求線程保留的權(quán)限數(shù)。在實(shí)際應(yīng)用中,我們會(huì)定期將數(shù)據(jù)導(dǎo)出到監(jiān)控系統(tǒng),并在儀表板上進(jìn)行分析。
實(shí)施客戶端速率限制時(shí)的陷阱和良好實(shí)踐
使速率限制器成為單例
對(duì)給定遠(yuǎn)程服務(wù)的所有調(diào)用都應(yīng)通過相同的 RateLimiter
實(shí)例。對(duì)于給定的遠(yuǎn)程服務(wù),RateLimiter
必須是單例。
如果我們不強(qiáng)制執(zhí)行此操作,我們代碼庫(kù)的某些區(qū)域可能會(huì)繞過 RateLimiter
直接調(diào)用遠(yuǎn)程服務(wù)。為了防止這種情況,對(duì)遠(yuǎn)程服務(wù)的實(shí)際調(diào)用應(yīng)該在核心、內(nèi)部層和其他區(qū)域應(yīng)該使用內(nèi)部層暴露的限速裝飾器。
我們?nèi)绾未_保未來的新開發(fā)人員理解這一意圖?查看 Tom 的文章,其中揭示了一種解決此類問題的方法,即通過組織包結(jié)構(gòu)來明確此類意圖。此外,它還展示了如何通過在 ArchUnit 測(cè)試中編碼意圖來強(qiáng)制執(zhí)行此操作。
為多個(gè)服務(wù)器實(shí)例配置速率限制器
為配置找出正確的值可能很棘手。如果我們?cè)诩褐羞\(yùn)行多個(gè)服務(wù)實(shí)例,limitForPeriod
的值必須考慮到這一點(diǎn)。
例如,如果上游服務(wù)的速率限制為 100 rps,而我們的服務(wù)有 4 個(gè)實(shí)例,那么我們將配置 25 rps 作為每個(gè)實(shí)例的限制。
然而,這假設(shè)我們每個(gè)實(shí)例上的負(fù)載大致相同。 如果情況并非如此,或者如果我們的服務(wù)本身具有彈性并且實(shí)例數(shù)量可能會(huì)有所不同,那么 Resilience4j 的 RateLimiter
可能不適合。
在這種情況下,我們需要一個(gè)速率限制器,將其數(shù)據(jù)保存在分布式緩存中,而不是像 Resilience4j RateLimiter
那樣保存在內(nèi)存中。但這會(huì)影響我們服務(wù)的響應(yīng)時(shí)間。另一種選擇是實(shí)現(xiàn)某種自適應(yīng)速率限制。盡管 Resilience4j 可能會(huì)支持它,但尚不清楚何時(shí)可用。
選擇正確的超時(shí)時(shí)間
對(duì)于 timeoutDuration
配置值,我們應(yīng)該牢記 API 的預(yù)期響應(yīng)時(shí)間。
如果我們將 timeoutDuration 設(shè)置得太高,響應(yīng)時(shí)間和吞吐量就會(huì)受到影響。如果它太低,我們的錯(cuò)誤率可能會(huì)增加。
由于此處可能涉及一些反復(fù)試驗(yàn),因此一個(gè)好的做法是將我們?cè)?RateLimiterConfig
中使用的值(如 timeoutDuration
、limitForPeriod
和 limitRefreshPeriod
)作為我們服務(wù)之外的配置進(jìn)行維護(hù)。然后我們可以在不更改代碼的情況下更改它們。
調(diào)優(yōu)客戶端和服務(wù)器端速率限制器
實(shí)現(xiàn)客戶端速率限制并不能保證我們永遠(yuǎn)不會(huì)受到上游服務(wù)的速率限制。
假設(shè)我們有來自上游服務(wù)的 2 rps 的限制,并且我們將 limitForPeriod
配置為 2,將 limitRefreshPeriod
配置為 1s。如果我們?cè)诘诙氲淖詈髱缀撩氚l(fā)出兩個(gè)請(qǐng)求,在此之前沒有其他調(diào)用,RateLimiter
將允許它們。如果我們?cè)谙乱幻氲那皫缀撩雰?nèi)再進(jìn)行兩次調(diào)用,RateLimiter
也會(huì)允許它們,因?yàn)橛袃蓚€(gè)新權(quán)限可用。但是上游服務(wù)可能會(huì)拒絕這兩個(gè)請(qǐng)求,因?yàn)榉?wù)器通常會(huì)實(shí)現(xiàn)基于滑動(dòng)窗口的速率限制。
為了保證我們永遠(yuǎn)不會(huì)從上游服務(wù)中獲得超過速率,我們需要將客戶端中的固定窗口配置為短于服務(wù)中的滑動(dòng)窗口。因此,如果我們?cè)谇懊娴氖纠袑?limitForPeriod
配置為 1 并將 limitRefreshPeriod
配置為 500ms,我們就不會(huì)出現(xiàn)超出速率限制的錯(cuò)誤。但是,第一個(gè)請(qǐng)求之后的所有三個(gè)請(qǐng)求都會(huì)等待,從而增加響應(yīng)時(shí)間并降低吞吐量。
結(jié)論
在本文中,我們學(xué)習(xí)了如何使用 Resilience4j 的 RateLimiter 模塊來實(shí)現(xiàn)客戶端速率限制。 我們通過實(shí)際示例研究了配置它的不同方法。我們學(xué)習(xí)了一些在實(shí)施速率限制時(shí)要記住的良好做法和注意事項(xiàng)。
您可以使用 GitHub 上的代碼演示一個(gè)完整的應(yīng)用程序來說明這些想法。
本文譯自: Implementing Rate Limiting with Resilience4j - Reflectoring