在本系列的上一篇文章中,我們了解了 Resilience4j 以及如何使用其 Retry 模塊?,F(xiàn)在讓我們了解 RateLimiter - 它是什么,何時(shí)以及如何使用它,以及在實(shí)施速率限制(或者也稱為“節(jié)流”)時(shí)要注意什么。

代碼示例

本文附有GitHub 上的工作代碼示例。

什么是 Resilience4j?

請(qǐng)參閱上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理。

什么是限速?

我們可以從兩個(gè)角度來看待速率限制——作為服務(wù)提供者和作為服務(wù)消費(fèi)者。

服務(wù)端限速

作為服務(wù)提供商,我們實(shí)施速率限制以保護(hù)我們的資源免受過載和拒絕服務(wù) (DoS) 攻-擊。

為了滿足我們與所有消費(fèi)者的服務(wù)水平協(xié)議 (SLA),我們希望確保一個(gè)導(dǎo)致流量激增的消費(fèi)者不會(huì)影響我們對(duì)他人的服務(wù)質(zhì)量。

我們通過設(shè)置在給定時(shí)間單位內(nèi)允許消費(fèi)者發(fā)出多少請(qǐng)求的限制來做到這一點(diǎn)。我們通過適當(dāng)?shù)捻憫?yīng)拒絕任何超出限制的請(qǐng)求,例如 HTTP 狀態(tài) 429(請(qǐng)求過多)。這稱為服務(wù)器端速率限制。

速率限制以每秒請(qǐng)求數(shù) (rps)、每分鐘請(qǐng)求數(shù) (rpm) 或類似形式指定。某些服務(wù)在不同的持續(xù)時(shí)間(例如 50 rpm 且不超過 2500 rph)和一天中的不同時(shí)間(例如,白天 100 rps 和晚上 150 rps)有多個(gè)速率限制。該限制可能適用于單個(gè)用戶(由用戶 ID、IP 地址、API 訪問密鑰等標(biāo)識(shí))或多租戶應(yīng)用程序中的租戶。

客戶端限速

作為服務(wù)的消費(fèi)者,我們希望確保我們不會(huì)使服務(wù)提供者過載。此外,我們不想招致意外的成本——無論是金錢上的還是服務(wù)質(zhì)量方面的。

如果我們消費(fèi)的服務(wù)是有彈性的,就會(huì)發(fā)生這種情況。服務(wù)提供商可能不會(huì)限制我們的請(qǐng)求,而是會(huì)因額外負(fù)載而向我們收取額外費(fèi)用。有些甚至在短時(shí)間內(nèi)禁止行為不端的客戶。消費(fèi)者為防止此類問題而實(shí)施的速率限制稱為客戶端速率限制。

何時(shí)使用 RateLimiter?

resilience4j-ratelimiter 用于客戶端速率限制。

服務(wù)器端速率限制需要諸如緩存和多個(gè)服務(wù)器實(shí)例之間的協(xié)調(diào)之類的東西,這是 resilience4j 不支持的。對(duì)于服務(wù)器端的速率限制,有 API 網(wǎng)關(guān)和 API 過濾器,例如 Kong API GatewayRepose API Filter。Resilience4j 的 RateLimiter 模塊并不打算取代它們。

Resilience4j RateLimiter 概念

想要調(diào)用遠(yuǎn)程服務(wù)的線程首先向 RateLimiter 請(qǐng)求許可。如果 RateLimiter 允許,則線程繼續(xù)。 否則,RateLimiter 會(huì)停放線程或?qū)⑵渲糜诘却隣顟B(tài)。

RateLimiter 定期創(chuàng)建新權(quán)限。當(dāng)權(quán)限可用時(shí),線程會(huì)收到通知,然后可以繼續(xù)。

一段時(shí)間內(nèi)允許的調(diào)用次數(shù)稱為 limitForPeriod。RateLimiter 刷新權(quán)限的頻率由 limitRefreshPeriod 指定。timeoutDuration 指定線程可以等待多長(zhǎng)時(shí)間獲取權(quán)限。如果在等待時(shí)間結(jié)束時(shí)沒有可用的權(quán)限,RateLimiter 將拋出 RequestNotPermitted 運(yùn)行時(shí)異常。

