文章目录
- 本系列
- 前言
- 提供获取令牌的API
- 数据结构
- 基础方法
- tokensFromDuration
- durationFromTokens
- advance
- 获取令牌方法
- reverseN
- 其他系列API
- 令人费解的CancelAt
- 是bug吗
- 取消后无法唤醒其他请求
本系列
- 开源限流组件分析(一):juju/ratelimit
- 开源限流组件分析(二):uber-go/ratelimit
- 开源限流组件分析(三):golang-time/rate (本文)
前言
根据开源限流组件分析(一):juju/ratelimit的分析,令牌桶限流的实现没必要真的准备一个桶,定时往里塞令牌,然后每次获取令牌时从桶中弹出,但这样做对内存不友好,需要开辟桶最大容量大小的空间
最佳做法是利用 token 数可以和时间跨度相互转化的原理,只用维护一个桶中当前令牌数的变量,每次消费前才根据时间差计算并更新 Token 数目
本文将分析go官方库提供的限流工具的使用及实现细节,也是令牌桶算法。并对下面这个问题给出合理的解释:在CancelAt方法中,为啥归还令牌时不是归还所有令牌,而是要扣减一个值
本文基于源码:https://github.com/golang/time.git,版本:v0.7.0
提供获取令牌的API
-
Wait/WaitN:当没有足够的Token时,将在内部阻塞等待直到Token足够,或超时取消
- 工程中推荐使用该方法,参数中有
ctx
,和ctx配合能优雅实现超时控制
- 工程中推荐使用该方法,参数中有
-
Allow/AllowN:当前时刻没有足够的Token时,返回false。否则获取到令牌,返回成功
-
Reverse/ReverseN:当没有足够的Token时,返回 Reservation 对象,里面包含是否获取成功,以及要等多久才能放行请求
- 可控性最好,可以选择等待到规定的时间然后放心请求,也可以调Reservation.Cancel() 取消等待,归还令牌
数据结构
type Limit float64type Limiter struct {mu sync.Mutex// 每秒产生多少个tokenlimit Limit// 桶大小burst int// 桶中剩余的token数,可以为负数,表示被某些请求预定了一些未来的令牌tokens float64// 最近一次推进令牌桶的时间last time.Time// 令牌可以满足最近一次请求的时间lastEvent time.Time
}
特别说明这两个字段:
-
last:表示最近一次推进令牌桶的时间,也就是每次调获取令牌api的时间,因此每次获取令牌都会推进令牌桶的时间
- 这样每次获取令牌时,都会看从last到now这段时间之间产生了多少新的令牌
-
lastEvent:表示令牌桶中最近一次请求能放行的时间。如果当前时间调获取令牌api时就能满足,就是当前时间,否则是未来的某个时间
- 只用于CancelAt计算归还多少个令牌,可以到后面再理解
基础方法
tokensFromDuration
计算在时长d内,可以生成多少个令牌
也就是 速度 * 时间 = 总量
func (limit Limit) tokensFromDuration(d time.Duration) float64 {if limit <= 0 {return 0}return d.Seconds() * float64(limit)
}
durationFromTokens
计算生成tokens个令牌,需要多少时间
也就是 总数 / 速度 = 时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {if limit <= 0 {return InfDuration}seconds := tokens / float64(limit)return time.Duration(float64(time.Second) * seconds)
}
advance
计算如果时间推进到t后,桶中可用的令牌个数
注意:该方法只计算,不会更新桶中的令牌数
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {last := lim.lastif t.Before(last) {last = t}// 过去了多长时间elapsed := t.Sub(last)// 计算过去这段时间,一共产生多少tokendelta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + delta// tokens不能超过最大容量if burst := float64(lim.burst); tokens > burst {tokens = burst}return t, tokens
}
获取令牌方法
reverseN
Wait系列,Allow系列,Reserve系列的方法,底层都调了reverseN,其流程如下:
-
计算如果时间推进到now,桶中可用的 token 数
-
计算本次消费后,桶还能剩能下多少token
-
如果 token < 0, 说明目前的token不够,需要等待一段时间
-
只有同时满足下面两个条件时,才获取令牌成功
n <= lim.burst
:申请的token数量没有超过了桶的容量大小waitDuration <= maxFutureReserve
: 需要等待的时间 <=用户期望的时间
注意如果桶中令牌数不够,需要等待直到桶中令牌产生为止
此时需要将桶中令牌数置为负值,表示有请求欠了账,占了位
如果接下来有别的请求来获取令牌,会在之前欠账的基础上继续欠账,让令牌数的负值更大,需要等待更长的时间
通过时间流逝来还这些债,这一点和另一个令牌桶开源限流组件分析(一):juju/ratelimit很类似
// 当前时t时刻,想获取n个令牌,最多等maxFutureReserve时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()defer lim.mu.Unlock()// 可以忽略这个分支,因为用了限流器就肯定要有速率限制if lim.limit == Inf {return Reservation{ok: true,lim: lim,tokens: n,timeToAct: t,}}// 如果时间推进到now,可用的 token 数t, tokens := lim.advance(t)// 本次消费后,桶还能剩能下多少tokentokens -= float64(n)// 如果 token < 0, 说明目前的token不够,需要等待一段时间var waitDuration time.Durationif tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)}// n <= lim.burst:申请的 token 没有超过了桶的大小// waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间ok := n <= lim.burst && waitDuration <= maxFutureReserver := Reservation{ok: ok,lim: lim,limit: lim.limit,}// 能获取到令牌if ok {// 本次获取了多少tokenr.tokens = n// 表示需要等待到这个时刻才能获得期望数量的token(当然 waitDuration 有可能为 0,就是立即满足,timeToAct就是now)r.timeToAct = t.Add(waitDuration)// 更新推进令牌桶的时间lim.last = t// 就扣减令牌,更新桶中token的数量lim.tokens = tokens// 桶中可以满足最近一次请求的时间lim.lastEvent = r.timeToAct}return r
}
其他系列API
allowN:调reserveN时传maxFutureReserve=0,表示只有不等待就能获得时,才获取令牌
func (lim *Limiter) AllowN(t time.Time, n int) bool {return lim.reserveN(t, n, 0).ok
}
ReserveN:直接调reserveN
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {r := lim.reserveN(t, n, InfDuration)return &r
}
WaitN:根据ctx计算要获取令牌最多等待多久,并以这个时间最为最大等待时间调reverseN。然后在内部sleep
-
先检查ctx是否已取消,如果已取消直接返回
-
根据ctx还剩多久过期,来决定在reverseN中最多等待多久
- 换句话说,如果判断直到ctx过期都无法获取令牌,就不等了
-
获取到Reservation后,看 1)根据Reservation计算还有多久能获取令牌的时间delay,和2) ctx被取消,这两件事谁先发生:
- 如果delay先过完:获取令牌成功,可以执行请求
- 如果ctx先被取消:本次获取令牌失败,需要尽可能归还桶中的令牌
正常来说,只要不是外部主动取消,都是delay先过完
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// 其实就是:time.NewTimer(d)newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {timer := time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()// 不能超过桶容量if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// 先检查下ctx是否已取消select {case <-ctx.Done():return ctx.Err()default:}// 获取令牌的最大等待时间根据ctx计算waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(t)}r := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 该Reservation还需要等多久才能获得令牌delay := r.DelayFrom(t)if delay == 0 {return nil}ch, stop, _ := newTimer(delay)defer stop()select {case <-ch:// ctx超时或被取消之前,就获得令牌了return nilcase <-ctx.Done():// 在获得令牌之前,ctx就被取消了,需要归还令牌到桶中r.Cancel()return ctx.Err()}
}
令人费解的CancelAt
最后看看CancelAt
:取消本次持有,尽可能归还持有的令牌
为啥需要归还令牌?因此本次实际上没有放行请求,而是被cancel掉了,可以把令牌放回桶中,供接下来的请求使用
// 参数t一般是当前时间
func (r *Reservation) CancelAt(t time.Time) {if !r.ok {return}r.lim.mu.Lock()defer r.lim.mu.Unlock()/**1.如果产生令牌的速率无限大,那没必要归还令牌2.自己没有拿到token,也不用归还3.预期获得时间比当前时间早,这里认为不用归还*/if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {return}// 如果只有有更新的请求请求了令牌,要归还的令牌数减去// r.lim.lastEvent.Sub(r.timeToAct) 这期间产生的令牌restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))if restoreTokens <= 0 {return}// 推进桶的时间t, tokens := r.lim.advance(t)tokens += restoreTokensif burst := float64(r.lim.burst); tokens > burst {tokens = burst}r.lim.last = t// 归还tokenr.lim.tokens = tokens// 如果相等,说明后面没有新的token消费if r.timeToAct == r.lim.lastEvent {// 上一次的lastEventprevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))if !prevEvent.Before(t) {// 恢复lastEvent为上一次的lastEventr.lim.lastEvent = prevEvent}}
}
是bug吗
其中比较费解的是这一行:为啥要多减去这一部分
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
r.TimeToAct
:本次Reservation可以放行请求的时间,也是能使用令牌的时间r.lim.lastEvent
:令牌桶中,最近一次请求的TimeToAct
在归还令牌时,要减掉这段时间范围能产生的令牌数
对应于下面这个场景:
这行代码有注释:
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
但只是解释了在干啥:在r之后获取的令牌不应该被重新放回桶中
而没有解释为啥要这么干
有人在stackoverflow提问,归还所有持有的token是否应该改成restoreTokens = float64(r.tokens)
也有人在官方仓库提issue,举了这样一个例子:
func TestLimit(t *testing.T) {t0 := time.Now()t1 := time.Now().Add(100 * time.Millisecond)t2 := time.Now().Add(200 * time.Millisecond)t3 := time.Now().Add(300 * time.Millisecond)// 桶每秒产生10个令牌,容量=20l := NewLimiter(10, 20)l.ReserveN(t0, 15) // 桶里还剩 5 个 tokenfmt.Printf("%+v\n", l)// t1:时间过去100ms,桶里增加1个,变成6个// 消费10个,变成-4个r := l.ReserveN(t1, 10)// 此时timeToAct:500msfmt.Printf("%+v\n", l)// t2:时间过去100ms,桶里增加1个,变成-3个// 消费2个, 桶里变成-5个l.ReserveN(t2, 2)// 此时timeToAct:700msfmt.Printf("%+v\n", l)// t3: 时间过去100ms,桶里增加1个,变成-4个// r在t3取消,变成4(归还8个)个,按常理是变成6(归还10个)个r.CancelAt(t3)fmt.Printf("%+v\n", l)
}
最后一行:
- 按常理来说,应该把所有令牌都归还,也就是归还10个,这样桶中的令牌数变成6
- 实际上只归还了8个,桶中的令牌数变成4。就是那行令人费解的代码造成的
有人在issue下面给出解释:
意思就是:如果把保留的令牌数全部还回去的话,会造成在某一时刻突破令牌桶的速率限制
拿上面例子说明:
如果在t3时刻取消r时,如果把所有的令牌都归还了,也就是归还10个,此时桶中的令牌数变成6
那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有10个令牌!
如果此时有请求来获取这10个令牌,算上刚刚开始执行的r2,那就相当于同时有加起来需要消耗12个令牌
的请求在执行!超过了最大容量10的限制
而按照源码中的逻辑扣减,在t3时刻取消r时,只归还8个,此时桶中的令牌数变成4
那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有8个令牌
如果此时有请求来获取这8个令牌,算上刚刚开始执行的r2,那就会同时加起来需要消耗10个令牌
的请求在执行,刚好不超过桶中的最大容量的限制
也就是说,如果r后面有更新的请求rnew,rnew.TimeToAct
赋值给了r.limit.lastEvent
那么从r.TimeToAct
到r.limit.lastEvent
之间能产生的令牌是不能归还的,而是要等时间流逝自然填充
这样当rnew可以执行时,桶中的令牌数 + rnew获取的令牌数才不会超过桶的容量
取消后无法唤醒其他请求
这个库有个小问题:如果请求A在请求B之前发生,都需要等待一段时间。如果任务A提前cancel了,使得桶中的令牌满足请求B的需求,请求B也不会被唤醒,而是等待其预定的时间到了再执行
例如下面的例子:
func TestBug(t *testing.T) {// 每100ms产生一个令牌,最大容量=10l := NewLimiter(10, 10)t1 := time.Now()// 先把桶中的初始令牌用完l.ReserveN(t1, 10)var wg sync.WaitGroupctx, cancel := context.WithTimeout(context.TODO(), time.Hour)defer cancel()wg.Add(1)wg.Add(2)go func() {defer wg.Done()// 如果要等,这个要等 1s才能执行,但是我们的 ctx 200ms 就会取消l.WaitN(ctx, 10)fmt.Printf("[1] cost: %s\n", time.Since(t1))}()go func() {defer wg.Done()// 模拟出现问题, 200ms就取消了time.Sleep(200 * time.Millisecond)cancel()}()time.Sleep(100 * time.Millisecond)go func() {defer wg.Done()// 正常情况下,这个要等 1.2 s 才能执行,// 但是我们前面都取消了,按理说在400ms时,桶中令牌数就够了,这个可以执行// 但实际上没有唤醒,等到1.2才执行ctx, cancel := context.WithTimeout(context.Background(), time.Hour)defer cancel()l.WaitN(ctx, 2)fmt.Printf("[2] cost: %s\n", time.Since(t1))}()wg.Wait()
}
- 时刻0ms:请求A获取10个token,需要等1000ms,桶中
tokens = -10
- 时刻100ms:请求B获取2个token,需要等1200ms,桶中
tokens = -11
- 时刻200ms:请求A取消,往桶中归还8个token,桶中
tokens = -2
- 时刻400ms:理想情况下,此时请求B可以执行,因为桶中容量够了
- 时刻1.2s:实际上,请求B此时才可以执行