成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

CircuitBreaker模式的Java實(shí)現(xiàn)

animabear / 553人閱讀

摘要:序狀態(tài)轉(zhuǎn)換閉開(kāi)在設(shè)定的時(shí)間窗口內(nèi)失敗次數(shù)達(dá)到閾值,由閉開(kāi)。進(jìn)入開(kāi)的同時(shí)啟動(dòng)進(jìn)入半開(kāi)狀態(tài)的定時(shí)器。半開(kāi)狀態(tài)的計(jì)數(shù)器目前半開(kāi)狀態(tài)沒(méi)有使用時(shí)間窗口,僅僅使用連續(xù)成功次數(shù)來(lái)計(jì)算,一旦失敗,則將斷路器設(shè)置為狀態(tài)。

狀態(tài)轉(zhuǎn)換

閉->開(kāi)

在設(shè)定的時(shí)間窗口內(nèi)失敗次數(shù)達(dá)到閾值,由閉->開(kāi)。

開(kāi)->半開(kāi)

在處于開(kāi)的狀態(tài),對(duì)目標(biāo)的調(diào)用做失敗返回,進(jìn)入開(kāi)的時(shí)候,啟動(dòng)計(jì)時(shí)器,設(shè)定時(shí)間過(guò)后進(jìn)入半開(kāi)狀態(tài)。

半開(kāi)->開(kāi)

進(jìn)入半開(kāi)狀態(tài),會(huì)啟動(dòng)一個(gè)計(jì)數(shù)器,記錄連續(xù)成功的調(diào)用次數(shù),超過(guò)閾值,進(jìn)入閉狀態(tài)。有一次失敗則進(jìn)入開(kāi)狀態(tài),同時(shí)清零連續(xù)成功調(diào)用次數(shù)。進(jìn)入開(kāi)的同時(shí)啟動(dòng)進(jìn)入半開(kāi)狀態(tài)的定時(shí)器。

半開(kāi)->閉

進(jìn)入半開(kāi)狀態(tài),會(huì)啟動(dòng)一個(gè)計(jì)數(shù)器,記錄連續(xù)成功的調(diào)用次數(shù),超過(guò)閾值,進(jìn)入閉狀態(tài),同時(shí)清零連續(xù)成功調(diào)用次數(shù)。

實(shí)現(xiàn)要點(diǎn)

切到開(kāi)狀態(tài)啟動(dòng)的定時(shí)器

這里如果使用定時(shí)線程來(lái)做的話,開(kāi)的線程多,管理比較麻煩,故這里改為維護(hù)一個(gè)切換到開(kāi)狀態(tài)的時(shí)間,在每次方法調(diào)用,判斷是開(kāi)狀態(tài)時(shí),判斷是否已經(jīng)過(guò)了這個(gè)超時(shí)閾值,超過(guò)的話,進(jìn)入半開(kāi)狀態(tài)。

半開(kāi)狀態(tài)的計(jì)數(shù)器

目前半開(kāi)狀態(tài)沒(méi)有使用時(shí)間窗口,僅僅使用連續(xù)成功次數(shù)來(lái)計(jì)算,一旦失敗,則將斷路器設(shè)置為open狀態(tài)。如果連續(xù)成功次數(shù)達(dá)到閾值,則進(jìn)入close狀態(tài)。每次進(jìn)入half-open的狀態(tài)時(shí),連續(xù)成功的計(jì)數(shù)器清零。

主要代碼 斷路器狀態(tài)
public enum CircuitBreakerState {
    CLOSED,    // working normally, calls are transparently passing through
    OPEN,      // method calls are being intercepted and CircuitBreakerExceptions are being thrown instead
    HALF_OPEN  // method calls are passing through; if another blacklisted exception is thrown, reverts back to OPEN
}
帶時(shí)間窗口的計(jì)數(shù)器
/**
 * 帶時(shí)間窗口的限流計(jì)數(shù)器
 */
public class LimitCounter {
    private long startTime;
    private long timeIntervalInMs;
    private int maxLimit;
    private AtomicInteger currentCount;

    public LimitCounter(long timeIntervalInMs, int maxLimit) {
        super();
        this.timeIntervalInMs = timeIntervalInMs;
        this.maxLimit = maxLimit;
        startTime = System.currentTimeMillis();
        currentCount = new AtomicInteger(0);
    }


    public int incrAndGet() {
        long currentTime = System.currentTimeMillis();
        if ((startTime + timeIntervalInMs) < currentTime) {
            synchronized (this) {
                if ((startTime + timeIntervalInMs) < currentTime) {
                    startTime = currentTime;
                    currentCount.set(0);
                }
            }
        }
        return currentCount.incrementAndGet();
    }

    public boolean thresholdReached(){
        return currentCount.get() > maxLimit;
    }

    public int get(){
        return currentCount.get();
    }

