摘要:轉(zhuǎn)載請(qǐng)注明出處翻譯下面的代碼展示了版的查看源碼的同等實(shí)現(xiàn)如下可以通過(guò)調(diào)用方法實(shí)現(xiàn)同步執(zhí)行示例如下測(cè)試如下不提供同步執(zhí)行方法但是如果確定其只會(huì)產(chǎn)生一個(gè)值那么也可以用如下方式實(shí)現(xiàn)如果實(shí)際上產(chǎn)生了多個(gè)值上述的代碼將會(huì)拋出可以通過(guò)調(diào)用方法實(shí)現(xiàn)異步
轉(zhuǎn)載請(qǐng)注明出處: 翻譯:Hystrix - How To Use
Hello World!下面的代碼展示了HystrixCommand版的Hello World:
public class CommandHelloWorld extends HystrixCommand{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // a real example would do work like a network call here return "Hello " + name + "!"; } }
查看源碼
HystrixObservableCommand的同等實(shí)現(xiàn)如下:
public class CommandHelloWorld extends HystrixObservableCommandSynchronous Execution{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable construct() { return Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
可以通過(guò)調(diào)用HystrixCommand.execute()方法實(shí)現(xiàn)同步執(zhí)行, 示例如下:
String s = new CommandHelloWorld("World").execute();
測(cè)試如下:
@Test public void testSynchronous() { assertEquals("Hello World!", new CommandHelloWorld("World").execute()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute()); }
HystrixObservableCommand不提供同步執(zhí)行方法, 但是如果確定其只會(huì)產(chǎn)生一個(gè)值, 那么也可以用如下方式實(shí)現(xiàn):
HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()
如果實(shí)際上產(chǎn)生了多個(gè)值, 上述的代碼將會(huì)拋出java.lang.IllegalArgumentException: Sequence contains too many elements.
Asynchronous Execution可以通過(guò)調(diào)用HystrixCommand.queue()方法實(shí)現(xiàn)異步執(zhí)行, 示例如下:
Futurefs = new CommandHelloWorld("World").queue();
此時(shí)可以通過(guò)Future.get()方法獲取command執(zhí)行結(jié)果:
String s = fs.get();
測(cè)試代碼如下:
@Test public void testAsynchronous1() throws Exception { assertEquals("Hello World!", new CommandHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { FuturefWorld = new CommandHelloWorld("World").queue(); Future fBob = new CommandHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的兩種實(shí)現(xiàn)是等價(jià)的:
String s1 = new CommandHelloWorld("World").execute(); String s2 = new CommandHelloWorld("World").queue().get();
HystrixObservableCommand不提供queue方法, 但是如果確定其只會(huì)產(chǎn)生一個(gè)值, 那么也可以用如下方式實(shí)現(xiàn):
HystrixObservableCommand.observe().observe().toBlocking().toFuture()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()
如果實(shí)際上產(chǎn)生了多個(gè)值, 上述的代碼將會(huì)拋出java.lang.IllegalArgumentException: Sequence contains too many elements.
Reactive Execution你也可以將HystrixCommand當(dāng)做一個(gè)可觀察對(duì)象(Observable)來(lái)觀察(Observe)其產(chǎn)生的結(jié)果, 可以使用以下任意一個(gè)方法實(shí)現(xiàn):
observe(): 一旦調(diào)用該方法, 請(qǐng)求將立即開(kāi)始執(zhí)行, 其利用ReplaySubject特性可以保證不會(huì)丟失任何command產(chǎn)生的結(jié)果, 即使結(jié)果在你訂閱之前產(chǎn)生的也不會(huì)丟失.
toObservable(): 調(diào)用該方法后不會(huì)立即執(zhí)行請(qǐng)求, 而是當(dāng)有訂閱者訂閱時(shí)才會(huì)執(zhí)行.
Observableho = new CommandHelloWorld("World").observe(); // or Observable co = new CommandHelloWorld("World").toObservable();
然后你可以通過(guò)訂閱到這個(gè)Observable來(lái)取得command產(chǎn)生的結(jié)果:
ho.subscribe(new Action1() { @Override public void call(String s) { // value emitted here } });
測(cè)試如下:
@Test public void testObservable() throws Exception { ObservablefWorld = new CommandHelloWorld("World").observe(); Observable fBob = new CommandHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlockingObservable().single()); assertEquals("Hello Bob!", fBob.toBlockingObservable().single()); // non-blocking // - this is a verbose anonymous inner-class approach and doesn"t do assertions fWorld.subscribe(new Observer () { @Override public void onCompleted() { // nothing needed here } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); // non-blocking // - also verbose anonymous inner-class // - ignore errors and onCompleted signal fBob.subscribe(new Action1 () { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用Java 8的Lambda表達(dá)式可以使代碼更簡(jiǎn)潔:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
關(guān)于Observable的信息可以在這里查閱
Reactive Commands相比將HystrixCommand使用上述方法轉(zhuǎn)換成一個(gè)Observable, 你也可以選擇創(chuàng)建一個(gè)HystrixObservableCommand對(duì)象. HystrixObservableCommand包裝的Observable允許產(chǎn)生多個(gè)結(jié)果(譯者注: Subscriber.onNext可以調(diào)用多次), 而HystrixCommand即使轉(zhuǎn)換成了Observable也只能產(chǎn)生一個(gè)結(jié)果.
使用HystrixObservableCommnad時(shí), 你需要重載construct方法來(lái)實(shí)現(xiàn)你的業(yè)務(wù)邏輯, 而不是重載run方法, contruct方法將會(huì)返回你需要包裝的Observable.
使用下面任意一個(gè)方法可以從HystrixObservableCommand中獲取Observable對(duì)象:
observe(): 一旦調(diào)用該方法, 請(qǐng)求將立即開(kāi)始執(zhí)行, 其利用ReplaySubject特性可以保證不會(huì)丟失任何command產(chǎn)生的結(jié)果, 即使結(jié)果在你訂閱之前產(chǎn)生的也不會(huì)丟失.
toObservable(): 調(diào)用該方法后不會(huì)立即執(zhí)行請(qǐng)求, 而是當(dāng)有訂閱者訂閱時(shí)才會(huì)執(zhí)行.
Fallback大多數(shù)情況下, 我們都希望command在執(zhí)行失敗時(shí)能夠有一個(gè)候選方法來(lái)處理, 如: 返回一個(gè)默認(rèn)值或執(zhí)行其他失敗處理邏輯, 除了以下幾個(gè)情況:
執(zhí)行寫操作的command: 當(dāng)command的目標(biāo)是執(zhí)行寫操作而不是讀操作, 那么通常需要將寫操作失敗的錯(cuò)誤交給調(diào)用者處理.
批處理系統(tǒng)/離線計(jì)算: 如果command的目標(biāo)是做一些離線計(jì)算、生成報(bào)表、填充緩存等, 那么同樣應(yīng)該將失敗交給調(diào)用者處理.
無(wú)論command是否實(shí)現(xiàn)了getFallback()方法, command執(zhí)行失敗時(shí), Hystrix的狀態(tài)和斷路器(circuit-breaker)的狀態(tài)/指標(biāo)都會(huì)進(jìn)行更新.
HystrixCommand可以通過(guò)實(shí)現(xiàn)getFallback()方法來(lái)實(shí)現(xiàn)降級(jí)處理, run()方法異常、執(zhí)行超時(shí)、線程池或信號(hào)量已滿拒絕提供服務(wù)、斷路器短路時(shí), 都會(huì)調(diào)用getFallback():
public class CommandHelloFailure extends HystrixCommand{ private final String name; public CommandHelloFailure(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { throw new RuntimeException("this command always fails"); } @Override protected String getFallback() { return "Hello Failure " + name + "!"; } }
查看源碼
這個(gè)命令的run()方法總是會(huì)執(zhí)行失敗, 但是調(diào)用者總是能收到getFallback()方法返回的值, 而不是收到一個(gè)異常:
@Test public void testSynchronous() { assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute()); assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute()); }
HystrixObservableCommand可以通過(guò)重載resumeWithFallback方法實(shí)現(xiàn)原Observable執(zhí)行失敗時(shí)返回回另一個(gè)Observable, 需要注意的是, 原Observable有可能在發(fā)出多個(gè)結(jié)果之后才出現(xiàn)錯(cuò)誤, 因此在fallback實(shí)現(xiàn)的邏輯中不應(yīng)該假設(shè)訂閱者只會(huì)收到失敗邏輯中發(fā)出的結(jié)果.
Hystrix內(nèi)部使用了RxJava的onErrorResumeNext操作符來(lái)實(shí)現(xiàn)Observable之間的無(wú)縫轉(zhuǎn)移.
Error Propagation除HystrixBadRequestException異常外, run方法中拋出的所有異常都會(huì)被認(rèn)為是執(zhí)行失敗且會(huì)觸發(fā)getFallback()方法和斷路器的邏輯.
你可以在HystrixBadRequestException中包裝想要拋出的異常, 然后通過(guò)getCause()方法獲取. HystrixBadRequestException使用在不應(yīng)該被錯(cuò)誤指標(biāo)(failure metrics)統(tǒng)計(jì)和不應(yīng)該觸發(fā)getFallback()方法的場(chǎng)景, 例如報(bào)告參數(shù)不合法或者非系統(tǒng)異常等.
對(duì)于HystrixObservableCommand, 不可恢復(fù)的錯(cuò)誤都會(huì)在通過(guò)onError方法通知, 并通過(guò)獲取用戶實(shí)現(xiàn)的resumeWithFallback()方法返回的Observable來(lái)完成回退機(jī)制.
執(zhí)行異常類型Failure Type | Exception class | Exception.cause |
---|---|---|
FAILURE | HystrixRuntimeException | underlying exception(user-controlled) |
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException |
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException |
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException |
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException |
BAD_REQUEST | HystrixBadRequestException | underlying exception(user-controller) |
默認(rèn)的command name是從類名中派生的:
getClass().getSimpleName()
可以通過(guò)HystrixCommand或HystrixObservableCommand的構(gòu)造器來(lái)指定command name:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))); this.name = name; }
可以通過(guò)如下方式來(lái)重用Setter:
private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")); public CommandHelloWorld(String name) { super(cachedSetter); this.name = name; }
HystrixCommandKey是一個(gè)接口, 因此可以將其實(shí)現(xiàn)為一個(gè)枚舉或者常規(guī)的類, 但是它已經(jīng)內(nèi)置了一個(gè)Factory類來(lái)構(gòu)建幫助構(gòu)建內(nèi)部實(shí)例, 使用方式如下:
HystrixCommandKey.Factory.asKey("Hello World");Command Group
Hystrix使用command group來(lái)為分組, 分組信息主要用于報(bào)告、警報(bào)、儀表盤上顯示, 或者是標(biāo)識(shí)團(tuán)隊(duì)/庫(kù)的擁有者.
默認(rèn)情況下, 除非已經(jīng)用這個(gè)名字定義了一個(gè)信號(hào)量, 否則 Hystrix將使用這個(gè)名稱來(lái)定義command的線程池.
HystrixCommandGroupKey是一個(gè)接口, 因此可以將其實(shí)現(xiàn)為一個(gè)枚舉或者常規(guī)的類, 但是它已經(jīng)內(nèi)置了一個(gè)Factory類來(lái)構(gòu)建幫助構(gòu)建內(nèi)部實(shí)例, 使用方式如下:
HystrixCommandGroupKey.Factory.asKey("Example Group")Command Thread-pool
thread-pool key主要用于在監(jiān)控、指標(biāo)發(fā)布、緩存等類似場(chǎng)景中標(biāo)識(shí)一個(gè)HystrixThreadPool, 一個(gè)HystrixCommand于其構(gòu)造函數(shù)中傳入的HystrixThreadPoolKey指定的HystrixThreadPool相關(guān)聯(lián), 如果未指定的話, 則使用HystrixCommandGroupKey來(lái)獲取/創(chuàng)建HystrixThreadPool.
可以通過(guò)HystrixCommand或HystrixObservableCommand的構(gòu)造器來(lái)指定其值:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"))); this.name = name; }
HystrixCommandThreadPoolKey是一個(gè)接口, 因此可以將其實(shí)現(xiàn)為一個(gè)枚舉或者常規(guī)的類, 但是它已經(jīng)內(nèi)置了一個(gè)Factory類來(lái)構(gòu)建幫助構(gòu)建內(nèi)部實(shí)例, 使用方式如下:
HystrixThreadPoolKey.Factory.asKey("Hello World Pool")
使用HystrixThreadPoolKey而不是使用不同的HystrixCommandGroupKey的原因是: 可能會(huì)有多條command在邏輯功能上屬于同一個(gè)組(group), 但是其中的某些command需要和其他command隔離開(kāi), 例如:
兩條用于訪問(wèn)視頻元數(shù)據(jù)的command
兩條command的group name都是VideoMetadata
command A與資源#1互斥
command B與資源#2互斥
如果command A由于延遲等原因?qū)е缕渌诘木€程池資源耗盡, 不應(yīng)該影響command B對(duì)#2的執(zhí)行, 因?yàn)樗麄冊(cè)L問(wèn)的是不同的后端資源.
因此, 從邏輯上來(lái)說(shuō), 我們希望這兩條command應(yīng)該被分到同一個(gè)分組, 但是我們同樣系統(tǒng)將這兩條命令的執(zhí)行隔離開(kāi)來(lái), 因此我們使用HystrixThreadPoolKey將其分配到不同的線程池.
Request Cache可以通過(guò)實(shí)現(xiàn)HystrixCommand或HystrixObservableCommand的getCacheKey()方法開(kāi)啟用對(duì)請(qǐng)求的緩存功能:
public class CommandUsingRequestCache extends HystrixCommand{ private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); } }
由于該功能依賴于請(qǐng)求的上下文信息, 因此我們必須初始化一個(gè)HystrixRequestContext, 使用方式如下:
@Test public void testWithoutCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertTrue(new CommandUsingRequestCache(2).execute()); assertFalse(new CommandUsingRequestCache(1).execute()); assertTrue(new CommandUsingRequestCache(0).execute()); assertTrue(new CommandUsingRequestCache(58672).execute()); } finally { context.shutdown(); } }
通常情況下, 上下文信息(HystrixRequestContext)應(yīng)該在持有用戶請(qǐng)求的ServletFilter或者其他擁有生命周期管理功能的類來(lái)初始化和關(guān)閉.
下面的例子展示了command如何從緩存中獲取數(shù)據(jù), 以及如何查詢一個(gè)數(shù)據(jù)是否是從緩存中獲取到的:
@Test public void testWithCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command2a = new CommandUsingRequestCache(2); CommandUsingRequestCache command2b = new CommandUsingRequestCache(2); assertTrue(command2a.execute()); // this is the first time we"ve executed this command with // the value of "2" so it should not be from cache assertFalse(command2a.isResponseFromCache()); assertTrue(command2b.execute()); // this is the second time we"ve executed this command with // the same value so it should return from cache assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } // start a new request context context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command3b = new CommandUsingRequestCache(2); assertTrue(command3b.execute()); // this is a new request context so this // should not come from cache assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } }Request Collapsing
請(qǐng)求合并可以用于將多條請(qǐng)求綁定到一起, 由同一個(gè)HystrixCommand實(shí)例執(zhí)行.
collapser可以通過(guò)batch size和batch創(chuàng)建以來(lái)的耗時(shí)來(lái)自動(dòng)將請(qǐng)求合并執(zhí)行.
Hystrix支持兩個(gè)請(qǐng)求合并方式: 請(qǐng)求級(jí)的合并和全局級(jí)的合并. 默認(rèn)是請(qǐng)求范圍的合并, 可以在構(gòu)造collapser時(shí)指定值.
請(qǐng)求級(jí)(request-scoped)的collapser只會(huì)合并每一個(gè)HystrixRequestContext中的請(qǐng)求, 而全局級(jí)(globally-scoped)的collapser則可以跨HystrixRequestContext合并請(qǐng)求. 因此, 如果你下游的依賴者無(wú)法再一個(gè)command中處理多個(gè)HystrixRequestContext的話, 那么你應(yīng)該使用請(qǐng)求級(jí)的合并.
在Netflix, 我們只會(huì)使用請(qǐng)求級(jí)的合并, 因?yàn)槲覀儺?dāng)前所有的系統(tǒng)都是基于一個(gè)command對(duì)應(yīng)一個(gè)HystrixRequestContext的設(shè)想下構(gòu)建的. 因此, 當(dāng)一個(gè)command使用不同的參數(shù)在一個(gè)請(qǐng)求中并發(fā)執(zhí)行時(shí), 合并是有效的.
下面的代碼展示了如何實(shí)現(xiàn)請(qǐng)求級(jí)的HystrixCollapser:
public class CommandCollapserGetValueForKey extends HystrixCollapser, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand
> createCommand(final Collection
> requests) { return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List batchResponse, Collection > requests) { int count = 0; for (CollapsedRequest request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand > { private final Collection
> requests; private BatchCommand(Collection > requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List run() { ArrayList response = new ArrayList (); for (CollapsedRequest request : requests) { // artificial response for each argument received in the batch response.add("ValueForKey: " + request.getArgument()); } return response; } } }
下面的代碼展示了如果使用collapser自動(dòng)合并4個(gè)CommandCollapserGetValueForKey到一個(gè)HystrixCommand中執(zhí)行:
@Test public void testCollapser() throws Exception { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { FutureRequest Context Setupf1 = new CommandCollapserGetValueForKey(1).queue(); Future f2 = new CommandCollapserGetValueForKey(2).queue(); Future f3 = new CommandCollapserGetValueForKey(3).queue(); Future f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); // assert that the batch command "GetValueForKey" was in fact // executed and that it executed only once assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand>[1])[0]; // assert the command is the one we"re expecting assertEquals("GetValueForKey", command.getCommandKey().name()); // confirm that it was a COLLAPSED command execution assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // and that it was successful assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } }
使用請(qǐng)求級(jí)的特性時(shí)(如: 請(qǐng)求緩存、請(qǐng)求合并、請(qǐng)求日志)你必須管理HystrixRequestContext的生命周期(或者實(shí)現(xiàn)HystrixConcurrencyStategy).
這意味著你必須在請(qǐng)求之前執(zhí)行如下代碼:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
并在請(qǐng)求結(jié)束后執(zhí)行如下代碼:
context.shutdown();
在標(biāo)準(zhǔn)的Java web應(yīng)用中, 你可以使用Setvlet Filter實(shí)現(xiàn)的如下的過(guò)濾器來(lái)管理:
public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } }
可以在web.xml中加入如下代碼實(shí)現(xiàn)對(duì)所有的請(qǐng)求都使用該過(guò)濾器:
Common PatternsHystrixRequestContextServletFilter HystrixRequestContextServletFilter com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter HystrixRequestContextServletFilter /*
以下是HystrixCommand和HystrixObservableCommand的一般用法和使用模式.
Fail Fast最基本的使用是執(zhí)行一條只做一件事情且沒(méi)有實(shí)現(xiàn)回退方法的command, 這樣的command在發(fā)生任何錯(cuò)誤時(shí)都會(huì)拋出異常:
public class CommandThatFailsFast extends HystrixCommand{ private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } }
下面的代碼演示了上述行為:
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsFast(false).execute()); } @Test public void testFailure() { try { new CommandThatFailsFast(true).execute(); fail("we should have thrown an exception"); } catch (HystrixRuntimeException e) { assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage()); e.printStackTrace(); } }
HystrixObservableCommand需要重載resumeWithFallback()方法來(lái)實(shí)現(xiàn)同樣的行為:
@Override protected ObservableFail SilentresumeWithFallback() { if (throwException) { return Observable.error(new Throwable("failure from CommandThatFailsFast")); } else { return Observable.just("success"); } }
靜默失敗等同于返回一個(gè)空的響應(yīng)或者移除功能. 可以是返回null、空Map、空List, 或者其他類似的響應(yīng).
可以通過(guò)實(shí)現(xiàn)HystrixCommand.getFallback()方法實(shí)現(xiàn)該功能:
public class CommandThatFailsSilently extends HystrixCommand{ private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } @Override protected String getFallback() { return null; } }
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsSilently(false).execute()); } @Test public void testFailure() { try { assertEquals(null, new CommandThatFailsSilently(true).execute()); } catch (HystrixRuntimeException e) { fail("we should not get an exception as we fail silently with a fallback"); } }
或者返回一個(gè)空List的實(shí)現(xiàn)如下:
@Override protected ListgetFallback() { return Collections.emptyList(); }
HystrixObservableCommand可以通過(guò)重載resumeWithFallback()方法實(shí)現(xiàn)同樣的行為:
@Override protected ObservableFallback: StaticresumeWithFallback() { return Observable.empty(); }
Fallback可以返回代碼里設(shè)定的默認(rèn)值, 這種方式可以通過(guò)默認(rèn)行為來(lái)有效避免于靜默失敗帶來(lái)影響.
例如, 如果一個(gè)應(yīng)返回true/false的用戶認(rèn)證的command執(zhí)行失敗了, 那么其默認(rèn)行為可以如下:
@Override protected Boolean getFallback() { return true; }
對(duì)于HystrixObservableCommand可以通過(guò)重載resumeWithFallback()方法實(shí)現(xiàn)同樣的行為:
@Override protected ObservableFallback: StubbedresumeWithFallback() { return Observable.just( true ); }
當(dāng)command返回的是一個(gè)包含多個(gè)字段的復(fù)合對(duì)象, 且該對(duì)象的一部分字段值可以通過(guò)其他請(qǐng)求狀態(tài)獲得, 另一部分狀態(tài)可以通過(guò)設(shè)置默認(rèn)值獲得時(shí), 你通常需要使用存根(stubbed)模式.
你可能可以從存根值(stubbed values)中得到適當(dāng)?shù)闹档那闆r如下:
cookies
請(qǐng)求參數(shù)和請(qǐng)求頭
當(dāng)前失敗請(qǐng)求的前一個(gè)服務(wù)請(qǐng)求的響應(yīng)
在fallback代碼塊內(nèi)可以靜態(tài)地獲取請(qǐng)求范圍內(nèi)的存根(stubbed)值, 但是通常我們更推薦在構(gòu)建command實(shí)例時(shí)注入這些值, 就像下面實(shí)例的代碼中的countryCodeFromGeoLookup一樣:
public class CommandWithStubbedFallback extends HystrixCommand{ private final int customerId; private final String countryCodeFromGeoLookup; /** * @param customerId * The customerID to retrieve UserAccount for * @param countryCodeFromGeoLookup * The default country code from the HTTP request geo code lookup used for fallback. */ protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.customerId = customerId; this.countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value "countryCodeFromGeoLookup" that we"ll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this.customerId = customerId; this.name = name; this.countryCode = countryCode; this.isFeatureXPermitted = isFeatureXPermitted; this.isFeatureYPermitted = isFeatureYPermitted; this.isFeatureZPermitted = isFeatureZPermitted; } } }
下面的代碼演示了上述行為:
@Test public void test() { CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca"); UserAccount account = command.execute(); assertTrue(command.isFailedExecution()); assertTrue(command.isResponseFromFallback()); assertEquals(1234, account.customerId); assertEquals("ca", account.countryCode); assertEquals(true, account.isFeatureXPermitted); assertEquals(true, account.isFeatureYPermitted); assertEquals(false, account.isFeatureZPermitted); }
對(duì)于HystrixObservableCommand可以通過(guò)重載resumeWithFallback()方法實(shí)現(xiàn)同樣的行為:
@Override protected ObservableresumeWithFallback() { return Observable.just( new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false) ); }
如果你想要從Observable中發(fā)出多個(gè)值, 那么當(dāng)失敗發(fā)生時(shí), 原本的Observable可能已經(jīng)發(fā)出的一部分值, 此時(shí)你或許更希望能夠只從fallback邏輯中發(fā)出另一部分未被發(fā)出的值, 下面的例子就展示了如何實(shí)現(xiàn)這一個(gè)目的: 它通過(guò)追蹤原Observable發(fā)出的最后一個(gè)值來(lái)實(shí)現(xiàn)fallback邏輯中的Observable應(yīng)該從什么地方繼續(xù)發(fā)出存根值(stubbed value) :
@Override protected ObservableFallback: Cache via Networkconstruct() { return Observable.just(1, 2, 3) .concatWith(Observable. error(new RuntimeException("forced error"))) .doOnNext(new Action1 () { @Override public void call(Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation()); } @Override protected Observable resumeWithFallback() { if (lastSeen < 4) { return Observable.range(lastSeen + 1, 4 - lastSeen); } else { return Observable.empty(); } }
有時(shí)后端的服務(wù)異常也會(huì)引起command執(zhí)行失敗, 此時(shí)我們也可以從緩存中(如: memcached)取得相關(guān)的數(shù)據(jù).
由于在fallback的邏輯代碼中訪問(wèn)網(wǎng)絡(luò)可能會(huì)再次失敗, 因此必須構(gòu)建新的HystrixCommand或HystrixObservableCommand來(lái)執(zhí)行:
很重要的一點(diǎn)是執(zhí)行fallback邏輯的command需要在一個(gè)不同的線程池中執(zhí)行, 否則如果原command的延遲變高且其所在線程池已經(jīng)滿了的話, 執(zhí)行fallback邏輯的command將無(wú)法在同一個(gè)線程池中執(zhí)行.
下面的代碼展示了CommandWithFallbackViaNetwork如何在getFallback()方法中執(zhí)行FallbackViaNetwork.
注意, FallbackViaNetwork同樣也具有回退機(jī)制, 這里通過(guò)返回null來(lái)實(shí)現(xiàn)fail silent.
FallbackViaNetwork默認(rèn)會(huì)從HystrixCommandGroupKey中繼承線程池的配置RemoteServiceX, 因此需要在其構(gòu)造器中注入HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")來(lái)使其在不同的線程池中執(zhí)行.
這樣, CommandWithFallbackViaNetwork會(huì)在名為RemoteServiceX的線程池中執(zhí)行, 而FallbackViaNetwork會(huì)在名為RemoteServiceXFallback的線程池中執(zhí)行.
public class CommandWithFallbackViaNetwork extends HystrixCommandPrimary + Secondary with Fallback{ private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteServiceXClient.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // use a different threadpool for the fallback command // so saturating the RemoteServiceX pool won"t prevent // fallbacks from executing .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { // the fallback also failed // so this fallback-of-a-fallback will // fail silently and return null return null; } } }
有些系統(tǒng)可能具有是以雙系統(tǒng)模式搭建的 — 主從模式或主備模式.
有時(shí)從系統(tǒng)或備用系統(tǒng)會(huì)被認(rèn)為是失敗狀態(tài)的一種, 僅在執(zhí)行fallback邏輯是才使用它;這種場(chǎng)景和Cache via Network一節(jié)中描述的場(chǎng)景是一樣的.
然而, 如果切換到從系統(tǒng)是一個(gè)很正常時(shí), 例如發(fā)布新代碼時(shí)(這是有狀態(tài)的系統(tǒng)發(fā)布代碼的一種方式), 此時(shí)每當(dāng)切換到從系統(tǒng)使用時(shí), 主系統(tǒng)都是處于不可用狀態(tài),斷路器將會(huì)打開(kāi)且發(fā)出警報(bào).
這并不是我們期望發(fā)生的事, 這種狼來(lái)了式的警報(bào)可能會(huì)導(dǎo)致真正發(fā)生問(wèn)題的時(shí)候我們卻把它當(dāng)成正常的誤報(bào)而忽略了.
因此, 我們可以通過(guò)在其前面放置一個(gè)門面HystrixCommand(見(jiàn)下文), 將主/從系統(tǒng)的切換視為正常的、健康的狀態(tài).
主從HystrixCommand都是需要訪問(wèn)網(wǎng)絡(luò)且實(shí)現(xiàn)了特定的業(yè)務(wù)邏輯, 因此其實(shí)現(xiàn)上應(yīng)該是線程隔離的. 它們可能具有顯著的性能差距(通常從系統(tǒng)是一個(gè)靜態(tài)緩存), 因此將兩個(gè)command隔離的另一個(gè)好處是可以針對(duì)性地調(diào)優(yōu).
你不需要將這兩個(gè)command都公開(kāi)發(fā)布, 只需要將它們隱藏在另一個(gè)由信號(hào)量隔離的HystrixCommand中(稱之為門面HystrixCommand), 在這個(gè)command中去實(shí)現(xiàn)主系統(tǒng)還是從系統(tǒng)的調(diào)用選擇. 只有當(dāng)主從系統(tǒng)都失敗了, 才會(huì)去執(zhí)行這個(gè)門面command的fallback邏輯.
門面HystrixCommand可以使用信號(hào)量隔離的, 因?yàn)槠錁I(yè)務(wù)邏輯僅僅是調(diào)用另外兩個(gè)線程隔離的HystrixCommand, 它不涉及任何的網(wǎng)絡(luò)訪問(wèn)、重試等容易出錯(cuò)的事, 因此沒(méi)必要將這部分代碼放到其他線程去執(zhí)行.
public class CommandFacadeWithPrimarySecondary extends HystrixCommandClient Doesn"t Perform Network Access{ private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( // we want to default to semaphore-isolation since this wraps // 2 others commands that are already thread isolated HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive "primary" service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast "secondary" service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
當(dāng)你使用HystrixCommand實(shí)現(xiàn)的業(yè)務(wù)邏輯不涉及到網(wǎng)絡(luò)訪問(wèn)、對(duì)延遲敏感且無(wú)法接受多線程帶來(lái)的開(kāi)銷時(shí), 你需要設(shè)置executionIsolationStrategy)屬性的值為ExecutionIsolationStrategy.SEMAPHORE, 此時(shí)Hystrix會(huì)使用信號(hào)量隔離代替線程隔離.
下面的代碼展示了如何為command設(shè)置該屬性(也可以在運(yùn)行時(shí)動(dòng)態(tài)改變這個(gè)屬性的值):
public class CommandUsingSemaphoreIsolation extends HystrixCommandGet-Set-Get with Request Cache Invalidation{ private final int id; public CommandUsingSemaphoreIsolation(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // since we"re doing an in-memory cache lookup we choose SEMAPHORE isolation .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { // a real implementation would retrieve data from in memory data structure return "ValueFromHashMap_" + id; } }
Get-Set-Get是指: Get請(qǐng)求的結(jié)果被緩存下來(lái)后, 另一個(gè)command對(duì)同一個(gè)資源發(fā)出了Set請(qǐng)求, 此時(shí)由Get請(qǐng)求緩存的結(jié)果應(yīng)該失效, 避免隨后的Get請(qǐng)求獲取到過(guò)時(shí)的緩存結(jié)果, 此時(shí)可以通過(guò)調(diào)用HystrixRequestCache.clear())方法來(lái)使緩存失效.
public class CommandUsingRequestCacheInvalidation { /* represents a remote data store */ private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_"; public static class GetterCommand extends HystrixCommand{ private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand"); private final int id; public GetterCommand(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet")) .andCommandKey(GETTER_KEY)); this.id = id; } @Override protected String run() { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey() { return String.valueOf(id); } /** * Allow the cache to be flushed for this object. * * @param id * argument that would normally be passed to the command */ public static void flushCache(int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand { private final int id; private final String prefix; public SetterCommand(int id, String prefix) { super(HystrixCommandGroupKey.Factory.asKey("GetSetGet")); this.id = id; this.prefix = prefix; } @Override protected Void run() { // persist the value against the datastore prefixStoredOnRemoteDataStore = prefix; // flush the cache GetterCommand.flushCache(id); // no return value return null; } } }
@Test public void getGetSetGet() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute()); GetterCommand commandAgainstCache = new GetterCommand(1); assertEquals("ValueBeforeSet_1", commandAgainstCache.execute()); // confirm it executed against cache the second time assertTrue(commandAgainstCache.isResponseFromCache()); // set the new value new SetterCommand(1, "ValueAfterSet_").execute(); // fetch it again GetterCommand commandAfterSet = new GetterCommand(1); // the getter should return with the new prefix, not the value from cache assertFalse(commandAfterSet.isResponseFromCache()); assertEquals("ValueAfterSet_1", commandAfterSet.execute()); } finally { context.shutdown(); } } }Migrating a Library to Hystrix
如果你要遷移一個(gè)已有的客戶端庫(kù)到Hystrix, 你應(yīng)該將所有的服務(wù)方法(service methods)替換成HystrixCommand.
服務(wù)方法(service methods)轉(zhuǎn)而調(diào)用HystrixCommand且不在包含任何額外的業(yè)務(wù)邏輯.
因此, 在遷移之前, 一個(gè)服務(wù)庫(kù)可能是這樣的:
遷移完成之后, 服務(wù)庫(kù)的用戶要能直接訪問(wèn)到HystrixCommand, 或者通過(guò)服務(wù)門面(service facade)的代理間接訪問(wèn)到HystrixCommand.
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73416.html
摘要:使用線程池的好處通過(guò)線程在自己的線程池中隔離的好處是該應(yīng)用程序完全可以不受失控的客戶端庫(kù)的威脅。簡(jiǎn)而言之,由線程池提供的隔離功能可以使客戶端庫(kù)和子系統(tǒng)性能特性的不斷變化和動(dòng)態(tài)組合得到優(yōu)雅的處理,而不會(huì)造成中斷。 ? 工作流程圖 下面的流程圖展示了當(dāng)使用Hystrix的依賴請(qǐng)求,Hystrix是如何工作的。showImg(https://segmentfault.com/img/bV0...
摘要:斷路器本身是一種開(kāi)關(guān)裝置,用于在電路上保護(hù)線路過(guò)載,當(dāng)線路中有電器發(fā)生短路時(shí),斷路器能夠及時(shí)的切斷故障電路,防止發(fā)生過(guò)載發(fā)熱甚至起火等嚴(yán)重后果。具備擁有回退機(jī)制和斷路器功能的線程和信號(hào)隔離,請(qǐng)求緩存和請(qǐng)求打包,以及監(jiān)控和配置等功能。 轉(zhuǎn)載請(qǐng)注明出處 http://www.paraller.com 代碼機(jī)制:熔斷 & Fallback & 資源隔離 熔斷 概念: 在微服務(wù)架構(gòu)中,我們將系...
摘要:腳本位置依賴內(nèi)采樣率,默認(rèn)即如需測(cè)試時(shí)每次都看到則修改為,但對(duì)性能有影響,注意上線時(shí)修改為合理值運(yùn)行查詢參考規(guī)范推薦推薦谷歌的大規(guī)模分布式跟蹤系統(tǒng)分布式服務(wù)的 zipkin-server pom io.zipkin zipkin-ui 1.39.3 or...
摘要:斷路器原理斷路器在和執(zhí)行過(guò)程中起到至關(guān)重要的作用。其中通過(guò)來(lái)定義,每一個(gè)命令都需要有一個(gè)來(lái)標(biāo)識(shí),同時(shí)根據(jù)這個(gè)可以找到對(duì)應(yīng)的斷路器實(shí)例。一個(gè)啥都不做的斷路器,它允許所有請(qǐng)求通過(guò),并且斷路器始終處于閉合狀態(tài)斷路器的另一個(gè)實(shí)現(xiàn)類。 斷路器原理 斷路器在HystrixCommand和HystrixObservableCommand執(zhí)行過(guò)程中起到至關(guān)重要的作用。查看一下核心組件HystrixCi...
閱讀 1975·2023-04-26 01:59
閱讀 3279·2021-10-11 11:07
閱讀 3311·2021-09-22 15:43
閱讀 3394·2021-09-02 15:21
閱讀 2578·2021-09-01 10:49
閱讀 916·2019-08-29 15:15
閱讀 3104·2019-08-29 13:59
閱讀 2843·2019-08-26 13:36