当前位置: 首页 > news >正文

Sentinel源码—6.熔断降级和数据统计的实现一

大纲

1.DegradeSlot实现熔断降级的原理与源码

2.Sentinel数据指标统计的滑动窗口算法

1.DegradeSlot实现熔断降级的原理与源码

(1)熔断降级规则DegradeRule的配置Demo

(2)注册熔断降级监听器和加载熔断降级规则

(3)DegradeSlot根据熔断降级规则对请求进行验证

(1)熔断降级规则DegradeRule的配置Demo

首先熔断降级规则的应用场景有如下两种:

场景一:在微服务架构中,当一个服务出现问题时,可以通过配置熔断降级规则,防止故障扩散,保护整个系统的稳定性。

场景二:在调用第三方API时,可以配置熔断降级规则,避免因第三方API不稳定导致自身系统不稳定。

然后从下图可知,熔断降级规则包含以下属性:

属性一:熔断策略(grade)

这表示的是熔断降级规则的类型,取值范围分别是:

RuleConstant.DEGRADE_GRADE_RT(慢调用比例)
RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO(异常比例)
RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT(异常数)

其中,默认下的熔断降级规则是基于慢调用比例策略的,也就是默认值为:

RuleConstant.DEGRADE_GRADE_RT

属性二:熔断降级的阈值(count)

DegradeRule.count属性的具体含义取决于DegradeRule.grade属性的值。

如果grade为慢调用比例,则count表示慢调用比例阈值。
如果grade为异常比例,则count表示异常比例阈值。
如果grade为异常数,则count表示异常数阈值。

属性三:熔断时长(timeWindow)

这表示的是熔断降级发生后的降级持续时间,在这段时间内对应的资源将被降级。

属性四:最小请求数(minRequestAmount)

这表示的是熔断降级统计周期内的最小请求总数。仅当周期内的请求总数达到此值时,才会根据grade和count进行熔断降级。默认值为:

RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT

属性五:慢调用比例阈值(slowRatioThreshold)

该属性当grade为慢调用比例时生效,取值范围为0到1之间的小数,表示慢调用请求占总请求的比例。

属性六:统计时长(statIntervalMs)

这表示的是熔断降级统计周期(单位:毫秒),默认值为1000毫秒(1秒)。在这个周期内,Sentinel会对请求进行统计,以判断是否要进行熔断降级。

public class DegradeRule extends AbstractRule {//熔断策略,表示的是熔断降级规则的类型private int grade = RuleConstant.DEGRADE_GRADE_RT;//熔断降级的阈值,具体含义取决于DegradeRule.grade属性的值//如果grade为慢调用比例,则count表示慢调用比例阈值//如果grade为异常比例,则count表示异常比例阈值//如果grade为异常数,则count表示异常数阈值private double count;//熔断时长,即熔断降级发生后的降级持续时间,在这段时间内对应的资源将被降级private int timeWindow;//最小请求数,仅当周期内的请求总数达到此值时,才会根据grade和count进行熔断降级private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;//慢调用比例阈值,仅当grade为慢调用比例时生效private double slowRatioThreshold = 1.0d;//统计时长,熔断降级统计周期,在这个周期内,Sentinel会对请求进行统计,以判断是否要进行熔断降级private int statIntervalMs = 1000;...
}

接着如下便是DegradeRule的配置Demo:

