
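Sentinel Source Code, Part 6: Implementing Circuit Breaking, Degradation, and Metric Statistics (II)

Outline

1. How DegradeSlot implements circuit breaking and degradation: principles and source code

2. The sliding window algorithm behind Sentinel's metric statistics

2. The sliding window algorithm behind Sentinel's metric statistics

(1) Introduction to sliding windows

(2) How StatisticSlot uses the sliding window algorithm to collect statistics

(1) Introduction to sliding windows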

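I. How a sliding window works

A sliding window has no fixed start and end points. Instead, the moment a request is handled becomes the end point of that request's time window, and the start point is the moment exactly one window length earlier.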
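As a rough illustration of this principle, the sketch below keeps one timestamp per request and counts the requests inside the window that ends at the current request. The class name NaiveSlidingWindow and its method are hypothetical and not part of Sentinel, and timestamps are assumed to arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only; this is not how Sentinel stores its statistics.
final class NaiveSlidingWindow {
    private final long windowLengthMs;
    // Timestamps of recorded requests, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    NaiveSlidingWindow(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    // Records a request at nowMs and returns the number of requests in the
    // window (nowMs - windowLengthMs, nowMs], whose end point is this request.
    int recordAndCount(long nowMs) {
        timestamps.addLast(nowMs);
        // Drop requests at or before the start point of the window.
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMs - windowLengthMs) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        NaiveSlidingWindow window = new NaiveSlidingWindow(1000);
        System.out.println(window.recordAndCount(100));  // 1
        System.out.println(window.recordAndCount(600));  // 2
        System.out.println(window.recordAndCount(1050)); // 3, window is (50, 1050]
        System.out.println(window.recordAndCount(1200)); // 3, the request at 100 has left the window
    }
}
```

Every request moves the window, so this approach has to keep and revisit per-request data, and its cost grows with the request volume.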

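II. The performance problem of sliding windows (solved with sample windows)

Every incoming request shifts the statistical time window. If two consecutive requests arrive less than one window length apart, their windows overlap, so counting the requests in each window separately counts the overlapping part twice. When a large number of requests arrive within one window length, this repeated computation wastes considerable resources.

Solving this calls for finer-grained bookkeeping, for example by introducing sample windows. A sample window is shorter than the sliding window, and the sliding window's length is usually an integer multiple of the sample window's length. When a sample window reaches its end time, the number of requests inside it is counted and recorded. Counting the requests in the time window of any given request can then reuse the recorded sample-window data.

In short, composing the sliding window from several sample windows solves the sliding window's performance problem.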
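The idea of sample windows can be sketched in a few lines. The following is a simplified, single-threaded illustration rather than Sentinel's implementation: the class SampleWindowCounter and its methods are hypothetical, and the rule used to decide which sample windows are still valid is an assumption made for this sketch.

```java
import java.util.Arrays;

// Hypothetical, single-threaded sketch of a sliding window built from sample windows.
final class SampleWindowCounter {
    private final int sampleCount;
    private final long sampleLengthMs;
    private final long intervalMs;
    private final long[] starts; // start time of the sample window held in each slot, -1 if empty
    private final long[] counts; // number of requests recorded in that sample window

    SampleWindowCounter(int sampleCount, long intervalMs) {
        this.sampleCount = sampleCount;
        this.intervalMs = intervalMs;
        this.sampleLengthMs = intervalMs / sampleCount;
        this.starts = new long[sampleCount];
        this.counts = new long[sampleCount];
        Arrays.fill(starts, -1L);
    }

    // Records one request at nowMs in the sample window that contains nowMs.
    void add(long nowMs) {
        int idx = (int) ((nowMs / sampleLengthMs) % sampleCount);
        long start = nowMs - nowMs % sampleLengthMs;
        if (starts[idx] != start) {
            // The slot is empty or holds an older sample window: start over.
            starts[idx] = start;
            counts[idx] = 0;
        }
        counts[idx]++;
    }

    // Sums the sample windows that still belong to the sliding window ending at nowMs.
    long total(long nowMs) {
        long currentStart = nowMs - nowMs % sampleLengthMs;
        long sum = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (starts[i] >= 0 && currentStart - starts[i] < intervalMs) {
                sum += counts[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        SampleWindowCounter counter = new SampleWindowCounter(2, 1000);
        counter.add(100);
        counter.add(600);
        System.out.println(counter.total(600));  // 2
        counter.add(1050);                       // reuses slot 0, discarding the [0, 500) window
        System.out.println(counter.total(1050)); // 2
        counter.add(1200);
        System.out.println(counter.total(1200)); // 3
        System.out.println(counter.total(2100)); // 0, every stored window is too old
    }
}
```

Reading the total only touches sampleCount counters, however many requests arrived. The price is granularity: at t=1050 the request from t=100 is already dropped together with its whole sample window, although an exact 1000 ms window would still include it.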

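(2) How StatisticSlot uses the sliding window algorithm to collect statistics

I. How StatisticNode is designed to collect statistics

II. How LeapArray implements sliding-window statistics

I. How StatisticNode is designed to collect statistics

First, the entry() method of StatisticSlot calls addPassRequest() on DefaultNode, which in turn calls addPassRequest() on StatisticNode, and that method records the data using the sliding window algorithm.

StatisticNode defines an ArrayMetric object to hold the data. By default the object is created with a sample window count of 2 and a time window length of 1000 ms. The data field of ArrayMetric is what actually stores the data, and that field is a LeapArray object.

A LeapArray records the sample window length, the sample window count, the sliding window length, and the sample window array. Its array field is the WindowWrap array that collects and holds the data; in other words, the WindowWrap array is the sample window array.

MetricBucket, the value wrapped by each WindowWrap, has a neat design: it stores its statistics in an array of LongAdder rather than in a single LongAdder. The statistics have several dimensions, and the MetricEvent enum defines them, so mapping the ordinal of each MetricEvent constant to an array index stores all dimensions neatly in one LongAdder array.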

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        ...
        //Run the next ProcessorSlot first, which performs the rule checks and so on
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        //If the checks in the following ProcessorSlots pass,
        //increment the thread count of the current resource resourceWrapper and add to its passed-request count
        node.increaseThreadNum();
        node.addPassRequest(count);
        ...
    }
}

//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
    //Associated cluster node.
    private ClusterNode clusterNode;
    ...

    //DefaultNode collects call data for one resource under Contexts with the same name, i.e. per-resource statistics on a single machine
    //EntranceNode collects call data for all resources under Contexts with the same name, i.e. per-entry statistics covering every resource under that entry
    //ClusterNode collects call data for one resource across all Contexts, i.e. per-resource statistics across the cluster
    @Override
    public void addPassRequest(int count) {
        //Add to the data in the DefaultNode of the current resource
        super.addPassRequest(count);
        //Add to the global statistics in the ClusterNode of the current resource
        this.clusterNode.addPassRequest(count);
    }
    ...
}

//The statistic node keep three kinds of real-time statistics metrics:
//1.metrics in second level rollingCounterInSecond
//2.metrics in minute level rollingCounterInMinute
//3.thread count

//Sentinel use sliding window to record and count the resource statistics in real-time.
//The sliding window infrastructure behind the ArrayMetric is LeapArray.

//case 1: When the first request comes in,
//Sentinel will create a new window bucket of a specified time-span to store running statics,
//such as total response time(rt), incoming request(QPS), block request(bq), etc.
//And the time-span is defined by sample count.
//     0      100ms
//  +-------+--→ Sliding Windows
//         ^
//         |
//       request
//Sentinel use the statics of the valid buckets to decide whether this request can be passed.
//For example, if a rule defines that only 100 requests can be passed,
//it will sum all qps in valid buckets, and compare it to the threshold defined in rule.

//case 2: continuous requests
//  0    100ms    200ms    300ms
//  +-------+-------+-------+-----→ Sliding Windows
//                      ^
//                      |
//                   request

//case 3: requests keeps coming, and previous buckets become invalid
//  0    100ms    200ms      800ms       900ms  1000ms    1300ms
//  +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows
//                                                      ^
//                                                      |
//                                                    request

//The sliding window should become:
// 300ms     800ms  900ms  1000ms  1300ms
//  + ...... +-------+ ...... +-------+-----→ Sliding Windows
//                                                      ^
//                                                      |
//                                                    request
public class StatisticNode implements Node {
    //Holds statistics of the recent INTERVAL milliseconds.
    //The INTERVAL is divided into time spans by given sampleCount.
    //Defines the ArrayMetric that holds the data; the sample window count defaults to 2 (SAMPLE_COUNT) and the time window length defaults to 1000 ms (INTERVAL)
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

    //Holds statistics of the recent 60 seconds.
    //The windowLengthInMs is deliberately set to 1000 milliseconds,
    //meaning each bucket per second, in this way we can get accurate statistics of each second.
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    ...

    @Override
    public void addPassRequest(int count) {
        //Call ArrayMetric.addPass() to add the current request to the counters
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
    ...
}

//The basic metric class in Sentinel using a BucketLeapArray internal.
public class ArrayMetric implements Metric {
    //Stores the statistics
    private final LeapArray<MetricBucket> data;
    ...

    @Override
    public void addPass(int count) {
        //1. Get the sample window containing the current time through LeapArray.currentWindow()
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        //2. Call MetricBucket.addPass() to add the current request count to the statistics of that sample window
        wrap.value().addPass(count);
    }
    ...
}

//Basic data structure for statistic metrics in Sentinel.
//Leap array use sliding window algorithm to count data.
//Each bucket cover windowLengthInMs time span, and the total time span is intervalInMs,
//so the total bucket amount is: sampleCount = intervalInMs / windowLengthInMs.
public abstract class LeapArray<T> {
    //Length of one sample window
    protected int windowLengthInMs;
    //Number of sample windows in one sliding window: intervalInMs / windowLengthInMs, i.e. sliding window length / sample window length
    protected int sampleCount;
    //Length of the sliding window
    protected int intervalInMs;
    //Also the length of the sliding window, but in seconds
    private double intervalInSecond;
    //Array of WindowWrap, the sample window class; the generic type T is actually MetricBucket
    //LeapArray acts as a manager of sample windows, while the sample window itself is WindowWrap<T>
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    //The total bucket count is: sampleCount = intervalInMs / windowLengthInMs.
    //@param sampleCount  bucket count of the sliding window
    //@param intervalInMs the total time interval of this LeapArray in milliseconds
    public LeapArray(int sampleCount, int intervalInMs) {
        ...
        this.windowLengthInMs = intervalInMs / sampleCount;//500 ms by default
        this.intervalInMs = intervalInMs;//1000 ms by default
        this.intervalInSecond = intervalInMs / 1000.0;//1 by default
        this.sampleCount = sampleCount;//2 by default
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    //Get the bucket at current timestamp.
    //Gets the sample window containing the current time
    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    ...
}

//Wrapper entity class for a period of time window.
//The sample window class; the generic type T is, for example, MetricBucket
public class WindowWrap<T> {
    //Time length of a single window bucket in milliseconds.
    //Length of a single sample window
    private final long windowLengthInMs;
    //Start timestamp of the window in milliseconds.
    //Start timestamp of the sample window
    private long windowStart;
    //Statistic data.
    //Statistics of the current sample window, of type MetricBucket
    private T value;
    ...

    //Returns, for example, the MetricBucket object
    public T value() {
        return value;
    }
}

//Represents metrics data in a period of time span.
//Wrapper class for the statistics
public class MetricBucket {
    //The statistics are stored in a LongAdder array
    //An array is used instead of incrementing a single LongAdder because
    //the statistics have several dimensions, and the MetricEvent enum defines these dimension types,
    //so mapping the ordinal of each MetricEvent constant to an array index neatly keeps all dimensions in one LongAdder array
    private final LongAdder[] counters;
    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public MetricBucket add(MetricEvent event, long n) {
        //Count the data and store it in counters
        counters[event.ordinal()].add(n);
        return this;
    }
    ...
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

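II. How LeapArray implements sliding-window statistics

Calling addPass() on ArrayMetric records data as follows: it first obtains the sample window containing the current time through LeapArray's currentWindow() method, then calls MetricBucket's addPass() method to count the request and store the result in that sample window.

LeapArray's currentWindow() method computes the array slot and the window start time for the current time, then handles the sample window stored in that slot as follows:

Case 1: If the slot is still empty, the sample window for the current time has not been created yet and must be initialized.

Case 2: If the computed window start equals the start time of the stored sample window, the two are the same sample window, and the stored one is returned directly.

Case 3: If the computed window start is later than the start time of the stored sample window, the stored sample window is stale and is reset to serve as the new sample window. Note that LeapArray.array is a circular array.

Case 4: If the computed window start is earlier than the start time of the stored sample window. This normally does not happen because time does not run backward, unless the system time is changed manually and the clock is rolled back.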

public abstract class LeapArray<T> {
    //Length of one sample window
    protected int windowLengthInMs;
    //Number of sample windows in one sliding window: intervalInMs / windowLengthInMs, i.e. sliding window length / sample window length
    protected int sampleCount;
    //Length of the sliding window
    protected int intervalInMs;
    //Also the length of the sliding window, but in seconds
    private double intervalInSecond;
    //Array of WindowWrap, the sample window class; the generic type T is actually MetricBucket
    //LeapArray acts as a manager of sample windows, while the sample window itself is WindowWrap<T>
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    ...

    //Assuming timeMillis = 1600, windowLengthInMs = 500 and array.length = 2, timeId = 3 and the method returns 1
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        //Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    //Assuming timeMillis = 1600 and windowLengthInMs = 500, the method returns 1500
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    //Get bucket item at provided timestamp.
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        //Compute the id of the sample window containing the current time, i.e. its index in the LeapArray's array
        int idx = calculateTimeIdx(timeMillis);
        //Calculate current bucket start time.
        //Compute the start time of the current sample window
        long windowStart = calculateWindowStart(timeMillis);

        //Get bucket item at given time from the array.
        //(1) Bucket is absent, then just create a new bucket and CAS update to circular array.
        //(2) Bucket is up-to-date, then just return the bucket.
        //(3) Bucket is deprecated, then reset current bucket.
        while (true) {
            //Get the sample window stored in the slot for the current time
            WindowWrap<T> old = array.get(idx);
            //If the sample window for the current time is null, it has to be created
            if (old == null) {
                //Create a time window
                //     B0       B1      B2    NULL      B4
                // ||_______|_______|_______|_______|_______||___
                // 200     400     600     800     1000    1200  timestamp
                //                             ^
                //                          time=888
                //            bucket is empty, so create new and update
                //If the old bucket is absent, then we create a new bucket at windowStart,
                //then try to update circular array via a CAS operation.
                //Only one thread can succeed to update, while other threads yield its time slice.
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                //Put the newly created window into the LeapArray via CAS
                if (array.compareAndSet(idx, null, window)) {
                    //Successfully updated, return the created bucket.
                    return window;
                } else {
                    //Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            }
            //If the computed window start equals the start time of the stored sample window, they are the same sample window, so return it directly
            else if (windowStart == old.windowStart()) {
                //     B0       B1      B2     B3      B4
                // ||_______|_______|_______|_______|_______||___
                // 200     400     600     800     1000    1200  timestamp
                //                             ^
                //                          time=888
                //            startTime of Bucket 3: 800, so it's up-to-date
                //If current windowStart is equal to the start timestamp of old bucket,
                //that means the time is within the bucket, so directly return the bucket.
                return old;
            }
            //If the computed window start is later than the start time of the stored sample window, the stored window is stale and must be replaced by the new sample window
            //The array is circular, not infinitely long: if it holds 1 s as 1000 sample windows, the 1000 windows of the next second overwrite those of the previous second
            else if (windowStart > old.windowStart()) {
                //   (old)
                //             B0       B1      B2    NULL      B4
                // |_______||_______|_______|_______|_______|_______||___
                // ...    1200     1400    1600    1800    2000    2200  timestamp
                //                              ^
                //                           time=1676
                //          startTime of Bucket 2: 400, deprecated, should be reset
                //If the start timestamp of old bucket is behind provided time, that means the bucket is deprecated.
                //We have to reset the bucket to current windowStart.
                //Note that the reset and clean-up operations are hard to be atomic,
                //so we need a update lock to guarantee the correctness of bucket update.
                //The update lock is conditional (tiny scope) and will take effect only when bucket is deprecated,
                //so in most cases it won't lead to performance loss.
                if (updateLock.tryLock()) {
                    try {
                        //Successfully get the update lock, now we reset the bucket.
                        //Replace the old sample window
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    //Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            }
            //If the computed window start is earlier than the start time of the stored sample window
            //This normally does not happen because time does not run backward, unless the system time is changed manually and the clock is rolled back
            else if (windowStart < old.windowStart()) {
                //Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    ...
}
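
To make the four branches easier to follow, here is a stripped-down, single-threaded walkthrough of the same index and window-start arithmetic. The class LeapArrayWalkthrough is hypothetical: it only tracks the start time held in each slot and returns a label for the branch taken, leaving out the CAS update, the update lock, and the bucket data of the real LeapArray.

```java
import java.util.Arrays;

// Hypothetical walkthrough of the branch selection in currentWindow(); not Sentinel code.
final class LeapArrayWalkthrough {
    private final int windowLengthInMs;
    // Start time of the bucket held in each slot; -1 marks a slot that is still empty.
    private final long[] slotStart;

    LeapArrayWalkthrough(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.slotStart = new long[sampleCount];
        Arrays.fill(slotStart, -1L);
    }

    int calculateTimeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % slotStart.length);
    }

    long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    // Returns which of the four cases applies and updates the slot accordingly.
    String currentWindow(long timeMillis) {
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        long old = slotStart[idx];
        if (old < 0) {
            slotStart[idx] = windowStart; // case 1: slot is empty, create the bucket
            return "create";
        } else if (windowStart == old) {
            return "reuse";               // case 2: bucket is up to date
        } else if (windowStart > old) {
            slotStart[idx] = windowStart; // case 3: bucket is stale, reset it
            return "reset";
        } else {
            return "clock-rollback";      // case 4: provided time is behind the stored bucket
        }
    }

    public static void main(String[] args) {
        LeapArrayWalkthrough array = new LeapArrayWalkthrough(2, 1000);
        for (long t : new long[] {1600, 1700, 2100, 2600, 1600}) {
            System.out.println("t=" + t
                    + " idx=" + array.calculateTimeIdx(t)
                    + " start=" + array.calculateWindowStart(t)
                    + " -> " + array.currentWindow(t));
        }
    }
}
```

With two sample windows of 500 ms each, the timestamps 1600, 1700, 2100, 2600 and then 1600 again take the create, reuse, create, reset and clock-rollback branches in turn: 1600 and 2600 both map to slot 1, so the bucket that started at 1500 is overwritten by the one that starts at 2500.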