使用Resilience4j RateLimiter 模塊

RateLimiterRegistry、RateLimiterConfigRateLimiterresilience4j-ratelimiter 的主要抽象。

RateLimiterRegistry 是一個(gè)用于創(chuàng)建和管理 RateLimiter 對(duì)象的工廠。

RateLimiterConfig 封裝了 limitForPeriod、limitRefreshPeriodtimeoutDuration 配置。每個(gè) RateLimiter 對(duì)象都與一個(gè) RateLimiterConfig 相關(guān)聯(lián)。

RateLimiter 提供輔助方法來為包含遠(yuǎn)程調(diào)用的函數(shù)式接口或 lambda 表達(dá)式創(chuàng)建裝飾器。

讓我們看看如何使用 RateLimiter 模塊中可用的各種功能。假設(shè)我們正在為一家航空公司建立一個(gè)網(wǎng)站,以允許其客戶搜索和預(yù)訂航班。我們的服務(wù)與 FlightSearchService 類封裝的遠(yuǎn)程服務(wù)對(duì)話。

基本示例

第一步是創(chuàng)建一個(gè) RateLimiterConfig

RateLimiterConfig config = RateLimiterConfig.ofDefaults();

這將創(chuàng)建一個(gè) RateLimiterConfig,其默認(rèn)值為 limitForPeriod (50)、limitRefreshPeriod(500ns) 和 timeoutDuration (5s)。

假設(shè)我們與航空公司服務(wù)的合同規(guī)定我們可以以 1 rps 調(diào)用他們的搜索 API。然后我們將像這樣創(chuàng)建 RateLimiterConfig

RateLimiterConfig config = RateLimiterConfig.custom()  .limitForPeriod(1)  .limitRefreshPeriod(Duration.ofSeconds(1))  .timeoutDuration(Duration.ofSeconds(1))  .build();

如果線程無法在指定的 1 秒 timeoutDuration 內(nèi)獲取權(quán)限,則會(huì)出錯(cuò)。

然后我們創(chuàng)建一個(gè) RateLimiter 并裝飾 searchFlights() 調(diào)用:

RateLimiterRegistry registry = RateLimiterRegistry.of(config);RateLimiter limiter = registry.rateLimiter("flightSearchService");// FlightSearchService and SearchRequest creation omittedSupplier> flightsSupplier =  RateLimiter.decorateSupplier(limiter,    () -> service.searchFlights(request));

最后,我們多次使用裝飾過的 Supplier<List<Flight>>

for (int i=0; i<3; i++) {  System.out.println(flightsSupplier.get());}

示例輸出中的時(shí)間戳顯示每秒發(fā)出一個(gè)請(qǐng)求:

Searching for flights; current time = 15:29:40 786...[Flight{flightNumber=XY 765, ... }, ... ]Searching for flights; current time = 15:29:41 791...[Flight{flightNumber=XY 765, ... }, ... ]

如果超出限制,我們會(huì)收到 RequestNotPermitted 異常:

Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter flightSearchService does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43) at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)... other lines omitted ...

裝飾方法拋出已檢異常

假設(shè)我們正在調(diào)用
FlightSearchService.searchFlightsThrowingException() ,它可以拋出一個(gè)已檢 Exception。那么我們就不能使用
RateLimiter.decorateSupplier()。我們將使用
RateLimiter.decorateCheckedSupplier() 代替:

CheckedFunction0> flights =  RateLimiter.decorateCheckedSupplier(limiter,    () -> service.searchFlightsThrowingException(request));try {  System.out.println(flights.apply());} catch (...) {  // exception handling}

RateLimiter.decorateCheckedSupplier() 返回一個(gè) CheckedFunction0,它表示一個(gè)沒有參數(shù)的函數(shù)。請(qǐng)注意對(duì) CheckedFunction0 對(duì)象的 apply() 調(diào)用以調(diào)用遠(yuǎn)程操作。

如果我們不想使用 Suppliers ,RateLimiter 提供了更多的輔助裝飾器方法,如 decorateFunction()、decorateCheckedFunction()、decorateRunnable()、decorateCallable() 等,以與其他語言結(jié)構(gòu)一起使用。decorateChecked* 方法用于裝飾拋出已檢查異常的方法。

應(yīng)用多個(gè)速率限制

假設(shè)航空公司的航班搜索有多個(gè)速率限制:2 rps 和 40 rpm。 我們可以通過創(chuàng)建多個(gè) RateLimiters 在客戶端應(yīng)用多個(gè)限制:

RateLimiterConfig rpsConfig = RateLimiterConfig.custom().  limitForPeriod(2).  limitRefreshPeriod(Duration.ofSeconds(1)).  timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterConfig rpmConfig = RateLimiterConfig.custom().  limitForPeriod(40).  limitRefreshPeriod(Duration.ofMinutes(1)).  timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);RateLimiter rpsLimiter =  registry.rateLimiter("flightSearchService_rps", rpsConfig);RateLimiter rpmLimiter =  registry.rateLimiter("flightSearchService_rpm", rpmConfig);  然后我們使用兩個(gè) RateLimiters 裝飾 searchFlights() 方法:Supplier> rpsLimitedSupplier =  RateLimiter.decorateSupplier(rpsLimiter,    () -> service.searchFlights(request));Supplier> flightsSupplier  = RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);

示例輸出顯示每秒發(fā)出 2 個(gè)請(qǐng)求,并且限制為 40 個(gè)請(qǐng)求:

Searching for flights; current time = 15:13:21 246...Searching for flights; current time = 15:13:21 249...Searching for flights; current time = 15:13:22 212...Searching for flights; current time = 15:13:40 215...Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:RateLimiter flightSearchService_rpm does not permit further callsat io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)

在運(yùn)行時(shí)更改限制

如果需要,我們可以在運(yùn)行時(shí)更改 limitForPeriodtimeoutDuration 的值:

limiter.changeLimitForPeriod(2);limiter.changeTimeoutDuration(Duration.ofSeconds(2));

例如,如果我們的速率限制根據(jù)一天中的時(shí)間而變化,則此功能很有用 - 我們可以有一個(gè)計(jì)劃線程來更改這些值。新值不會(huì)影響當(dāng)前正在等待權(quán)限的線程。

RateLimiter和 Retry一起使用

假設(shè)我們想在收到 RequestNotPermitted 異常時(shí)重試,因?yàn)樗且粋€(gè)暫時(shí)性錯(cuò)誤。我們會(huì)像往常一樣創(chuàng)建 RateLimiterRetry 對(duì)象。然后我們裝飾一個(gè) Supplier 的供應(yīng)商并用 Retry 包裝它:

Supplier> rateLimitedFlightsSupplier =  RateLimiter.decorateSupplier(rateLimiter,    () -> service.searchFlights(request));Supplier> retryingFlightsSupplier =  Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);

示例輸出顯示為 RequestNotPermitted 異常重試請(qǐng)求:

Searching for flights; current time = 15:29:39 847Flight search successful[Flight{flightNumber=XY 765, ... }, ... ]Searching for flights; current time = 17:10:09 218...[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]2020-07-27T17:10:09.484: Retry rateLimitedFlightSearch, waiting PT1S until attempt 1. Last attempt failed with exception io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter flightSearchService does not permit further calls.Searching for flights; current time = 17:10:10 492...2020-07-27T17:10:10.494: Retry rateLimitedFlightSearch recorded a successful retry attempt...[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]

我們創(chuàng)建裝飾器的順序很重要。如果我們將 RetryRateLimiter 包裝在一起,它將不起作用。

RateLimiter 事件

