摘要:序狀態(tài)轉(zhuǎn)換閉開(kāi)在設(shè)定的時(shí)間窗口內(nèi)失敗次數(shù)達(dá)到閾值,由閉開(kāi)。進(jìn)入開(kāi)的同時(shí)啟動(dòng)進(jìn)入半開(kāi)狀態(tài)的定時(shí)器。半開(kāi)狀態(tài)的計(jì)數(shù)器目前半開(kāi)狀態(tài)沒(méi)有使用時(shí)間窗口,僅僅使用連續(xù)成功次數(shù)來(lái)計(jì)算,一旦失敗,則將斷路器設(shè)置為狀態(tài)。
序 狀態(tài)轉(zhuǎn)換
閉->開(kāi)
在設(shè)定的時(shí)間窗口內(nèi)失敗次數(shù)達(dá)到閾值,由閉->開(kāi)。
開(kāi)->半開(kāi)
在處于開(kāi)的狀態(tài),對(duì)目標(biāo)的調(diào)用做失敗返回,進(jìn)入開(kāi)的時(shí)候,啟動(dòng)計(jì)時(shí)器,設(shè)定時(shí)間過(guò)后進(jìn)入半開(kāi)狀態(tài)。
半開(kāi)->開(kāi)
進(jìn)入半開(kāi)狀態(tài),會(huì)啟動(dòng)一個(gè)計(jì)數(shù)器,記錄連續(xù)成功的調(diào)用次數(shù),超過(guò)閾值,進(jìn)入閉狀態(tài)。有一次失敗則進(jìn)入開(kāi)狀態(tài),同時(shí)清零連續(xù)成功調(diào)用次數(shù)。進(jìn)入開(kāi)的同時(shí)啟動(dòng)進(jìn)入半開(kāi)狀態(tài)的定時(shí)器。
半開(kāi)->閉
進(jìn)入半開(kāi)狀態(tài),會(huì)啟動(dòng)一個(gè)計(jì)數(shù)器,記錄連續(xù)成功的調(diào)用次數(shù),超過(guò)閾值,進(jìn)入閉狀態(tài),同時(shí)清零連續(xù)成功調(diào)用次數(shù)。
實(shí)現(xiàn)要點(diǎn)切到開(kāi)狀態(tài)啟動(dòng)的定時(shí)器
這里如果使用定時(shí)線程來(lái)做的話,開(kāi)的線程多,管理比較麻煩,故這里改為維護(hù)一個(gè)切換到開(kāi)狀態(tài)的時(shí)間,在每次方法調(diào)用,判斷是開(kāi)狀態(tài)時(shí),判斷是否已經(jīng)過(guò)了這個(gè)超時(shí)閾值,超過(guò)的話,進(jìn)入半開(kāi)狀態(tài)。
半開(kāi)狀態(tài)的計(jì)數(shù)器
目前半開(kāi)狀態(tài)沒(méi)有使用時(shí)間窗口,僅僅使用連續(xù)成功次數(shù)來(lái)計(jì)算,一旦失敗,則將斷路器設(shè)置為open狀態(tài)。如果連續(xù)成功次數(shù)達(dá)到閾值,則進(jìn)入close狀態(tài)。每次進(jìn)入half-open的狀態(tài)時(shí),連續(xù)成功的計(jì)數(shù)器清零。
主要代碼 斷路器狀態(tài)public enum CircuitBreakerState { CLOSED, // working normally, calls are transparently passing through OPEN, // method calls are being intercepted and CircuitBreakerExceptions are being thrown instead HALF_OPEN // method calls are passing through; if another blacklisted exception is thrown, reverts back to OPEN }帶時(shí)間窗口的計(jì)數(shù)器
/** * 帶時(shí)間窗口的限流計(jì)數(shù)器 */ public class LimitCounter { private long startTime; private long timeIntervalInMs; private int maxLimit; private AtomicInteger currentCount; public LimitCounter(long timeIntervalInMs, int maxLimit) { super(); this.timeIntervalInMs = timeIntervalInMs; this.maxLimit = maxLimit; startTime = System.currentTimeMillis(); currentCount = new AtomicInteger(0); } public int incrAndGet() { long currentTime = System.currentTimeMillis(); if ((startTime + timeIntervalInMs) < currentTime) { synchronized (this) { if ((startTime + timeIntervalInMs) < currentTime) { startTime = currentTime; currentCount.set(0); } } } return currentCount.incrementAndGet(); } public boolean thresholdReached(){ return currentCount.get() > maxLimit; } public int get(){ return currentCount.get(); } public /*synchronized*/ void reset(){ currentCount.set(0); } }主要配置
public class CircuitBreakerConfig { //closed狀態(tài)的失敗次數(shù)閾值 private int failThreshold = 5; //closed狀態(tài)的失敗計(jì)數(shù)的時(shí)間窗口 private int failCountWindowInMs = 60*1000; //處于open狀態(tài)下進(jìn)入half-open的超時(shí)時(shí)間 private int open2HalfOpenTimeoutInMs = 5*1000; //half-open狀態(tài)下成功次數(shù)閾值 private int consecutiveSuccThreshold = 5; private CircuitBreakerConfig(){ } public static CircuitBreakerConfig newDefault(){ CircuitBreakerConfig config = new CircuitBreakerConfig(); return config; } public int getFailThreshold() { return failThreshold; } public void setFailThreshold(int failThreshold) { this.failThreshold = failThreshold; } public int getFailCountWindowInMs() { return failCountWindowInMs; } public void setFailCountWindowInMs(int failCountWindowInMs) { this.failCountWindowInMs = failCountWindowInMs; } public int getOpen2HalfOpenTimeoutInMs() { return open2HalfOpenTimeoutInMs; } public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) { this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs; } public int getConsecutiveSuccThreshold() { return consecutiveSuccThreshold; } public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) { this.consecutiveSuccThreshold = consecutiveSuccThreshold; } }斷路器
public class CircuitBreaker { private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class); private String name; private CircuitBreakerConfig config; private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED; //最近進(jìn)入open狀態(tài)的時(shí)間 private volatile long lastOpenedTime; //closed狀態(tài)下失敗次數(shù) private LimitCounter failCount ; //half-open狀態(tài)的連續(xù)成功次數(shù),失敗立即清零 private AtomicInteger consecutiveSuccCount = new AtomicInteger(0); //構(gòu)造器 public CircuitBreaker(String name,CircuitBreakerConfig config) { this.config = config; this.name = name; failCount = new LimitCounter(config.getFailCountWindowInMs(),config.getFailThreshold()); } //狀態(tài)判斷 public boolean isOpen(){ return CircuitBreakerState.OPEN == state; } public boolean isHalfOpen(){ return CircuitBreakerState.HALF_OPEN == state; } public boolean isClosed(){ return CircuitBreakerState.CLOSED == state; } //狀態(tài)操作 /** * closed->open | halfopen -> open */ public void open(){ lastOpenedTime = System.currentTimeMillis(); state = CircuitBreakerState.OPEN; logger.debug("circuit open,key:{}",name); } /** * open -> halfopen */ public void openHalf(){ consecutiveSuccCount.set(0); state = CircuitBreakerState.HALF_OPEN; logger.debug("circuit open-half,key:{}",name); } /** * halfopen -> close */ public void close(){ failCount.reset(); state = CircuitBreakerState.CLOSED; logger.debug("circuit close,key:{}",name); } //閾值判斷 /** * 是否應(yīng)該轉(zhuǎn)到half open * 前提是 open state * @return */ public boolean isOpen2HalfOpenTimeout(){ return System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs() > lastOpenedTime; } /** * 是否應(yīng)該從close轉(zhuǎn)到open * @return */ public boolean isCloseFailThresholdReached(){ return failCount.thresholdReached(); } /** * half-open狀態(tài)下是否達(dá)到close的閾值 * @return */ public boolean isConsecutiveSuccessThresholdReached(){ return consecutiveSuccCount.get() >= config.getConsecutiveSuccThreshold(); } //getter public void incrFailCount() { int count = failCount.incrAndGet(); logger.debug("incr fail count:{},key:{}",count,name); } public AtomicInteger getConsecutiveSuccCount() { return consecutiveSuccCount; } public CircuitBreakerState getState() { return state; } }斷路器維護(hù)的變量
//最近進(jìn)入open狀態(tài)的時(shí)間 private volatile long lastOpenedTime; //closed狀態(tài)下失敗次數(shù) private LimitCounter failCount ; //half-open狀態(tài)的連續(xù)成功次數(shù),失敗立即清零 private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);基于jdk代理的攔截
public class CircuitBreakerInvocationHandler implements InvocationHandler{ private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class); private Object target; public CircuitBreakerInvocationHandler(Object target) { this.target = target; } //動(dòng)態(tài)生成代理對(duì)象 public Object proxy(){ return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class); if(breakerAnno == null){ return method.invoke(target,args); } Class extends Throwable>[] noTripExs = breakerAnno.noTripExceptions(); int timeout = breakerAnno.timeoutInMs(); int interval = breakerAnno.failCountWindowInMs(); int failThreshold = breakerAnno.failThreshold(); CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault(); if(interval != -1){ cfg.setFailCountWindowInMs(interval); } if(failThreshold != -1){ cfg.setFailThreshold(failThreshold); } String key = target.getClass().getSimpleName() + method.getName(); CircuitBreaker breaker = CircuitBreakerRegister.get(key); if(breaker == null){ breaker = new CircuitBreaker(key,cfg); CircuitBreakerRegister.putIfAbsent(key,breaker); } Object returnValue = null; logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString()); //breaker state if(breaker.isOpen()){ //判斷是否該進(jìn)入half open狀態(tài) if(breaker.isOpen2HalfOpenTimeout()){ //進(jìn)入half open狀態(tài) breaker.openHalf(); logger.debug("method:{} into half open",method.toGenericString()); returnValue = processHalfOpen(breaker,method,args,noTripExs); }else{ throw new CircuitBreakerOpenException(method.toGenericString()); } }else if(breaker.isClosed()){ try{ returnValue = method.invoke(target,args); // 這里看情況是否重置標(biāo)志 // breaker.close(); }catch (Throwable t){ if(isNoTripException(t,noTripExs)){ throw t; }else{ //增加計(jì)數(shù) breaker.incrFailCount(); if(breaker.isCloseFailThresholdReached()){ //觸發(fā)閾值,打開(kāi) logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString()); breaker.open(); throw new CircuitBreakerOpenException(method.toGenericString()); }else{ throw t; } } } }else if(breaker.isHalfOpen()){ returnValue = processHalfOpen(breaker,method,args,noTripExs); } return returnValue; } private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class extends Throwable>[] noTripExs) throws Throwable { try{ Object returnValue = method.invoke(target,args); breaker.getConsecutiveSuccCount().incrementAndGet(); if(breaker.isConsecutiveSuccessThresholdReached()){ //調(diào)用成功則進(jìn)入close狀態(tài) breaker.close(); } return returnValue; }catch (Throwable t){ if(isNoTripException(t,noTripExs)){ breaker.getConsecutiveSuccCount().incrementAndGet(); if(breaker.isConsecutiveSuccessThresholdReached()){ breaker.close(); } throw t; }else{ breaker.open(); throw new CircuitBreakerOpenException(method.toGenericString(), t); } } } private boolean isNoTripException(Throwable t,Class extends Throwable>[] noTripExceptions){ if(noTripExceptions == null || noTripExceptions.length == 0){ return false; } for(Class extends Throwable> ex:noTripExceptions){ //是否是拋出異常t的父類 //t java.lang.reflect.InvocationTargetException if(ex.isAssignableFrom(t.getCause().getClass())){ return true; } } return false; } }
github工程circuit-breaker參考
martinfowler-CircuitBreaker
microsoft-Circuit Breaker Pattern(必讀)
cloud-design-patterns-斷路器模式
HystrixCircuitBreaker
熔斷器設(shè)計(jì)模式(實(shí)現(xiàn)參考)
Creating a circuit breaker with Spring AOP(實(shí)現(xiàn)參考)
alenegro81/CircuitBreaker(參考jdk代理實(shí)現(xiàn))
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65827.html
摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個(gè)實(shí)現(xiàn)類分別是實(shí)現(xiàn)了接口,它不做任何操作實(shí)現(xiàn)了接口其方法會(huì)拋出方法首先判斷,如果為,則執(zhí)行方法如果為則調(diào)用,否則調(diào)用計(jì)算,沒(méi)有拋出異常的話,則最后執(zhí)序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...
摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個(gè)實(shí)現(xiàn)類分別是實(shí)現(xiàn)了接口,它不做任何操作實(shí)現(xiàn)了接口其方法會(huì)拋出方法首先判斷,如果為,則執(zhí)行方法如果為則調(diào)用,否則調(diào)用計(jì)算,沒(méi)有拋出異常的話,則最后執(zhí)序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...
摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個(gè)實(shí)現(xiàn)類分別是實(shí)現(xiàn)了接口,它不做任何操作實(shí)現(xiàn)了接口其方法會(huì)拋出方法首先判斷,如果為,則執(zhí)行方法如果為則調(diào)用,否則調(diào)用計(jì)算,沒(méi)有拋出異常的話,則最后執(zhí)序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...
閱讀 2845·2023-04-25 18:06
閱讀 2648·2021-11-22 09:34
閱讀 1728·2021-11-08 13:16
閱讀 1347·2021-09-24 09:47
閱讀 3078·2019-08-30 15:44
閱讀 2805·2019-08-29 17:24
閱讀 2613·2019-08-23 18:37
閱讀 2469·2019-08-23 16:55