🌏 环境:JDK11
Intellij IDEA 2019.03
🌾 依赖版本:Resilience4j v0.13.2
🍃 知识依赖:JUC
,位图
一、什么是熔断 在分布式系统中,各服务间的相互调用更加频繁,上下游调用中充满了可能性,一个服务可能会被很多其他服务依赖并调用,在这个过程中如果某个服务由于某种原因出错(业务出错、负载过高),可能会导致整个分布式调用链路失败:
上面这个过程最终可能会导致全链路瘫痪(服务雪崩),此时需要一种可以解决上述问题的策略,此策略设计目标为:
在发现有服务调用失败后,及时计算失败率
失败率达到某种阈值时,切断与该服务的所有交互,服务走切断后的自定义逻辑
切断并且不再调用该服务后主动监听被切断的服务是否已经恢复了处理能力,若恢复,则继续让其提供服务
这个策略被放进图1
中,就变成了下面这样:
这个过程中,C服务在自己出问题的情况下,并不会像图1
里那样仍然有大量流量打进来,也不会影响到上游服务,这个结果让调用链看起来比图1
更加的稳定,这个过程就叫熔断
。
针对这个过程,可以看到在C不可用时,B走了熔断后的降级逻辑,这个逻辑可以自定义,如果C在整个调用链里属于那种必须要成功的服务,那么这里的逻辑就可以是直接抛错,如果C属于那种失败了也无所谓,不影响整个业务处理,那么降级逻辑里就可以不做处理,例如下面的场景:
类似这种接口,降级策略很适合不做处理,返回空信息即可,这样最坏的情况就是页面少了某个板块的信息,可能会对用户造成不太好的体验,但是不影响其对外服务,被熔断的服务恢复后页面也会重新回归正常。熔断后的降级处理方式是件值得思考的事情,熔断和降级是相互独立的概念,熔断后必然会有降级操作(哪怕直接抛异常也是一种降级策略),这个降级操作是熔断这个动作导致的,所以很多时候会把熔断和降级放在一起说,其实降级还可以由其他动作触发,比如限流后抛出“系统繁忙
”,这也是一种降级策略,只不过它是由限流触发的,再比如通过开关埋点在系统负载过高时主动关停一些次要服务来提升核心功能的响应速度,这也是一种降级策略,降级是最终产物,而产生它的方式有很多种。
二、Resilience4j中的熔断器 2.1:Resilience4j是什么? 它是一个轻量、易用、可组装的高可用框架,支持熔断
、高频控制
、隔离
、限流
、限时
、重试
等多种高可用机制。本篇文章只关注其熔断部分
。
2.2:如何使用? 通过第一部分的介绍,可以认为一个熔断器必须要具备统计单位请求内的错误率、全熔断、半熔断放量、恢复这几个流程,带着这个流程,下面来介绍下Resilience4j
里熔断器的用法。
通过图2
里服务B调用服务C的例子,现在利用java类来进行简单模拟下这个流程。
首先定义ServerC类,用于模拟服务C:
代码块1 1 2 3 4 5 6 7 8 9 public class ServerC { public String getCInfo (int id) { if (id == 0 ) { throw new RuntimeException ("输入0异常" ); } return "id=" + id + "的C信息" ; } }
再定义ServerB类,用于模拟服务B,这里给服务B调用服务C方法那里加上熔断器处理,注意这个类里的注释,会详细说明熔断器的主要配置项以及其使用方法:
代码块2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class ServerB { private CircuitBreakerRegistry breakerRegistry; private ServerC serverC = new ServerC (); ServerB() { breakerRegistry = CircuitBreakerRegistry.of(CircuitBreakerConfig.custom() .enableAutomaticTransitionFromOpenToHalfOpen() .failureRateThreshold(50 ) .ringBufferSizeInClosedState(100 ) .ringBufferSizeInHalfOpenState(10 ) .waitDurationInOpenState(Duration.ofMillis(1000L )) .build()); } public String getCInfo (int id) { CircuitBreaker breaker = breakerRegistry.circuitBreaker("getCInfo" ); try { return breaker.executeCallable(() -> serverC.getCInfo(id)); } catch (CircuitBreakerOpenException e) { return "服务C出错,触发服务B的降级逻辑" ; } catch (Exception e) { return "调用服务C出错" ; } } public CircuitBreaker getBreaker () { return breakerRegistry.circuitBreaker("getCInfo" ); } }
上述配置的熔断器解释如下:
在熔断器闭合
的情况下(也即是正常情况下),以100个请求为单位窗口
计算错误率,一旦错误率达到50%,立刻进入全熔断
状态,该状态下服务B不会再发生对服务C的调用,直接走自己的降级逻辑,经过1000ms后恢复为半熔断
状态,此时流量开始打进服务C,此时仍然会计算错误率,只是半熔断
状态下,是以10个请求为单位窗口计算的错误率,这个可以保证在服务C没有恢复正常的情况下可以更快速的进入全熔断
状态。
2.3:测试-熔断器状态切换 然后开始编写测试方法,下面会通过测试方法来详细解析该熔断器的状态变迁:
代码块3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public void testBreak () throws Exception { for (int i = 0 ; i < 100 ; i++) { if (i < 50 ) { serverB.getCInfo(0 ); } else { serverB.getCInfo(1 ); } } System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.OPEN)); System.out.println(serverB.getCInfo(1 )); Thread.sleep(500L ); System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.OPEN)); Thread.sleep(500L ); System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.HALF_OPEN)); System.out.println(serverB.getCInfo(1 )); System.out.println(serverB.getCInfo(0 )); for (int i = 0 ; i < 10 ; i++) { if (i < 4 ) { serverB.getCInfo(0 ); } else { serverB.getCInfo(1 ); } } System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.OPEN)); System.out.println(serverB.getCInfo(1 )); Thread.sleep(1000L ); for (int i = 0 ; i < 10 ; i++) { if (i < 6 ) { serverB.getCInfo(1 ); } else { serverB.getCInfo(0 ); } } System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.CLOSED)); System.out.println(serverB.getCInfo(1 )); System.out.println(serverB.getCInfo(0 )); }
最终输出如下:
1 2 3 4 5 6 7 8 9 10 11 true 服务C出错,触发服务B的降级逻辑 true true id=1的C信息 调用服务C出错 true 服务C出错,触发服务B的降级逻辑 true id=1的C信息 调用服务C出错
可以看到,单位请求内达到错误率阈值后熔断器会进入全开状态
(全熔断
),全开状态下走降级逻辑
,此时不再会实际请求服务C,一段时间后(全开持续时间),进入半开状态
(半熔断
),半开时仍然正常打入服务C,只是由于单位请求量相比闭合时更小,若服务还没恢复,计算错误率会更快达到错误率阈值而迅速进入全开状态,以此类推。如果服务已经恢复,那么将会从半开状态进入闭合状态。
2.4:测试-错误率统计方式 通过上面的测试用例可以知道触发熔断器状态切换的时机,而且闭合状态下和半熔断状态下统计错误率的单位请求数不相同,那么这个请求数量又是怎么统计的呢?如果一个请求先错误了49次,然后在第101次请求的时候再错误1次是否可以成功触发熔断器全开?如果把这49次失败往后挪一位呢?现在再来按照设想测试下其错误率的统计方式:
代码块4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void testRate () { for (int i = 0 ; i < 100 ; i++) { if (i < 49 ) { serverB.getCInfo(0 ); } else { serverB.getCInfo(1 ); } } System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.CLOSED)); serverB.getCInfo(0 ); System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.CLOSED)); }
输出结果为:
然后我们让第一次失败的那次请求和其后面出错的请求后移一位:
代码块5 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void testRate () { for (int i = 0 ; i < 100 ; i++) { if (i != 0 && i < 50 ) { serverB.getCInfo(0 ); } else { serverB.getCInfo(1 ); } } System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.CLOSED)); serverB.getCInfo(0 ); System.out.println(serverB.getBreaker().getState().equals(CircuitBreaker.State.OPEN)); }
输出结果为:
用图来描述下导致这两种情况发生的流程:
所以Resilience4j
在计算失败率的时候,是会发生滑动的,错误率是根据当前滑动窗口
内的请求进行计算得出的,每次请求都会导致窗口移动,重新计算当前失败率,这个在源码解析里会说明这是怎样的一种结构,这里简单了解即可。
三、源码解析 3.1:注册器入口 通过上面ServerB类里的使用,首先会通过CircuitBreakerRegistry.of
生成一个注册器对象,然后利用注册器对象的circuitBreaker
方法来生成一个实际的breaker对象
,代码如下:
代码块6 1 2 3 4 5 6 public interface CircuitBreakerRegistry { static CircuitBreakerRegistry of (CircuitBreakerConfig circuitBreakerConfig) { return new InMemoryCircuitBreakerRegistry (circuitBreakerConfig); } }
InMemoryCircuitBreakerRegistry
类代码如下(已简化处理,只展示流程相关代码):
代码块7 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public final class InMemoryCircuitBreakerRegistry implements CircuitBreakerRegistry { private final ConcurrentMap<String, CircuitBreaker> circuitBreakers; private final CircuitBreakerConfig defaultCircuitBreakerConfig; public InMemoryCircuitBreakerRegistry (CircuitBreakerConfig defaultCircuitBreakerConfig) { this .defaultCircuitBreakerConfig = Objects.requireNonNull(defaultCircuitBreakerConfig, "CircuitBreakerConfig must not be null" ); this .circuitBreakers = new ConcurrentHashMap <>(); } @Override public CircuitBreaker circuitBreaker (String name) { return circuitBreakers.computeIfAbsent(Objects.requireNonNull(name, "Name must not be null" ), (k) -> CircuitBreaker.of(name, defaultCircuitBreakerConfig)); } }
这个流程很简单,就是用一个map来维护所有breaker的,所以需要注意的是,命名breaker的时候,不要携带一些id之类的字段,很容易把map撑爆。
3.2:Breaker实体-CircuitBreaker 拿到breaker实体后首先会通过其executeCallable方法执行需要被熔断的逻辑块,之前提到的所有的错误率统计、状态切换都发生在这个实体内。
代码块8 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public interface CircuitBreaker { default T executeCallable (Callable callable) throws Exception{ return decorateCallable(this , callable).call(); } static Callable decorateCallable (CircuitBreaker circuitBreaker, Callable callable) { return () -> { if (!circuitBreaker.isCallPermitted()) { throw new CircuitBreakerOpenException (String.format("CircuitBreaker '%s' is open" , circuitBreaker.getName())); } long start = System.nanoTime(); try { T returnValue = callable.call(); long durationInNanos = System.nanoTime() - start; circuitBreaker.onSuccess(durationInNanos); return returnValue; } catch (Throwable throwable) { long durationInNanos = System.nanoTime() - start; circuitBreaker.onError(durationInNanos, throwable); throw throwable; } }; } }
CircuitBreaker
是一个接口,CircuitBreakerStateMachine
是它的实现类,上述代码里比较关键的isCallPermitted
、onSuccess
、onError
都是在这个CircuitBreakerStateMachine
类里实现的。 CircuitBreakerStateMachine
类比较复杂,牵扯到整个熔断器的状态切换、错误统计触发等,精简一下该类,只关注核心部分:
代码块9 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 public final class CircuitBreakerStateMachine implements CircuitBreaker { private final String name; private final AtomicReference stateReference; private final CircuitBreakerConfig circuitBreakerConfig; private final CircuitBreakerEventProcessor eventProcessor; public CircuitBreakerStateMachine (String name, CircuitBreakerConfig circuitBreakerConfig) { this .name = name; this .circuitBreakerConfig = circuitBreakerConfig; this .stateReference = new AtomicReference <>(new ClosedState (this )); this .eventProcessor = new CircuitBreakerEventProcessor (); } @Override public void transitionToClosedState () { stateTransition(CLOSED, currentState -> new ClosedState (this , currentState.getMetrics())); } @Override public void transitionToOpenState () { stateTransition(OPEN, currentState -> new OpenState (this , currentState.getMetrics())); } @Override public void transitionToHalfOpenState () { stateTransition(HALF_OPEN, currentState -> new HalfOpenState (this )); } private void stateTransition (State newState, Function<CircuitBreakerState, CircuitBreakerState> newStateGenerator) { CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> { if (currentState.getState() == newState) { return currentState; } return newStateGenerator.apply(currentState); }); if (previousState.getState() != newState) { publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState)); } } @Override public boolean isCallPermitted () { boolean callPermitted = stateReference.get().isCallPermitted(); if (!callPermitted) { publishCallNotPermittedEvent(); } return callPermitted; } @Override public void onError (long durationInNanos, Throwable throwable) { if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) { publishCircuitErrorEvent(name, durationInNanos, throwable); stateReference.get().onError(throwable); } else { publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable); } } @Override public void onSuccess (long durationInNanos) { publishSuccessEvent(durationInNanos); stateReference.get().onSuccess(); } }
3.3:状态类 通过上面的代码可以知道isCallPermitted
、onSuccess
、onError
这三个方法实际上都是调用对应XXState
对象里的方法,下面来看下ClosedState
、OpenState
、HalfOpenState
这三个状态对象里有关这三个方法的实现(因为上面的测试用例只涉及这三种状态的互转,实际上这三种状态也是最常用的,所以为了避免混乱,只展示这三种,所有状态类均继承自CircuitBreakerState抽象类
)
3.3.1:ClosedState 闭合状态时初始状态,中途只会由半熔断状态切换而来,正常情况下都是闭合状态,代码如下:
代码块10 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 final class ClosedState extends CircuitBreakerState { private final CircuitBreakerMetrics circuitBreakerMetrics; private final float failureRateThreshold; ClosedState(CircuitBreakerStateMachine stateMachine) { this (stateMachine, null ); } ClosedState(CircuitBreakerStateMachine stateMachine, CircuitBreakerMetrics circuitBreakerMetrics) { super (stateMachine); CircuitBreakerConfig circuitBreakerConfig = stateMachine.getCircuitBreakerConfig(); if (circuitBreakerMetrics == null ){ this .circuitBreakerMetrics = new CircuitBreakerMetrics ( circuitBreakerConfig.getRingBufferSizeInClosedState()); }else { this .circuitBreakerMetrics = circuitBreakerMetrics.copy(circuitBreakerConfig.getRingBufferSizeInClosedState()); } this .failureRateThreshold = stateMachine.getCircuitBreakerConfig().getFailureRateThreshold(); } @Override boolean isCallPermitted () { return true ; } @Override void onError (Throwable throwable) { checkFailureRate(circuitBreakerMetrics.onError()); } @Override void onSuccess () { checkFailureRate(circuitBreakerMetrics.onSuccess()); } private void checkFailureRate (float currentFailureRate) { if (currentFailureRate >= failureRateThreshold) { stateMachine.transitionToOpenState(); } } }
3.3.2:OpenState 一般全熔断状态会从闭合或者半熔断状态里切换而来,它的代码如下:
代码块11 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 final class OpenState extends CircuitBreakerState { private final Instant retryAfterWaitDuration; private final CircuitBreakerMetrics circuitBreakerMetrics; OpenState(CircuitBreakerStateMachine stateMachine, CircuitBreakerMetrics circuitBreakerMetrics) { super (stateMachine); final Duration waitDurationInOpenState = stateMachine.getCircuitBreakerConfig().getWaitDurationInOpenState(); this .retryAfterWaitDuration = Instant.now().plus(waitDurationInOpenState); this .circuitBreakerMetrics = circuitBreakerMetrics; if (stateMachine.getCircuitBreakerConfig().isAutomaticTransitionFromOpenToHalfOpenEnabled()) { AutoTransitioner.scheduleAutoTransition(stateMachine::transitionToHalfOpenState, waitDurationInOpenState); } } @Override boolean isCallPermitted () { if (Instant.now().isAfter(retryAfterWaitDuration)) { stateMachine.transitionToHalfOpenState(); return true ; } circuitBreakerMetrics.onCallNotPermitted(); return false ; } @Override void onError (Throwable throwable) { circuitBreakerMetrics.onError(); } @Override void onSuccess () { circuitBreakerMetrics.onSuccess(); } }
3.3.3:HalfOpenState 半熔断状态一定是由全熔断切换出来的,来看下它的代码:
代码块12 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 final class HalfOpenState extends CircuitBreakerState { private CircuitBreakerMetrics circuitBreakerMetrics; private final float failureRateThreshold; HalfOpenState(CircuitBreakerStateMachine stateMachine) { super (stateMachine); CircuitBreakerConfig circuitBreakerConfig = stateMachine.getCircuitBreakerConfig(); this .circuitBreakerMetrics = new CircuitBreakerMetrics ( circuitBreakerConfig.getRingBufferSizeInHalfOpenState()); this .failureRateThreshold = stateMachine.getCircuitBreakerConfig().getFailureRateThreshold(); } @Override boolean isCallPermitted () { return true ; } @Override void onError (Throwable throwable) { checkFailureRate(circuitBreakerMetrics.onError()); } @Override void onSuccess () { checkFailureRate(circuitBreakerMetrics.onSuccess()); } private void checkFailureRate (float currentFailureRate) { if (currentFailureRate != -1 ) { if (currentFailureRate >= failureRateThreshold) { stateMachine.transitionToOpenState(); } else { stateMachine.transitionToClosedState(); } } } }
3.3.4:状态间的切换关系 上面三种状态的切换关系如下:
在这些状态中,最初为熔断闭合状态,ServerB的所有请求正常访问ServerC,ServerC报错,错误率累计达到50%后触发熔断全开状态,此时Server对ServerC发出的请求将走ServerB的降级逻辑,不再实际访问ServerC的方法,这个状态会持续waitDurationInOpenState
这么久(测试用例中是1000ms),然后进入熔断半开状态,此时跟闭合状态一样,ServerB的所有请求仍会正常访问ServerC,不同的是半开状态下只需要满足ringBufferSizeInHalfOpenState
次调用(测试用例中是10次),就可以直接判断错误率是否达到阈值,这点可以在代码块12
里的checkFailureRate
方法体现,图5
中可以看到,如果未达到错误阈值表示ServerC已恢复,则可以关闭熔断,否则再次进入全熔断状态。
3.3.5:度量对象(CircuitBreakerMetrics)的传递 这个对象在3.4
中会详细说明,目前只需要知道该类用于做错误统计用,错误率计算的核心,核心方法为onError
和onSuccess
,这俩方法用于错误/正确请求的触发点,用于触发CircuitBreakerMetrics
对象对错误率的统计。
通过代码块10、11、12
可以看到CircuitBreakerMetrics
对象的流向,首先初始化的时候是调用ClosedState
第一个构造器触发第二个构造器,第二个构造器会new一个CircuitBreakerMetrics
,传过去的size为ringBufferSizeInClosedState
,然后由ClosedState
切换至OpenState
状态时,其CircuitBreakerMetrics
会被传递给OpenState
对象,根据代码块11
可以知道,OpenState
利用该对象统计熔断期间被熔断的次数,然后OpenState
切换至HalfOpenState
时,HalfOpenState
没有接受CircuitBreakerMetrics
对象的构造器,不管由谁切换到半开状态
,CircuitBreakerMetrics
对象都是全新的,由代码块12
可知,初始化CircuitBreakerMetrics
对象时传过去的size就是ringBufferSizeInHalfOpenState
。
CircuitBreakerMetrics
对象的传递以及传递后在State对象
里所做的操作:
图6
根据代码块10、11、12
画出,简单体现了Metrics对象
的生成以及流向,以及这个对象在各State对象
里所做的主要操作。通过图6
可以看出实际产生新的Metrics对象的地方为闭合态
和半开态
,因为这俩地方是需要做错误统计的,需要全新的Metric对象,全开态下仅接收前一状态的Metrics对象,在命中熔断后对其内部numberOfNotPermittedCalls
(不是很懂这个属性,简单的累加,连用到的地方都没,可能仅仅是做个熔断数统计让业务方获取的吧,做监控可以用),在半开态再次进入闭合态时,其Metrics仍然被传递给了闭合态,由代码块10
可知,如果传了Metrics对象
,闭合态
在产生新的Metrics对象
时,会通过copy
方法来产生,这个方法在3.4
会详细说明,简单来说就是把前一个状态(只可能是半开态
)的Metrics里的请求计数同步到它自己的Metrics里,这样做有一个好处,就是新的闭合态不用重新累计错误率了,以单元测试所配的参数试想一下,如果在半开态下,进行了10次请求,发生了4次错误,此时会切回闭合态,闭合态copy
了这10次请求的数据,那么只需要再经过90次请求和46次错误便可以再次进入全熔断状态(其实就是保证了状态的平滑切换
,不丢失之前已经统计了的数据)。
3.4:错误统计 3.4.1:CircuitBreakerMetrics 通过3.3
的了解,闭合和半开时的请求状态计数都是通过CircuitBreakerMetrics
对象来完成的,现在来看下这个类里都干了些什么:
代码块13 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 class CircuitBreakerMetrics implements CircuitBreaker .Metrics { private final int ringBufferSize; private final RingBitSet ringBitSet; private final LongAdder numberOfNotPermittedCalls; CircuitBreakerMetrics(int ringBufferSize) { this (ringBufferSize, null ); } public CircuitBreakerMetrics copy (int targetRingBufferSize) { return new CircuitBreakerMetrics (targetRingBufferSize, this .ringBitSet); } CircuitBreakerMetrics(int ringBufferSize, RingBitSet sourceSet) { this .ringBufferSize = ringBufferSize; if (sourceSet != null ) { this .ringBitSet = new RingBitSet (this .ringBufferSize, sourceSet); }else { this .ringBitSet = new RingBitSet (this .ringBufferSize); } this .numberOfNotPermittedCalls = new LongAdder (); } float onError () { int currentNumberOfFailedCalls = ringBitSet.setNextBit(true ); return getFailureRate(currentNumberOfFailedCalls); } float onSuccess () { int currentNumberOfFailedCalls = ringBitSet.setNextBit(false ); return getFailureRate(currentNumberOfFailedCalls); } void onCallNotPermitted () { numberOfNotPermittedCalls.increment(); } @Override public float getFailureRate () { return getFailureRate(getNumberOfFailedCalls()); } @Override public int getNumberOfBufferedCalls () { return this .ringBitSet.length(); } @Override public int getNumberOfFailedCalls () { return this .ringBitSet.cardinality(); } private float getFailureRate (int numberOfFailedCalls) { if (getNumberOfBufferedCalls() < ringBufferSize) { return -1.0f ; } return numberOfFailedCalls * 100.0f / ringBufferSize; } }
通过上面的代码可以知道最终统计错误数的是在RingBitSet
结构中,下面来仔细了解下这个类~
3.4.2:位图&BitSetMod 了解RingBitSet
之前,先来了解一种数据结构-位图
,如果已经了解过位图,那么可以直接去看RingBitSet。
RingBitSet持有一个BitSetMod
对象,BitSetMod基于位图实现,位图是怎样的一种结构呢?先看下图7
,然后再去解析它的源码实现。
通过上图可知,位图就是利用数组内每个元素的bit位存入一个标记,标记只有存在或者不存在(对应二进制的0和1),这样就可以做到用一个long型的数字就可以产生出64个标记信息,非常适合数据量庞大而判断状态少的应用场景,比如判断一个词语是否是屏蔽词,首先屏蔽词状态只有两种:命中or不命中,但是屏蔽词可能是个非常庞大的集合,如果一个个拿来比较,效率完全保证不了,那么就可以利用这个数据结构来解决这类问题,可以首先把所有的屏蔽词放到一个位图结构里,如果有相同的词语,只需要简单的两部运算就可以拿到是否命中结果,构建这个位图结构的过程如下:
通过上图,屏蔽词位图结构就构建好了,如果有个词语需要判定是否命中屏蔽词,只需要让这个词语通过上面的哈希算法计算出哈希值,然后找到对应的数组下标,通过位运算算出其所在位置,将该位置的值取出,如果是0,则认为没有命中,1则认为命中。
以上就是位图结构,通过上面的例子,可以认为同一个值一定命中位图里的同一个位置,那么抽象成熔断器的错误率,错误状态只有0和1,1表示错误,0表示正确,给每次请求编号,当成是图8中的哈希值,相同编号的请求一定会落到同一个位置,现在不理解没关系,这个要结合RingBitSet一起看,目前只需要理解位图特性即可。
Resilience4j里通过BitSetMod简单实现了一个位图结构,来看下代码(注:代码里有大量位运算,过程说明都写在了注释里):
代码块14 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 public class BitSetMod { private final static int ADDRESS_BITS_PER_WORD = 6 ; private final int size; private final long [] words; public BitSetMod (final int capacity) { int countOfWordsRequired = wordIndex(capacity - 1 ) + 1 ; size = countOfWordsRequired << ADDRESS_BITS_PER_WORD; words = new long [countOfWordsRequired]; } private static int wordIndex (int bitIndex) { return bitIndex >> ADDRESS_BITS_PER_WORD; } public int set (int bitIndex, boolean value) { int wordIndex = wordIndex(bitIndex); long bitMask = 1L << bitIndex; int previous = (words[wordIndex] & bitMask) != 0 ? 1 : 0 ; if (value) { words[wordIndex] = words[wordIndex] | bitMask; } else { words[wordIndex] = words[wordIndex] & ~bitMask; } return previous; } int size () { return size; } boolean get (int bitIndex) { int wordIndex = wordIndex(bitIndex); long bitMask = 1L << bitIndex; return (words[wordIndex] & bitMask) != 0 ; } }
上面是Resilience4j针对位图的简单实现,它负责存储单位请求内的错误/成功
标志。
3.4.3:RingBitSet 之前说过,最终请求被放到了一个环形结构里才对,沿着环执行一周就是一次单位请求,回看下图4,其实第101次请求就是顶替掉第一次请求的结果罢了,现在把图4中以100为请求窗口弯曲成一个环,假如第一次请求是失败的,第101次请求是成功的(绿色背景表示成功的请求,红色背景表示失败的请求):
如何利用位图结构记录每次请求的错误/成功
标记然后再实现图9
里的环形结构
呢?Resilience4j通过RingBitSet
来实现,来看下它的代码:
代码块15 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public class RingBitSet { private final int size; private final BitSetMod bitSet; private boolean notFull; private int index = -1 ; private volatile int length; private volatile int cardinality = 0 ; RingBitSet(int bitSetSize) { notFull = true ; size = bitSetSize; bitSet = new BitSetMod (bitSetSize); } RingBitSet(int bitSetSize, RingBitSet sourceSet) { this (bitSetSize); int targetLength = Integer.min(bitSetSize, sourceSet.length); int sourceIndex = sourceSet.index; int forwardIndex = sourceSet.size - sourceIndex; for (int i = 0 ; i < targetLength; i++) { this .setNextBit(sourceSet.bitSet.get(sourceIndex)); forwardIndex = (forwardIndex + 1 ) % sourceSet.size; sourceIndex = (sourceSet.size - forwardIndex) % sourceSet.size; } } public synchronized int setNextBit (boolean value) { increaseLength(); index = (index + 1 ) % size; int previous = bitSet.set(index, value); int current = value ? 1 : 0 ; cardinality = cardinality - previous + current; return cardinality; } public int cardinality () { return cardinality; } public int size () { return bitSet.size(); } public int length () { return length; } @Override public String toString () { StringBuilder result = new StringBuilder (); for (int i = 0 ; i < size; i++) { result.append(bitSet.get(i) ? '1' : '0' ); } return result.toString(); } synchronized int getIndex () { return index; } private void increaseLength () { if (notFull) { int nextLength = length + 1 ; if (nextLength < size) { length = nextLength; } else { length = size; notFull = false ; } } } }
四、总结 Resilience4j通过CircuitBreakerStateMachine
来独立出一个熔断器,其内部持有一个CircuitBreakerState
对象的引用,在错误率达到某个阈值时,会发生状态切换,CircuitBreakerState的引用会指向新的状态对象。每个状态对象持有一个CircuitBreakerMetrics
对象,用于做实时统计和错误率监听使用,CircuitBreakerMetrics对象通过RingBitSet
来完成单位请求窗口的错误率统计,这个统计是实时的,每次请求都会触发一次错误率的判断。RingBitSet通过Resilience4j自己实现的一个轻量级的位图
结构BitSetMod
来标记请求错误/成功
,顺便说下,这里通过RingBitSet来保证环形结构
,而位图只负责存储请求结果,那么既然这样,我用普通的数组或者其他的可以通过下标获取数值的集合结构也可以实现啊,为什么一定要用位图呢?猜测是位图既可以保证跟数组一样高效,都是O(1)
的复杂度,又可以节省存储空间,比如我的单位请求是1w次,如果是数组结构,虽然效率跟位图一样高,但是数组却需要存1w个0或1这样的数组,即便用byte类型的数组,每个数组元素都浪费了7个bit位。其他集合就更不用说了,效率无法保证,其次他们浪费的内存比单纯数组要高,所以,类似这种只有true或false的数据的存储,位图再适合不过了。
感觉有些地方说的不太清晰,待后续改进描述方式。