做360手机网站快速,网站攻击方式,甘孜建设网站,wordpress 安全选项原文链接#xff1a;
上一篇文章介绍了 如何实现计数器限流#xff1f;主要有两种实现方式#xff0c;分别是固定窗口和滑动窗口#xff0c;并且分析了 go-zero 采用固定窗口方式实现的源码。
但是采用固定窗口实现的限流器会有两个问题#xff1a;
会出现请求量超出限…原文链接
上一篇文章介绍了 如何实现计数器限流主要有两种实现方式分别是固定窗口和滑动窗口并且分析了 go-zero 采用固定窗口方式实现的源码。
但是采用固定窗口实现的限流器会有两个问题
会出现请求量超出限制值两倍的情况无法很好处理流量突增问题
这篇文章来介绍一下令牌桶算法可以很好解决以上两个问题。
工作原理
算法概念如下
令牌以固定速率生成生成的令牌放入令牌桶中存放如果令牌桶满了则多余的令牌会直接丢弃当请求到达时会尝试从令牌桶中取令牌取到了令牌的请求可以执行如果桶空了那么尝试取令牌的请求会被直接丢弃。 令牌桶算法既能够将所有的请求平均分布到时间区间内又能接受服务器能够承受范围内的突发请求因此是目前使用较为广泛的一种限流算法。
源码实现
源码分析我们还是以 go-zero 项目为例首先来看生成令牌的部分依然是使用 Redis 来实现。
// core/limit/tokenlimit.go// 生成 token 速率
script local rate tonumber(ARGV[1])
// 通容量
local capacity tonumber(ARGV[2])
// 当前时间戳
local now tonumber(ARGV[3])
// 请求数量
local requested tonumber(ARGV[4])
// 需要多少秒才能把桶填满
local fill_time capacity/rate
// 向下取整ttl 为填满时间 2 倍
local ttl math.floor(fill_time*2)
// 当前桶剩余容量如果为 nil说明第一次使用赋值为桶最大容量
local last_tokens tonumber(redis.call(get, KEYS[1]))
if last_tokens nil thenlast_tokens capacity
end// 上次请求时间戳如果为 nil 则赋值 0
local last_refreshed tonumber(redis.call(get, KEYS[2]))
if last_refreshed nil thenlast_refreshed 0
end// 距离上一次请求的时间跨度
local delta math.max(0, now-last_refreshed)
// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
// 与桶容量比较取二者的小值
local filled_tokens math.min(capacity, last_tokens(delta*rate))
// 判断请求数量和桶内 token 数量的大小
local allowed filled_tokens requested
// 被请求消耗掉之后更新剩余 token 数量
local new_tokens filled_tokens
if allowed thennew_tokens filled_tokens - requested
end// 更新 redis token
redis.call(setex, KEYS[1], ttl, new_tokens)
// 更新 redis 刷新时间
redis.call(setex, KEYS[2], ttl, now)return allowedRedis 中主要保存两个 key分别是 token 数量和刷新时间。
核心思想就是比较两次请求时间间隔内生成的 token 数量 桶内剩余 token 数量和请求量之间的大小如果满足则允许否则则不允许。
限流器初始化
// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {// 生成 token 速率rate int// 桶容量burst intstore *redis.Redis// 桶 keytokenKey string// 桶刷新时间 keytimestampKey stringrescueLock sync.Mutex// redis 健康标识redisAlive uint32// redis 健康监控启动状态monitorStarted bool// 内置单机限流器rescueLimiter *xrate.Limiter
}// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {tokenKey : fmt.Sprintf(tokenFormat, key)timestampKey : fmt.Sprintf(timestampFormat, key)return TokenLimiter{rate: rate,burst: burst,store: store,tokenKey: tokenKey,timestampKey: timestampKey,redisAlive: 1,rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),}
}其中有一个变量 rescueLimiter这是一个进程内的限流器。如果 Redis 发生故障了那么就使用这个算是一个保障尽量避免系统被突发流量拖垮。 提供了四个可调用方法
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {return lim.AllowN(time.Now(), 1)
}// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {return lim.AllowNCtx(ctx, time.Now(), 1)
}// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {return lim.reserveN(context.Background(), now, n)
}// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {return lim.reserveN(ctx, now, n)
}最终调用的都是 reverveN 方法
func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {// 判断 Redis 健康状态如果 Redis 故障则使用进程内限流器if atomic.LoadUint32(lim.redisAlive) 0 {return lim.rescueLimiter.AllowN(now, n)}// 执行限流脚本resp, err : lim.store.EvalCtx(ctx,script,[]string{lim.tokenKey,lim.timestampKey,},[]string{strconv.Itoa(lim.rate),strconv.Itoa(lim.burst),strconv.FormatInt(now.Unix(), 10),strconv.Itoa(n),})// redis allowed false// Lua boolean false - r Nil bulk replyif err redis.Nil {return false}if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {logx.Errorf(fail to use rate limiter: %s, err)return false}if err ! nil {logx.Errorf(fail to use rate limiter: %s, use in-process limiter for rescue, err)// 如果有异常的话会启动进程内限流lim.startMonitor()return lim.rescueLimiter.AllowN(now, n)}code, ok : resp.(int64)if !ok {logx.Errorf(fail to eval redis script: %v, use in-process limiter for rescue, resp)lim.startMonitor()return lim.rescueLimiter.AllowN(now, n)}// redis allowed true// Lua boolean true - r integer reply with value of 1return code 1
}最后看一下进程内限流的启动与恢复
func (lim *TokenLimiter) startMonitor() {lim.rescueLock.Lock()defer lim.rescueLock.Unlock()// 需要加锁保护如果程序已经启动了直接返回不要重复启动if lim.monitorStarted {return}lim.monitorStarted trueatomic.StoreUint32(lim.redisAlive, 0)go lim.waitForRedis()
}func (lim *TokenLimiter) waitForRedis() {ticker : time.NewTicker(pingInterval)// 更新监控进程的状态defer func() {ticker.Stop()lim.rescueLock.Lock()lim.monitorStarted falselim.rescueLock.Unlock()}()for range ticker.C {// 对 redis 进行健康监测如果 redis 服务恢复了// 则更新 redisAlive 标识并退出 goroutineif lim.store.Ping() {atomic.StoreUint32(lim.redisAlive, 1)return}}
}以上就是本文的全部内容如果觉得还不错的话欢迎点赞转发和关注感谢支持。 参考文章
https://juejin.cn/post/7052171117116522504https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673
推荐阅读
如何实现计数器限流go-zero 是如何做路由管理的