RateLimiter 有一個(gè) EventPublisher,它在調(diào)用遠(yuǎn)程操作時(shí)生成 RateLimiterOnSuccessEventRateLimiterOnFailureEvent 類型的事件,以指示獲取權(quán)限是否成功。我們可以監(jiān)聽這些事件并記錄它們,例如:

RateLimiter limiter = registry.rateLimiter("flightSearchService");limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));

日志輸出示例如下:

RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName=flightSearchService, creationTime=2020-07-21T19:14:33.127+05:30}... other lines omitted ...RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName=flightSearchService, creationTime=2020-07-21T19:14:33.186+05:30}

RateLimiter 指標(biāo)

假設(shè)在實(shí)施客戶端節(jié)流后,我們發(fā)現(xiàn) API 的響應(yīng)時(shí)間增加了。這是可能的 - 正如我們所見,如果在線程調(diào)用遠(yuǎn)程操作時(shí)權(quán)限不可用,RateLimiter 會(huì)將線程置于等待狀態(tài)。

如果我們的請(qǐng)求處理線程經(jīng)常等待獲得許可,則可能意味著我們的 limitForPeriod 太低。也許我們需要與我們的服務(wù)提供商合作并首先獲得額外的配額。

監(jiān)控 RateLimiter 指標(biāo)可幫助我們識(shí)別此類容量問題,并確保我們?cè)?RateLimiterConfig 上設(shè)置的值運(yùn)行良好。

RateLimiter 跟蹤兩個(gè)指標(biāo):可用權(quán)限的數(shù)量(
resilience4j.ratelimiter.available.permissions)和等待權(quán)限的線程數(shù)量(
resilience4j.ratelimiter.waiting.threads)。

首先,我們像往常一樣創(chuàng)建 RateLimiterConfig、RateLimiterRegistryRateLimiter。然后,我們創(chuàng)建一個(gè) MeterRegistry 并將 RateLimiterRegistry 綁定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry)  .bindTo(meterRegistry);

運(yùn)行幾次限速操作后,我們顯示捕獲的指標(biāo):

Consumer meterConsumer = meter -> {  String desc = meter.getId().getDescription();  String metricName = meter.getId().getName();  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)    .filter(m -> m.getStatistic().name().equals("VALUE"))    .findFirst()    .map(m -> m.getValue())    .orElse(0.0);  System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

這是一些示例輸出:

The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0

resilience4j.ratelimiter.available.permissions 的負(fù)值顯示為請(qǐng)求線程保留的權(quán)限數(shù)。在實(shí)際應(yīng)用中,我們會(huì)定期將數(shù)據(jù)導(dǎo)出到監(jiān)控系統(tǒng),并在儀表板上進(jìn)行分析。

實(shí)施客戶端速率限制時(shí)的陷阱和良好實(shí)踐

使速率限制器成為單例

對(duì)給定遠(yuǎn)程服務(wù)的所有調(diào)用都應(yīng)通過相同的 RateLimiter 實(shí)例。對(duì)于給定的遠(yuǎn)程服務(wù),RateLimiter 必須是單例。

如果我們不強(qiáng)制執(zhí)行此操作,我們代碼庫(kù)的某些區(qū)域可能會(huì)繞過 RateLimiter 直接調(diào)用遠(yuǎn)程服務(wù)。為了防止這種情況,對(duì)遠(yuǎn)程服務(wù)的實(shí)際調(diào)用應(yīng)該在核心、內(nèi)部層和其他區(qū)域應(yīng)該使用內(nèi)部層暴露的限速裝飾器。

我們?nèi)绾未_保未來的新開發(fā)人員理解這一意圖?查看 Tom 的文章,其中揭示了一種解決此類問題的方法,即通過組織包結(jié)構(gòu)來明確此類意圖。此外,它還展示了如何通過在 ArchUnit 測(cè)試中編碼意圖來強(qiáng)制執(zhí)行此操作。

為多個(gè)服務(wù)器實(shí)例配置速率限制器

為配置找出正確的值可能很棘手。如果我們?cè)诩褐羞\(yùn)行多個(gè)服務(wù)實(shí)例,limitForPeriod 的值必須考慮到這一點(diǎn)。