//Run this demo, and the output will be like:
//1529399827825,total:0, pass:0, block:0
//1529399828825,total:4263, pass:100, block:4164
//1529399829825,total:19179, pass:4, block:19176 // circuit breaker opens
//1529399830824,total:19806, pass:0, block:19806
//1529399831825,total:19198, pass:0, block:19198
//1529399832824,total:19481, pass:0, block:19481
//1529399833826,total:19241, pass:0, block:19241
//1529399834826,total:17276, pass:0, block:17276
//1529399835826,total:18722, pass:0, block:18722
//1529399836826,total:19490, pass:0, block:19492
//1529399837828,total:19355, pass:0, block:19355
//1529399838827,total:11388, pass:0, block:11388
//1529399839829,total:14494, pass:104, block:14390 // After 10 seconds, the system restored
//1529399840854,total:18505, pass:0, block:18505
//1529399841854,total:19673, pass:0, block:19676
public class SlowRatioCircuitBreakerDemo {private static final String KEY = "some_method";private static volatile boolean stop = false;private static int seconds = 120;private static AtomicInteger total = new AtomicInteger();private static AtomicInteger pass = new AtomicInteger();private static AtomicInteger block = new AtomicInteger();public static void main(String[] args) throws Exception {initDegradeRule();registerStateChangeObserver();startTick();int concurrency = 8;for (int i = 0; i < concurrency; i++) {Thread entryThread = new Thread(() -> {while (true) {Entry entry = null;try {entry = SphU.entry(KEY);pass.incrementAndGet();//RT: [40ms, 60ms)sleep(ThreadLocalRandom.current().nextInt(40, 60));} catch (BlockException e) {block.incrementAndGet();sleep(ThreadLocalRandom.current().nextInt(5, 10));} finally {total.incrementAndGet();if (entry != null) {entry.exit();}}}});entryThread.setName("sentinel-simulate-traffic-task-" + i);entryThread.start();}}private static void registerStateChangeObserver() {EventObserverRegistry.getInstance().addStateChangeObserver("logging",(prevState, newState, rule, snapshotValue) -> {if (newState == State.OPEN) {System.err.println(String.format("%s -> OPEN at %d, snapshotValue=%.2f", prevState.name(), TimeUtil.currentTimeMillis(), snapshotValue));} else {System.err.println(String.format("%s -> %s at %d", prevState.name(), newState.name(), TimeUtil.currentTimeMillis()));}});}private static void initDegradeRule() {List<DegradeRule> rules = new ArrayList<>();DegradeRule rule = new DegradeRule(KEY).setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())//Max allowed response time.setCount(50).setTimeWindow(10)//Retry timeout (in second).setSlowRatioThreshold(0.6)//Circuit breaker opens when slow request ratio > 60%.setMinRequestAmount(100).setStatIntervalMs(20000);rules.add(rule);DegradeRuleManager.loadRules(rules);System.out.println("Degrade rule loaded: " + rules);}private static void sleep(int timeMs) {try {TimeUnit.MILLISECONDS.sleep(timeMs);} catch (InterruptedException e) {// ignore}}private static void startTick() {Thread timer = new Thread(new TimerTask());timer.setName("sentinel-timer-tick-task");timer.start();}static class TimerTask implements Runnable {@Overridepublic void run() {long start = System.currentTimeMillis();System.out.println("Begin to run! Go go go!");System.out.println("See corresponding metrics.log for accurate statistic data");long oldTotal = 0;long oldPass = 0;long oldBlock = 0;while (!stop) {sleep(1000);long globalTotal = total.get();long oneSecondTotal = globalTotal - oldTotal;oldTotal = globalTotal;long globalPass = pass.get();long oneSecondPass = globalPass - oldPass;oldPass = globalPass;long globalBlock = block.get();long oneSecondBlock = globalBlock - oldBlock;oldBlock = globalBlock;System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);if (seconds-- <= 0) {stop = true;}}long cost = System.currentTimeMillis() - start;System.out.println("time cost: " + cost + " ms");System.out.println("total: " + total.get() + ", pass:" + pass.get() + ", block:" + block.get());System.exit(0);}}
}

(2)注册熔断降级监听器和加载熔断降级规则

一.Sentinel监听器模式的核心代码回顾

二.注册熔断降级监听器和加载熔断降级规则

三.用于实现Sentinel熔断降级功能的熔断器接口

一.Sentinel监听器模式的核心代码回顾

Sentinel监听器模式会包含三大角色:

角色一:监听器PropertyListener

角色二:监听器管理器SentinelProperty

角色三:规则管理器RuleManager

首先,规则管理器RuleManager在初始化时,会调用监听器管理器SentinelProperty的addListener()方法将监听器PropertyListener注册到监听器管理器SentinelProperty上。

然后,使用方使用具体的规则时,可以通过调用规则管理器RuleManager的loadRules()方法加载规则。加载规则时会调用监听器管理器SentinelProperty的updateValue()方法通知每一个监听器PropertyListener,即通过监听器PropertyListener的configUpdate()方法把规则加载到规则管理器的本地中。

二.注册熔断降级监听器和加载熔断降级规则

DegradeRuleManager中有两个全局的HashMap:一个是用于存放资源和熔断器的对应关系的HashMap—circuitBreakers,另一个是用于存放资源和熔断规则的对应关系的HashMap—ruleMap。

其中熔断器是由熔断策略DegradeRule.grade来决定的。如果熔断策略是慢调用比例,则熔断器是ResponseTimeCircuitBreaker。如果熔断策略是异常比例和异常数,则熔断器是ExceptionCircuitBreaker。

public class DynamicSentinelProperty<T> implements SentinelProperty<T> {protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();private T value = null;...//添加监听器到集合@Overridepublic void addListener(PropertyListener<T> listener) {listeners.add(listener);//回调监听器的configLoad()方法初始化规则配置listener.configLoad(value);}//更新值@Overridepublic boolean updateValue(T newValue) {//如果值没变化,直接返回if (isEqual(value, newValue)) {return false;}RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);//如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值value = newValue;for (PropertyListener<T> listener : listeners) {listener.configUpdate(newValue);}return true;}...
}public final class DegradeRuleManager {//用于存放资源和熔断器的对应关系的HashMap,其中熔断器是由熔断策略DegradeRule.grade来决定的private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();//用于存放资源和熔断规则的对应关系的HashMapprivate static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();private static final RulePropertyListener LISTENER = new RulePropertyListener();private static SentinelProperty<List<DegradeRule>> currentProperty = new DynamicSentinelProperty<>();static {currentProperty.addListener(LISTENER);}...private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {@Overridepublic void configUpdate(List<DegradeRule> conf) {reloadFrom(conf);RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);}@Overridepublic void configLoad(List<DegradeRule> conf) {reloadFrom(conf);RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);}private synchronized void reloadFrom(List<DegradeRule> list) {//构建熔断器Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {assert e.getValue() != null && !e.getValue().isEmpty();Set<DegradeRule> rules = new HashSet<>(e.getValue().size());for (CircuitBreaker cb : e.getValue()) {rules.add(cb.getRule());}rm.put(e.getKey(), rules);}DegradeRuleManager.circuitBreakers = cbs;DegradeRuleManager.ruleMap = rm;}private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);if (list == null || list.isEmpty()) {return cbMap;}for (DegradeRule rule : list) {if (!isValidRule(rule)) {RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);continue;}if (StringUtil.isBlank(rule.getLimitApp())) {rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);}CircuitBreaker cb = getExistingSameCbOrNew(rule);if (cb == null) {RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);continue;}String resourceName = rule.getResource();List<CircuitBreaker> cbList = cbMap.get(resourceName);if (cbList == null) {cbList = new ArrayList<>();cbMap.put(resourceName, cbList);}cbList.add(cb);}return cbMap;}}private static CircuitBreaker getExistingSameCbOrNew(DegradeRule rule) {List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());if (cbs == null || cbs.isEmpty()) {return newCircuitBreakerFrom(rule);}for (CircuitBreaker cb : cbs) {if (rule.equals(cb.getRule())) {//Reuse the circuit breaker if the rule remains unchanged.return cb;}}return newCircuitBreakerFrom(rule);}static List<CircuitBreaker> getCircuitBreakers(String resourceName) {return circuitBreakers.get(resourceName);}//Create a circuit breaker instance from provided circuit breaking rule.private static CircuitBreaker newCircuitBreakerFrom(DegradeRule rule) {switch (rule.getGrade()) {case RuleConstant.DEGRADE_GRADE_RT:return new ResponseTimeCircuitBreaker(rule);case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:return new ExceptionCircuitBreaker(rule);default:return null;}}
}

三.用于实现Sentinel熔断降级功能的熔断器接口

进行熔断降级规则验证时,就会根据熔断降级规则的熔断策略,选择对应的熔断器CircuitBreaker。再通过熔断器CircuitBreaker的tryPass()接口,尝试通过当前请求。

//熔断器接口,用于实现Sentinel的熔断降级功能
public interface CircuitBreaker {//Get the associated circuit breaking rule.//获取当前熔断器对应的熔断降级规则DegradeRule getRule();//Acquires permission of an invocation only if it is available at the time of invoking.//尝试通过熔断器//如果熔断器处于关闭状态(CLOSED),则允许请求通过;//如果处于打开状态(OPEN),则拒绝请求;//如果处于半开状态(HALF_OPEN),则根据规则允许部分请求通过;boolean tryPass(Context context);//Get current state of the circuit breaker.//获取当前熔断器的状态(OPEN, HALF_OPEN, CLOSED)State currentState();//Record a completed request with the context and handle state transformation of the circuit breaker.//Called when a passed invocation finished.//在请求完成后调用此方法,用于更新熔断器的统计数据void onRequestComplete(Context context);//Circuit breaker state.enum State {//In OPEN state, all requests will be rejected until the next recovery time point.//表示熔断器处于打开状态,此时会拒绝所有请求OPEN,//In HALF_OPEN state, the circuit breaker will allow a "probe" invocation.//If the invocation is abnormal according to the strategy (e.g. it's slow), //the circuit breaker will re-transform to the OPEN state and wait for the next recovery time point;//otherwise the resource will be regarded as "recovered" and the circuit breaker will cease cutting off requests and transform to CLOSED state. //表示熔断器处于半开状态,此时允许部分请求通过,以检测系统是否已经恢复正常HALF_OPEN,//In CLOSED state, all requests are permitted. //When current metric value exceeds the threshold, the circuit breaker will transform to OPEN state.//表示熔断器处于关闭状态,此时允许所有请求通过CLOSED}
}public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {......
}public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {......
}

(3)DegradeSlot根据熔断降级规则对请求进行验证

一.entry()方法会对请求进行熔断降级的规则验证

二.exit()方法会触发改变熔断器的状态

开始对请求进行规则验证时,需要调用SphU的entry()方法。完成对请求的规则验证后,也需要调用Entry的exit()方法。

一.entry()方法会对请求进行熔断降级的规则验证

在DegradeSlot的entry()方法中,执行熔断降级规则验证的是DegradeSlot的performChecking()方法。该方法首先会根据资源名称从DegradeRuleManager中获取熔断器,然后调用每个熔断器的tryPass()方法判断熔断器开关是否已打开来验证。如果验证通过,则返回true表示放行请求。如果验证不通过,则返回false表示拦截请求。

在判断熔断器开关是否已打开的AbstractCircuitBreaker的tryPass()方法中,首先会判断熔断器是否是关闭状态。如果是关闭状态,则代表没打开熔断器,于是会直接返回true放行请求。如果是打开状态,则要继续判断当前请求是否已达熔断器恢复时间。如果当前请求已达熔断器恢复时间,也就是当前时间大于下次尝试恢复的时间,且成功将熔断器状态从OPEN变为HALF_OPEN,则放行请求,否则拒绝。

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {//验证熔断规则的逻辑performChecking(context, resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void performChecking(Context context, ResourceWrapper r) throws BlockException {//先根据资源名称获取对应的熔断器,也就是从DegradeRuleManager中的Map<String, List<CircuitBreaker>>类型的circuitBreakers获取List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}//调用每个熔断器的tryPass()方法验证当前请求是否可以通过for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) {throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}...
}public abstract class AbstractCircuitBreaker implements CircuitBreaker {protected final DegradeRule rule;private final EventObserverRegistry observerRegistry;//熔断器当前的开关状态protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);//下一次尝试恢复的时间protected volatile long nextRetryTimestamp;...@Overridepublic boolean tryPass(Context context) {//首先判断熔断器是否是关闭状态//如果是关闭状态则代表根本没打开熔断器,也就不涉及熔断了,因此直接返回true放行当前请求if (currentState.get() == State.CLOSED) {return true;}//如果熔断器是打开状态,那么就要进行如下逻辑判断if (currentState.get() == State.OPEN) {//如果当前系统时间大于等于下一次尝试恢复的时间,即已到达可尝试恢复的时间且成功设置当前熔断器状态为半开启,则可以放行当前请求//也就是如果此次请求已达到了熔断器恢复时间,并且将熔断器的状态从打开变为半开启,则放行,反之拒绝return retryTimeoutArrived() && fromOpenToHalfOpen(context);}return false;}protected boolean retryTimeoutArrived() {//当前时间是否大于下一次尝试恢复的时间return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;}protected boolean fromOpenToHalfOpen(Context context) {//将当前熔断器的状态从OPEN变为HALF_OPENif (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {//通过观察者模式通知各个观察者notifyObservers(State.OPEN, State.HALF_OPEN, null);...return true;}return false;}private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {observer.onStateChange(prevState, newState, rule, snapshotValue);}}...
}

二.exit()方法会触发改变熔断器的状态

问题一:熔断器的tryPass()方法一开始就会判断熔断器的状态,那么熔断器何时打开、何时关闭?

答:这个会在配置熔断降级规则DegradeRule时指定,如下图就指定了:如果最近10000ms内,10个请求中有2个是异常的,则触发熔断。

问题二:如果熔断器状态为打开,判断时会比较下一次尝试恢复时间和当前时间。如果当前时间大于下一次尝试恢复的时间,意味着请求已超过熔断时间,即当前请求不再处于熔断时间段内,因此可以放行。那么下一次尝试恢复的时间nextRetryTimestamp会在何时更新?

异常数据的采集和下次恢复时间的更新会由DegradeSlot的exit()方法触发。因为开始对请求进行规则验证时,会调用SphU的entry()方法。但完成对请求的规则验证后,则会调用Entry的exit()方法,而Entry的exit()方法最终就会执行到DegradeSlot的exit()方法。

在DegradeSlot的exit()方法中,就会调用熔断器的onRequestComplete()方法来进行计数,并改变熔断器的状态。

比如在执行ExceptionCircuitBreaker的onRequestComplete()方法中,会先统计异常数errorCount和总请求数totalCount,然后根据熔断降级的规则判断是否达到打开或关闭熔断器的阈值,最后执行比如AbstractCircuitBreaker的transformToOpen()方法打开熔断器。

AbstractCircuitBreaker.transformToOpen()方法的主要工作是:首先将当前熔断器状态变更为OPEN,然后更新下一次尝试恢复时间nextRetryTimestamp,最后通过观察者设计模式通知各个观察者。

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {...@Overridepublic void exit(Context context, ResourceWrapper r, int count, Object... args) {Entry curEntry = context.getCurEntry();if (curEntry.getBlockError() != null) {fireExit(context, r, count, args);return;}List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {fireExit(context, r, count, args);return;}//如果没报错,那么就调用熔断器的onRequestComplete()方法来计数if (curEntry.getBlockError() == null) {// passed requestfor (CircuitBreaker circuitBreaker : circuitBreakers) {circuitBreaker.onRequestComplete(context);}}fireExit(context, r, count, args);}...
}public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {...@Overridepublic void onRequestComplete(Context context) {Entry entry = context.getCurEntry();if (entry == null) {return;}Throwable error = entry.getError();//获取当前值SimpleErrorCounter counter = stat.currentWindow().value();//如果此次请求报错了,则将errorCount + 1if (error != null) {counter.getErrorCount().add(1);}//将totalCount总数 + 1,用于计算异常比例counter.getTotalCount().add(1);handleStateChangeWhenThresholdExceeded(error);}private void handleStateChangeWhenThresholdExceeded(Throwable error) {//如果当前熔断器已经打开了,则直接返回if (currentState.get() == State.OPEN) {return;}//如果当前熔断器是半开启状态if (currentState.get() == State.HALF_OPEN) {//如果本次请求没出现异常,则代表可以关闭熔断器了if (error == null) {//调用AbstractCircuitBreaker.fromHalfOpenToClose()关闭熔断器fromHalfOpenToClose();} else {//如果本次请求还是异常,就继续熔断//即调用AbstractCircuitBreaker.fromHalfOpenToOpen()方法打开熔断器fromHalfOpenToOpen(1.0d);}return;}List<SimpleErrorCounter> counters = stat.values();//异常数量long errCount = 0;//请求总数long totalCount = 0;for (SimpleErrorCounter counter : counters) {errCount += counter.errorCount.sum();totalCount += counter.totalCount.sum();}//如果请求总数没超过最小请求数,那直接放行if (totalCount < minRequestAmount) {return;}double curCount = errCount;//熔断策略为异常比例if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {//计算百分比curCount = errCount * 1.0d / totalCount;}//当错误率或者错误数大于阈值,则打开熔断器if (curCount > threshold) {//调用AbstractCircuitBreaker.transformToOpen()方法打开熔断器transformToOpen(curCount);}}...
}public abstract class AbstractCircuitBreaker implements CircuitBreaker {private final EventObserverRegistry observerRegistry;//熔断器当前的开关状态protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);//下一次尝试恢复的时间protected volatile long nextRetryTimestamp;...protected void transformToOpen(double triggerValue) {State cs = currentState.get();switch (cs) {case CLOSED:fromCloseToOpen(triggerValue);break;case HALF_OPEN:fromHalfOpenToOpen(triggerValue);break;default:break;}}protected boolean fromHalfOpenToClose() {if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {resetStat();notifyObservers(State.HALF_OPEN, State.CLOSED, null);return true;}return false;}protected boolean fromHalfOpenToOpen(double snapshotValue) {if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {updateNextRetryTimestamp();notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);return true;}return false;}private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {observer.onStateChange(prevState, newState, rule, snapshotValue);}}...
}

(4)总结

一.Sentinel熔断降级的两种熔断策略

二.Sentinel熔断降级的流程

三.熔断器的三个状态

四.熔断器三个状态之间的流转过程

一.Sentinel熔断降级的两种熔断策略

策略一:异常熔断器ExceptionCircuitBreaker

异常熔断器关注错误数量、错误比例,其核心功能是在请求结束时更新计数器,统计异常数和总请求数。当达到阈值时,熔断器的状态从CLOSED变为OPEN。

策略二:慢调用比例熔断器ResponseTimeCircuitBreaker

慢调用比例熔断器关注响应时间RT。它计算请求结束时间与请求开始时间的差值,然后与用户设置的阈值比较。若达到阈值,熔断器状态将从CLOSED变为OPEN。

二.Sentinel熔断降级的流程

步骤一:计数

比如统计错误数、响应时间rt。

步骤二:对比

将计数结果和用户设置的熔断阈值做对比,如达到阈值则打开熔断器。

步骤三:验证

请求进来时就可以直接判断熔断器是否是打开状态。如果是打开状态,则直接拒绝请求。如果是关闭状态,则直接放行请求。如果是半打开状态,则进行二次验证,看看是否能放行请求。

三.熔断器的三个状态

状态一:CLOSED,关闭状态

当熔断器处于CLOSED状态时,表示系统正常运行,没有发生熔断。此时,熔断器会对请求进行正常计数和统计。如果统计结果表明异常数/比例或者慢调用比例超过了预设阈值,熔断器将切换至OPEN状态,触发熔断。

状态二:OPEN,打开状态

当熔断器处于OPEN状态时,系统进入熔断状态。在这个状态下,熔断器会拒绝所有新的请求,直接返回预定义的降级策略。在熔断器打开一段时间后(通常由用户设置),熔断器会尝试从OPEN状态切换到HALF_OPEN状态,看系统是否已恢复。

状态三:HALF_OPEN,半开启状态

当熔断器处于HALF_OPEN状态时,系统将允许有限数量的请求通过。如果这些请求成功,熔断器将认为系统已经恢复,然后切回CLOSED状态。如果这些请求异常,熔断器会认为系统未恢复,切回OPEN状态继续熔断。

四.熔断器三个状态之间的流转过程

过程一:从CLOSED到OPEN

当异常数/比例或慢调用比例超过阈值时。

过程二:从OPEN到HALF_OPEN

在熔断器打开一段时间后,尝试恢复系统。

过程三:从HALF_OPEN到CLOSED

当允许的有限数量请求成功时。

过程四:从HALF_OPEN到OPEN

当允许的有限数量请求仍然出现异常时,HALF_OPEN就好比一个中间态。

这种流转机制确保了在系统出现问题时,熔断器能够自动进行熔断保护,同时在系统恢复后能够及时恢复正常运行。

相关文章:

  • 【kubernetes】pod资源配额
  • 5G网络切片:精准分配资源,提升网络效率的关键技术
  • 基于 LWE 的格密码python实战
  • (done) 吴恩达版提示词工程 1. 引言 (Base LLM 和 Instruction Tuned LLM)
  • visual studio无法跳转到函数定义、变量定义、跳转函数位置不准问题解决
  • Java链表反转方法详解
  • 注意力机制(np计算示例)单头和多头
  • 信息系统项目管理工程师备考计算类真题讲解五
  • 用python脚本怎么实现:把一个文件夹里面.png文件没有固定名称,复制到另外一个文件夹按顺序命名?
  • 基于Django的AI客服租车分析系统
  • Linux 常用命令 -pkill【通过进程名或其他属性来发送信号给一个或多个进程】
  • 2025.4.14-2025.4.20学习周报
  • 宝塔面板部署 Dify-latest 最新版本
  • QML 字符串格式化
  • Matplotlib的应用
  • Matlab FCM模糊聚类
  • SmolVLM2: The Smollest Video Model Ever(五)
  • 2020 年 7 月大学英语四级考试真题(组合卷)——解析版
  • String +memset字符串类题型【C++】
  • c语言修炼秘籍 - - 禁(进)忌(阶)秘(技)术(巧)【第六式】文件操作
  • 以优良作风激发改革发展动力活力,中管企业扎实开展深入贯彻中央八项规定精神学习教育
  • 女子伸腿阻止列车关门等待同行人员,被深圳铁路警方行政拘留
  • 市民建议公交广播增加“请勿大声喧哗”提示,上海交通委回复
  • 白兰花香飘京城,上海文化体验项目点亮中华民族共同体之美
  • 在没有穹顶的剧院,和春天的音乐会来一场约会
  • 日薪100元散发“引流小卡片”,上海浦东警方抓获2名违法人员