境外电商平台排行榜,淄博网站建设优化,热 综合-网站正在建设中-手机版,市场营销策划方案格式模板前言#xff1a;
Sentinel 的一个重要功能就是限流#xff0c;对于限流来说有多种的限流算法#xff0c;比如滑动时间窗口算法、漏桶算法、令牌桶算法等#xff0c;Sentinel 对这几种算法都有具体的实现#xff0c;如果我们对某一个资源设置了一个流控规则#xff0c;并…前言
Sentinel 的一个重要功能就是限流对于限流来说有多种的限流算法比如滑动时间窗口算法、漏桶算法、令牌桶算法等Sentinel 对这几种算法都有具体的实现如果我们对某一个资源设置了一个流控规则并且选择的流控模式是“快速失败”那么 Sentinel 就会采用滑动时间窗口算法来作为该资源的限流算法本篇我们来分析 Sentinel 中滑动时间窗口算法的实现。
Sentinel 系列文章传送门
Sentinel 初步认识及使用
Sentinel 核心概念和工作流程详解
Spring Cloud 整合 Nacos、Sentinel、OpenFigen 实战【微服务熔断降级实战】
Sentinel 源码分析入门【Entry、Chain、Context】
Sentine 源码分析之–NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot
Sentine 源码分析之–AuthoritySlot、SystemSlot、GatewayFlowSlot
Sentine 源码分析之–ParamFlowSlot
Sentine 源码分析之–FlowSlot
滑动窗口的原理
滑动窗口是一种常用的算法用于统计一段时间内的事件或数据点在限流场景中滑动窗口将时间窗口分割成多个小的时间片段通常称为桶也可以叫做样本窗口每个时间片段独立统计随着时间的推移最旧的时间片段的数据会被新的时间片段替换形成“滑动”的效果在具体实现上滑动时间窗算法可以通过多种数据结构来实现例如使用环形数组、哈希表等可以使用一个环形数组来存储时间窗口内的数据点数组的大小等于时间窗口的大小每当有新的数据点进入时旧的对应时间点的数据将被覆盖从而实现滑动时间窗的效果此外也可以使用哈希表结构来实现滑动时间窗口其中键为时间点值为该时间点的数据值或变化量。
滑动窗口的优点
实现更细粒度的时间控制与固定窗口整个时间窗口只统计一次相比滑动窗口通过连续滑动减少了窗口切换时的流量突变避免了请求在窗口刚开始时因为累积的计数而被误判为超限。减少突发流量对系统的影响保证服务的稳定性和可靠性在实际应用中流量往往呈现出突发性特征如果使用固定窗口算法在窗口重置的瞬间可能会接受大量请求时间窗口的起始点聚集大量流量造成短时间内的服务压力滑动窗口可以更均匀、更细粒度的控制每个时间片段内的流量从而降低了因突发流量导致的导致的系统压力。提高系统响应的实时性滑动窗口提供了更实时的流量数据系统能够基于最实时的流量情况做出响应这对于需要快速适应流量变化的在线服务尤其重要可以即时调整资源分配和访问策略。
Sentinel 滑动时间窗口的实现
Sentinel 官网的图就很清楚的告诉了我们 Sentinel 使用环形数组实现滑动窗口下图中的右上角就是滑动窗口的示意图是 StatisticSlot 的具体实现底层采用的是 LeapArray 来统计实时的秒级指标数据可以很好地支撑写多于读的高并发场景。 滑动窗口的核心数据结构
ArrayMetric滑动窗口核心实现类。LeapArray滑动窗口顶层数据结构主要存储窗口数据。WindowWrap每一个滑动窗口的包装类其内部的数据结构用 MetricBucket 表示。MetricBucket指标桶例如通过数量、阻塞数量、异常数量、成功数量、响应时间已通过未来配额抢占下一个滑动窗口的数量。MetricEvent指标类型例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。
ArrayMetric 构造方法源码解析
ArrayMetric 是滑动窗口的入口类实现了 Metric 接口该接口主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量TPS、响应时间等ArrayMetric 提供了两个构造方法两个构造方法的区别是在于当前时间窗口达到限制之后是否可以抢占下一个时间窗口具体逻辑如下
intervalInMs滑动窗口的总时间例如 1 分钟、1 秒中。sampleCount在一个滑动窗口的总时间中的抽样的个数默认为 2即一个滑动窗口的总时间包含两个相等的区间一个区间就是一个窗口。enableOccupy是否允许抢占即当前滑动窗口已经达到限制后是否可以占用下一个时间窗口的容量。
public class ArrayMetric implements Metric {private final LeapArrayMetricBucket data;//com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#ArrayMetric(int, int)public ArrayMetric(int sampleCount, int intervalInMs) {//默认是可抢占的this.data new OccupiableBucketLeapArray(sampleCount, intervalInMs);}//com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#ArrayMetric(int, int, boolean)public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {//当前时间窗口容量满了 是否可抢占时间窗口if (enableOccupy) {//可抢占this.data new OccupiableBucketLeapArray(sampleCount, intervalInMs);} else {//不可抢占this.data new BucketLeapArray(sampleCount, intervalInMs);}}
}
LeapArray 源码分析
LeapArray 用来存储滑动窗口数据的也就是所谓的环形数组具体成员变量如下
windowLengthInMs样本窗口的时间间隔单位秒。sampleCount样本窗口数量。intervalInMs一个滑动窗口跨越的时间长度也就是总时间窗口。array样本窗口的集合使用 AtomicReferenceArray 保证原子性。
public abstract class LeapArrayT {//样本窗口的时间间隔 单位秒protected int windowLengthInMs;//样本窗口数量protected int sampleCount;//毫秒为单位 一个滑动窗口跨越的时间长度 也就是总时间窗口protected int intervalInMs;//样本窗口的集合 使用 AtomicReferenceArray 保证原子性protected final AtomicReferenceArrayWindowWrapT array;/*** The conditional (predicate) update lock is used only when current bucket is deprecated.*/private final ReentrantLock updateLock new ReentrantLock();/*** The total bucket count is: {code sampleCount intervalInMs / windowLengthInMs}.** param sampleCount bucket count of the sliding window* param intervalInMs the total time interval of this {link LeapArray} in milliseconds*/public LeapArray(int sampleCount, int intervalInMs) {AssertUtil.isTrue(sampleCount 0, bucket count is invalid: sampleCount);AssertUtil.isTrue(intervalInMs 0, total time interval of the sliding window should be positive);AssertUtil.isTrue(intervalInMs % sampleCount 0, time span needs to be evenly divided);this.windowLengthInMs intervalInMs / sampleCount;this.intervalInMs intervalInMs;this.sampleCount sampleCount;this.array new AtomicReferenceArray(sampleCount);}}
MetricBucket 源码分析
MetricBucket 统计一个时间窗口内的各项指标数据例如异常总数、成功总数等Bucket 使用 LongAdder 数组记录一段时间内的各项指标MetricBucket 包含一个 LongAdder 数组数组的每个元素代表一类 MetricEvent。LongAdder 保证了数据修改的原子性。
public class MetricBucket {//记录各事件的计数 异常总数 成功总数等private final LongAdder[] counters;//最小耗时 默认值 5 秒private volatile long minRt;//构造方法public MetricBucket() {//遍历各种事件MetricEvent[] events MetricEvent.values();//创建 数组this.counters new LongAdder[events.length];for (MetricEvent event : events) {//事件加入数组counters[event.ordinal()] new LongAdder();}//初始化最小事件initMinRt();}
}public enum MetricEvent {/*** Normal pass.*/PASS,/*** Normal block.*/BLOCK,EXCEPTION,SUCCESS,RT,/*** Passed in future quota (pre-occupied, since 1.5.0).*/OCCUPIED_PASS
}
WindowWrap 源码解析
MetricBucket 自身不保存时间窗口信息因此 Sentinel 给 Bucket 加了一个包装类 WindowWrapMetricBucket 用于统计各项指标数据WindowWrap 用于记录 MetricBucket 时间窗口信息具体属性如下
windowLengthInMs单个时间窗口的时间长度也就是样本窗口的时间长度。windowStart样本窗口的起始时间。value样本窗口统计数据。
public class WindowWrapT {/*** Time length of a single window bucket in milliseconds.*///单个时间窗口的时间长度 也就是样本窗口的时间长度private final long windowLengthInMs;/*** Start timestamp of the window in milliseconds.*///样本窗口的起始时间private long windowStart;/*** Statistic data.*///样本窗口统计数据private T value;/*** param windowLengthInMs a single window buckets time length in milliseconds.* param windowStart the start timestamp of the window* param value statistic data*/public WindowWrap(long windowLengthInMs, long windowStart, T value) {this.windowLengthInMs windowLengthInMs;this.windowStart windowStart;this.value value;}}
至此Sentinel 滑动时间窗口的基本实现我们已经了解了下面我们来分析一下 Sentinel 具体是如果使用这个滑动时间窗口的。
StatisticSlot#entry 方法源码解析
我们知道 StatisticSlot 是 Sentinel 的核心功能插槽之一用于统计实时的调用数据前面系列文章已经分析过这里重点关注这行代码即可 node.addPassRequest(count)。 //com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// Do some checking.//放行到下一个 slot 做限流 降级 等规则判断fireEntry(context, resourceWrapper, node, count, prioritized, args);// Request passed, add thread count and pass count.//请求已通过 线程数1 用做线程隔离node.increaseThreadNum();//请求通过 计数器1 用做限流node.addPassRequest(count);//请求来源节点判断if (context.getCurEntry().getOriginNode() ! null) {// Add count for origin node.//来源节点不为空 来源节点的 线程数 和 计数器 也1context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}//是否是入口资源类型if (resourceWrapper.getEntryType() EntryType.IN) {// Add count for global inbound entry node for global statistics.//如果是入口资源类型 全局线程数 和 计数器 也要1Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// Handle pass event with registered entry callback handlers.//请求通过后回调for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {//优先级等待异常这里没有增加请求失败的数量node.increaseThreadNum();//请求来源节点判断if (context.getCurEntry().getOriginNode() ! null) {// Add count for origin node.//来源节点不为空 来源节点的 线程数 1context.getCurEntry().getOriginNode().increaseThreadNum();}//是否是入口资源类型if (resourceWrapper.getEntryType() EntryType.IN) {// Add count for global inbound entry node for global statistics.//如果是入口资源类型 全局线程数 1Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.//请求通过后回调for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {// Blocked, set block exception to current entry.//阻塞 没有通过异常 将异常信息保存在 当前的 entry 中context.getCurEntry().setBlockError(e);// Add block count.//增加阻塞数量node.increaseBlockQps(count);//请求来源节点判断if (context.getCurEntry().getOriginNode() ! null) {//请求来源节点 阻塞数量 1context.getCurEntry().getOriginNode().increaseBlockQps(count);}//是否是入口资源类型if (resourceWrapper.getEntryType() EntryType.IN) {// Add count for global inbound entry node for global statistics.//如果是入口资源类型 全局阻塞数 1Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers.//回调for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {// Unexpected internal error, set error to current entry.//错误设置到 当前 Entrycontext.getCurEntry().setError(e);throw e;}
}
DefaultNode#addPassRequest 方法源码解析
DefaultNode#addPassRequest 方法没有复杂的逻辑只是调用了 StatisticNode#addPassRequest 方法我们接着分析。
//com.alibaba.csp.sentinel.node.DefaultNode#addPassRequest
Override
public void addPassRequest(int count) {super.addPassRequest(count);this.clusterNode.addPassRequest(count);
}
StatisticNode#addPassRequest 方法源码解析
StatisticNode#addPassRequest 方法分别对分钟级时间窗口和秒级时间窗口进行了处理我们选择一个分析即可底层逻辑是一样的。
//com.alibaba.csp.sentinel.node.StatisticNode#addPassRequest
Override
public void addPassRequest(int count) {//秒级时间窗口 500ms 一个样本rollingCounterInSecond.addPass(count);//分钟级时间窗口 每秒一个样本rollingCounterInMinute.addPass(count);
}
ArrayMetric#addPass 方法源码解析
ArrayMetric#addPass 方法主要是获取事件窗口并对时间窗口中的对应值进行增加操作。
//com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addPass
Override
public void addPass(int count) {//获取当前的滑动时间窗口WindowWrapMetricBucket wrap data.currentWindow();//时间窗口中的对应位置的值countwrap.value().addPass(count);
}
LeapArray#currentWindow 方法源码解析
LeapArray#currentWindow 方法作用是获取当前滑动时间窗口。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow()
public WindowWrapT currentWindow() {//传入当前时间return currentWindow(TimeUtil.currentTimeMillis());
}
LeapArray#currentWindow 方法源码解析
LeapArray#currentWindow 方法主要作用是创建或者更新时间窗口具体逻辑如下
根据当前时间戳获取样本窗口索引值。获取当前窗口的起始位置。根据当前样本时间窗口索引值获取旧的时间窗口。如果当前样本时间窗口为 null就创建一个样本时间窗口。不为空首先判断计算出来的样本时间窗口其实质是否等于获取到的样本时间窗口起始值如果等于则直接返回如果大于则更新样本时间窗口数据如果小于也会创建一个样本时间窗口返回时钟回拨的情况但是这个样本时间窗口不会参与计算。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow(long)
public WindowWrapT currentWindow(long timeMillis) {//时间小于 0 直接 returnif (timeMillis 0) {return null;}//根据当前时间戳获取样本窗口索引值int idx calculateTimeIdx(timeMillis);// Calculate current bucket start time.//获取当前窗口的起始位置long windowStart calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {//根据当前样本时间窗口索引值 获取旧的时间窗口WindowWrapT old array.get(idx);if (old null) {/** B0 B1 B2 NULL B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time888* bucket is empty, so create new and update** If the old bucket is absent, then we create a new bucket at {code windowStart},* then try to update circular array via a CAS operation. Only one thread can* succeed to update, while other threads yield its time slice.*///旧的时间窗口为空 创建 WindowWrap 对象 也就是创建样本时间窗口 MetricBucketWindowWrapT window new WindowWrapT(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));//使用 CAS 更新索引位置的时间窗口 更新到 LeapArray 中if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.//让出 CPU 使用权Thread.yield();}} else if (windowStart old.windowStart()) {/** B0 B1 B2 B3 B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time888* startTime of Bucket 3: 800, so its up-to-date** If current {code windowStart} is equal to the start timestamp of old bucket,* that means the time is within the bucket, so directly return the bucket.*///新旧时间窗口的起始时间一样 直接返回旧的时间窗口return old;} else if (windowStart old.windowStart()) {/** (old)* B0 B1 B2 NULL B4* |_______||_______|_______|_______|_______|_______||___* ... 1200 1400 1600 1800 2000 2200 timestamp* ^* time1676* startTime of Bucket 2: 400, deprecated, should be reset** If the start timestamp of old bucket is behind provided time, that means* the bucket is deprecated. We have to reset the bucket to current {code windowStart}.* Note that the reset and clean-up operations are hard to be atomic,* so we need a update lock to guarantee the correctness of bucket update.** The update lock is conditional (tiny scope) and will take effect only when* bucket is deprecated, so in most cases it wont lead to performance loss.*///滚动窗口 因为之前就已经初始化好了对应时间的窗口规格大小和数量所以这里只会覆盖上一个时间周期的老数据相当于环形数组//加锁if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.//更新时间窗口return resetWindowTo(old, windowStart);} finally {//释放锁updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart old.windowStart()) {// Should not go through here, as the provided time is already behind.//理论上来到这里是不正常的 但是这里还是重新创建了时间窗口//这里其实是时钟回拨问题例如服务器时间被前调导致了计算出来的窗口开始时间小于了现在目标的窗口时间//那么就新建一个窗口仅用作统计不会在流控 slot 中进行计算出现这个问题肯定就会计算不准return new WindowWrapT(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}
}
LeapArray#calculateTimeIdx 方法源码解析
LeapArray#calculateTimeIdx 方法的作用是获取样本时间窗口的索引值具体算法是当前的时间戳/样本窗口时间 % 样本窗口长度以秒级时间窗口为例当前的时间戳/500 % 2。 //com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#calculateTimeIdx
private int calculateTimeIdx(/*Valid*/ long timeMillis) {//索引除以样本窗口时间 得到long timeId timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.//得到的值 和窗口的样本数量取模得到索引值return (int)(timeId % array.length());
}LeapArray#calculateWindowStart 方法源码解析
LeapArray#calculateWindowStart 方法的主要作用是计算当前样本时间窗口的起始时间。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#calculateWindowStart
protected long calculateWindowStart(/*Valid*/ long timeMillis) {//当前窗口的起始时间 当前时间-当前时间和样本窗口时间取模return timeMillis - timeMillis % windowLengthInMs;
}
BucketLeapArray#newEmptyBucket 方法源码解析
BucketLeapArray#newEmptyBucket 方法的作用是创建指标桶 MetricBucket。
//com.alibaba.csp.sentinel.slots.statistic.metric.BucketLeapArray#newEmptyBucket
Override
public MetricBucket newEmptyBucket(long time) {return new MetricBucket();
}
BucketLeapArray#resetWindowTo 方法源码解析
BucketLeapArray#resetWindowTo 方法的主要作用是更新窗口的开始时间和重置值。
//com.alibaba.csp.sentinel.slots.statistic.metric.BucketLeapArray#resetWindowTo
Override
protected WindowWrapMetricBucket resetWindowTo(WindowWrapMetricBucket w, long startTime) {// Update the start time and reset value.w.resetTo(startTime);w.value().reset();return w;
}
使用循环数组的好处
而循环数组可以循环重复使用可以避免频繁的创建 Bucket减少内存资源的占用。
总结Sentinel 滑动时间窗口使用了环形数组 LeapArray 来实现而 LeapArray 内部使用了一个 WindowWrap 类型的 array 来保存样本窗口WindowWrap 的作用是用来包装 MetricBucketWindowWrap 数组实现滑动窗口MetricBucket 负责统计各项指标数据WindowWrap 用于记录 MetricBucket 的时间窗口信息寻找样本时间窗口实际上就是寻找 WindowWrap找到了 WindowWrap 也就找到了 MetricBucketSentinel 滑动时间窗口的核心就是 LeapArray 、MetricBucket 和 WindowWrap 。
欢迎提出建议及对错误的地方指出纠正。