    public /*synchronized*/ void reset(){
        currentCount.set(0);
    }
}
主要配置
public class CircuitBreakerConfig {

    //closed狀態(tài)的失敗次數(shù)閾值
    private int failThreshold = 5;

    //closed狀態(tài)的失敗計(jì)數(shù)的時(shí)間窗口
    private int failCountWindowInMs = 60*1000;

    //處于open狀態(tài)下進(jìn)入half-open的超時(shí)時(shí)間
    private int open2HalfOpenTimeoutInMs = 5*1000;

    //half-open狀態(tài)下成功次數(shù)閾值
    private int consecutiveSuccThreshold = 5;

    private CircuitBreakerConfig(){

    }

    public static CircuitBreakerConfig newDefault(){
        CircuitBreakerConfig config = new CircuitBreakerConfig();
        return config;
    }

    public int getFailThreshold() {
        return failThreshold;
    }

    public void setFailThreshold(int failThreshold) {
        this.failThreshold = failThreshold;
    }

    public int getFailCountWindowInMs() {
        return failCountWindowInMs;
    }

    public void setFailCountWindowInMs(int failCountWindowInMs) {
        this.failCountWindowInMs = failCountWindowInMs;
    }

    public int getOpen2HalfOpenTimeoutInMs() {
        return open2HalfOpenTimeoutInMs;
    }

    public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) {
        this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs;
    }

    public int getConsecutiveSuccThreshold() {
        return consecutiveSuccThreshold;
    }

    public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) {
        this.consecutiveSuccThreshold = consecutiveSuccThreshold;
    }
}
斷路器
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;

    private CircuitBreakerConfig config;

    private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;

    //最近進(jìn)入open狀態(tài)的時(shí)間
    private volatile long lastOpenedTime;

    //closed狀態(tài)下失敗次數(shù)
    private LimitCounter failCount ;

    //half-open狀態(tài)的連續(xù)成功次數(shù),失敗立即清零
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);


    //構(gòu)造器
    public CircuitBreaker(String name,CircuitBreakerConfig config) {
        this.config = config;
        this.name = name;
        failCount = new LimitCounter(config.getFailCountWindowInMs(),config.getFailThreshold());
    }

    //狀態(tài)判斷
    public boolean isOpen(){
        return CircuitBreakerState.OPEN == state;
    }

    public boolean isHalfOpen(){
        return CircuitBreakerState.HALF_OPEN == state;
    }

    public boolean isClosed(){
        return CircuitBreakerState.CLOSED == state;
    }

    //狀態(tài)操作

    /**
     * closed->open | halfopen -> open
     */
    public void open(){
        lastOpenedTime = System.currentTimeMillis();
        state = CircuitBreakerState.OPEN;
        logger.debug("circuit open,key:{}",name);
    }

    /**
     * open -> halfopen
     */
    public void openHalf(){
        consecutiveSuccCount.set(0);
        state = CircuitBreakerState.HALF_OPEN;
        logger.debug("circuit open-half,key:{}",name);
    }

    /**
     * halfopen -> close
     */
    public void close(){
        failCount.reset();
        state = CircuitBreakerState.CLOSED;
        logger.debug("circuit close,key:{}",name);
    }

    //閾值判斷

    /**
     * 是否應(yīng)該轉(zhuǎn)到half open
     * 前提是 open state
     * @return
     */
    public boolean isOpen2HalfOpenTimeout(){
        return System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs() > lastOpenedTime;
    }

    /**
     * 是否應(yīng)該從close轉(zhuǎn)到open
     * @return
     */
    public boolean isCloseFailThresholdReached(){
        return failCount.thresholdReached();
    }

    /**
     * half-open狀態(tài)下是否達(dá)到close的閾值
     * @return
     */
    public boolean isConsecutiveSuccessThresholdReached(){
        return consecutiveSuccCount.get() >= config.getConsecutiveSuccThreshold();
    }

    //getter
    public void incrFailCount() {
        int count = failCount.incrAndGet();
        logger.debug("incr fail count:{},key:{}",count,name);
    }

    public AtomicInteger getConsecutiveSuccCount() {
        return consecutiveSuccCount;
    }

    public CircuitBreakerState getState() {
        return state;
    }
}
斷路器維護(hù)的變量
    //最近進(jìn)入open狀態(tài)的時(shí)間
    private volatile long lastOpenedTime;

    //closed狀態(tài)下失敗次數(shù)
    private LimitCounter failCount ;

    //half-open狀態(tài)的連續(xù)成功次數(shù),失敗立即清零
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);
基于jdk代理的攔截
public class CircuitBreakerInvocationHandler implements InvocationHandler{

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class);

    private Object target;

    public CircuitBreakerInvocationHandler(Object target) {
        this.target = target;
    }

    //動(dòng)態(tài)生成代理對(duì)象
    public Object proxy(){
        return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this);
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class);
        if(breakerAnno == null){
            return method.invoke(target,args);
        }
        Class[] noTripExs = breakerAnno.noTripExceptions();
        int timeout = breakerAnno.timeoutInMs();
        int interval = breakerAnno.failCountWindowInMs();
        int failThreshold = breakerAnno.failThreshold();
        CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault();
        if(interval != -1){
            cfg.setFailCountWindowInMs(interval);
        }
        if(failThreshold != -1){
            cfg.setFailThreshold(failThreshold);
        }

        String key = target.getClass().getSimpleName() + method.getName();
        CircuitBreaker breaker = CircuitBreakerRegister.get(key);
        if(breaker == null){
            breaker = new CircuitBreaker(key,cfg);
            CircuitBreakerRegister.putIfAbsent(key,breaker);
        }

        Object returnValue = null;

        logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString());
        //breaker state
        if(breaker.isOpen()){
            //判斷是否該進(jìn)入half open狀態(tài)
            if(breaker.isOpen2HalfOpenTimeout()){
                //進(jìn)入half open狀態(tài)
                breaker.openHalf();
                logger.debug("method:{} into half open",method.toGenericString());
                returnValue = processHalfOpen(breaker,method,args,noTripExs);
            }else{
                throw new CircuitBreakerOpenException(method.toGenericString());
            }
        }else if(breaker.isClosed()){
            try{
                returnValue = method.invoke(target,args);
//                這里看情況是否重置標(biāo)志
//                breaker.close();
            }catch (Throwable t){
                if(isNoTripException(t,noTripExs)){
                    throw t;
                }else{
                    //增加計(jì)數(shù)
                    breaker.incrFailCount();
                    if(breaker.isCloseFailThresholdReached()){
                        //觸發(fā)閾值,打開(kāi)
                        logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString());
                        breaker.open();
                        throw new CircuitBreakerOpenException(method.toGenericString());
                    }else{
                        throw t;
                    }
                }
            }

        }else if(breaker.isHalfOpen()){
            returnValue = processHalfOpen(breaker,method,args,noTripExs);
        }

        return returnValue;
    }

    private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class[] noTripExs) throws Throwable {
        try{
            Object returnValue = method.invoke(target,args);
            breaker.getConsecutiveSuccCount().incrementAndGet();
            if(breaker.isConsecutiveSuccessThresholdReached()){
                //調(diào)用成功則進(jìn)入close狀態(tài)
                breaker.close();
            }
            return returnValue;
        }catch (Throwable t){
            if(isNoTripException(t,noTripExs)){
                breaker.getConsecutiveSuccCount().incrementAndGet();
                if(breaker.isConsecutiveSuccessThresholdReached()){
                    breaker.close();
                }
                throw t;
            }else{
                breaker.open();
                throw new CircuitBreakerOpenException(method.toGenericString(), t);
            }
        }
    }

    private boolean isNoTripException(Throwable t,Class[] noTripExceptions){
        if(noTripExceptions == null || noTripExceptions.length == 0){
            return false;
        }
        for(Class ex:noTripExceptions){
            //是否是拋出異常t的父類
            //t java.lang.reflect.InvocationTargetException
            if(ex.isAssignableFrom(t.getCause().getClass())){
                return true;
            }
        }
        return false;
    }
}
github工程circuit-breaker
參考

