网站文章更新时间,网页怎么设计图片循环播放,网站前置审批表,连云港规划建设网站文章目录 为什么需要熔断Google sre弹性熔断算法kratos Breaker源码分析公共接口sre实现上报请求结果判定是否熔断 为什么需要熔断
一般来说#xff0c;当服务器过载#xff08;overload#xff09;时#xff0c;需要给client返回服务过载的报错 但是拒接请求也有成本当服务器过载overload时需要给client返回服务过载的报错 但是拒接请求也有成本可能响应错误码本身没啥成本但处理请求协议栈构建响应header等也有一笔开销 如果被拒绝的请求数量很大后端任然会过载因为其绝大多数CPU都花在拒绝请求上
因此最好的办法是客户端不要将请求发到服务端当客户端检测到其最近的请求中有很大一部分因“服务过载”错误而被拒绝时直接在本地失败不会经过网络IO发给服务端 熔断也可以称为客户端限流
Google sre弹性熔断算法
google sre提供了一种自适应的客户端熔断算法其维护了过去一段时间内的两个信息
requests往下游发起请求的总数accepts成功的请求数
正常情况下这两个值是相等的但当下游出现异常时accepts会逐渐小于requests一旦requests达到了accepts的K倍客户端就要启动自适应限流新产生的请求以一定概率被拒绝 拒绝请求的概率计算公式为 m a x ( 0 , r e q u e s t s − K ∗ a c c e p t s r e q u e s t s 1 ) max(0, \frac{requests - K * accepts}{requests 1}) max(0,requests1requests−K∗accepts) 当下游逐渐恢复时accetps会增加使得上述公式中分子变为负数拒绝的概率降为0
可以调整K值使算法产生不同的效果
减少K值会使得行为更激进也就是更容易发生熔断增大K值会使得自适应熔断不那么激进
kratos Breaker源码分析
接下来分析kratos中熔断器其采用了google sre的自适应客户端限流算法
公共接口
熔断器对外暴露3个方法
Allow每次调下游之前判断熔断器状态根据返回结果决定是否往下游发送请求MarkSuccess每次调下游如果成功上报SuccMarkFailed每次调下游如果失败上报Failed
type CircuitBreaker interface { Allow() error MarkSuccess() MarkFailed()
}业务中用起来大概是这样
// 初始化breaker
b : sre.NewBreaker()// 请求下游前判断熔断器状态
if err breaker.Allow(); err ! nil { return
}// 请求下游
err : fn()// 执行成功或失败将结果告知 breaker
if(err ! nil){ breaker.MarkFailed()
}else{ breaker.MarkSuccess()
}sre实现
stat维护请求总数和成功数的滑动窗口k熔断算法的K值request开始熔断的请求数阈值滑动窗口中请求数量达到request才开始熔断state熔断器状态该字段实际没啥用
type Breaker struct { // 滑动窗口stat window.RollingCounter // 随机数产生器同于根据概率熔断请求r *rand.Rand randLock sync.Mutex // 熔断算法的K值k float64// 开始熔断的请求数阈值request int64 state int32
}上报请求结果
func (b *Breaker) MarkSuccess() { b.stat.Add(1)
} func (b *Breaker) MarkFailed() { b.stat.Add(0)
}MarkSuccess内部会将总数1成功数1MarkFailed内部只会将总数1 本文的重点不是滑动窗口这里知道其干了啥就好 判定是否熔断 调summary() 拿到滑动窗口中的请求总数和成功数如果没达到熔断条件返回errnil。两个判定条件 条件一滑动窗口中请求总数没达到阈值total b.request条件二近期失败的数量不够多k * accepts total 否则就需要熔断根据公式计算熔断概率dr判定是否命中概率生成一个0~1之间的随机数如果小于dr说明命中
func (b *Breaker) Allow() error { // 拿到滑动窗口中的请求总数和成功数accepts, total : b.summary() requests : b.k * float64(accepts) // 没达到熔断条件 if total b.request || float64(total) requests { atomic.CompareAndSwapInt32(b.state, StateOpen, StateClosed) return nil } // 下面就是需要熔断atomic.CompareAndSwapInt32(b.state, StateClosed, StateOpen)// 计算熔断概率 dr : math.Max(0, (float64(total)-requests)/float64(total1)) drop : b.trueOnProba(dr) // 需要熔断if drop { return circuitbreaker.ErrNotAllowed } return nil
}从滑动窗口中获得请求总数total和成功请求数success
func (b *Breaker) summary() (success int64, total int64) { b.stat.Reduce(func(iterator window.Iterator) float64 { for iterator.Next() { bucket : iterator.Bucket() // 统计总数 total bucket.Count for _, p : range bucket.Points { // 统计成功的数量 success int64(p) } } return 0 }) return
}trueOnProba就是生成一个0~1之间的随机数看是否小于概率proba
func (b *Breaker) trueOnProba(proba float64) (truth bool) { b.randLock.Lock() truth b.r.Float64() proba b.randLock.Unlock() return
}