例如,如果上游服務(wù)的速率限制為 100 rps,而我們的服務(wù)有 4 個(gè)實(shí)例,那么我們將配置 25 rps 作為每個(gè)實(shí)例的限制。

然而,這假設(shè)我們每個(gè)實(shí)例上的負(fù)載大致相同。 如果情況并非如此,或者如果我們的服務(wù)本身具有彈性并且實(shí)例數(shù)量可能會(huì)有所不同,那么 Resilience4j 的 RateLimiter 可能不適合。

在這種情況下,我們需要一個(gè)速率限制器,將其數(shù)據(jù)保存在分布式緩存中,而不是像 Resilience4j RateLimiter 那樣保存在內(nèi)存中。但這會(huì)影響我們服務(wù)的響應(yīng)時(shí)間。另一種選擇是實(shí)現(xiàn)某種自適應(yīng)速率限制。盡管 Resilience4j 可能會(huì)支持它,但尚不清楚何時(shí)可用。

選擇正確的超時(shí)時(shí)間

對(duì)于 timeoutDuration 配置值,我們應(yīng)該牢記 API 的預(yù)期響應(yīng)時(shí)間。

如果我們將 timeoutDuration 設(shè)置得太高,響應(yīng)時(shí)間和吞吐量就會(huì)受到影響。如果它太低,我們的錯(cuò)誤率可能會(huì)增加。

由于此處可能涉及一些反復(fù)試驗(yàn),因此一個(gè)好的做法是將我們?cè)?RateLimiterConfig 中使用的值(如 timeoutDuration、limitForPeriodlimitRefreshPeriod)作為我們服務(wù)之外的配置進(jìn)行維護(hù)。然后我們可以在不更改代碼的情況下更改它們。

調(diào)優(yōu)客戶端和服務(wù)器端速率限制器

實(shí)現(xiàn)客戶端速率限制并不能保證我們永遠(yuǎn)不會(huì)受到上游服務(wù)的速率限制。

假設(shè)我們有來自上游服務(wù)的 2 rps 的限制,并且我們將 limitForPeriod 配置為 2,將 limitRefreshPeriod 配置為 1s。如果我們?cè)诘诙氲淖詈髱缀撩氚l(fā)出兩個(gè)請(qǐng)求,在此之前沒有其他調(diào)用,RateLimiter 將允許它們。如果我們?cè)谙乱幻氲那皫缀撩雰?nèi)再進(jìn)行兩次調(diào)用,RateLimiter 也會(huì)允許它們,因?yàn)橛袃蓚€(gè)新權(quán)限可用。但是上游服務(wù)可能會(huì)拒絕這兩個(gè)請(qǐng)求,因?yàn)榉?wù)器通常會(huì)實(shí)現(xiàn)基于滑動(dòng)窗口的速率限制。

為了保證我們永遠(yuǎn)不會(huì)從上游服務(wù)中獲得超過速率,我們需要將客戶端中的固定窗口配置為短于服務(wù)中的滑動(dòng)窗口。因此,如果我們?cè)谇懊娴氖纠袑?limitForPeriod 配置為 1 并將 limitRefreshPeriod 配置為 500ms,我們就不會(huì)出現(xiàn)超出速率限制的錯(cuò)誤。但是,第一個(gè)請(qǐng)求之后的所有三個(gè)請(qǐng)求都會(huì)等待,從而增加響應(yīng)時(shí)間并降低吞吐量。

結(jié)論

在本文中,我們學(xué)習(xí)了如何使用 Resilience4j 的 RateLimiter 模塊來實(shí)現(xiàn)客戶端速率限制。 我們通過實(shí)際示例研究了配置它的不同方法。我們學(xué)習(xí)了一些在實(shí)施速率限制時(shí)要記住的良好做法和注意事項(xiàng)。

您可以使用 GitHub 上的代碼演示一個(gè)完整的應(yīng)用程序來說明這些想法。


本文譯自: Implementing Rate Limiting with Resilience4j - Reflectoring