martinfowler-CircuitBreaker

microsoft-Circuit Breaker Pattern(必讀)

cloud-design-patterns-斷路器模式

HystrixCircuitBreaker

熔斷器設(shè)計(jì)模式(實(shí)現(xiàn)參考)

Creating a circuit breaker with Spring AOP(實(shí)現(xiàn)參考)

alenegro81/CircuitBreaker(參考jdk代理實(shí)現(xiàn))

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65827.html

相關(guān)文章

  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個(gè)實(shí)現(xiàn)類分別是實(shí)現(xiàn)了接口,它不做任何操作實(shí)現(xiàn)了接口其方法會(huì)拋出方法首先判斷,如果為,則執(zhí)行方法如果為則調(diào)用,否則調(diào)用計(jì)算,沒(méi)有拋出異常的話,則最后執(zhí)序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西紅柿 評(píng)論0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個(gè)實(shí)現(xiàn)類分別是實(shí)現(xiàn)了接口,它不做任何操作實(shí)現(xiàn)了接口其方法會(huì)拋出方法首先判斷,如果為,則執(zhí)行方法如果為則調(diào)用,否則調(diào)用計(jì)算,沒(méi)有拋出異常的話,則最后執(zhí)序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西紅柿 評(píng)論0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個(gè)實(shí)現(xiàn)類分別是實(shí)現(xiàn)了接口,它不做任何操作實(shí)現(xiàn)了接口其方法會(huì)拋出方法首先判斷,如果為,則執(zhí)行方法如果為則調(diào)用,否則調(diào)用計(jì)算,沒(méi)有拋出異常的話,則最后執(zhí)序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    yangrd 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<