开源限流组件分析(三):golang-time/rate

文章目录

  • 本系列
  • 前言
  • 提供获取令牌的API
  • 数据结构
  • 基础方法
    • tokensFromDuration
    • durationFromTokens
    • advance
  • 获取令牌方法
    • reverseN
    • 其他系列API
  • 令人费解的CancelAt
    • 是bug吗
  • 取消后无法唤醒其他请求

本系列

  • 开源限流组件分析(一):juju/ratelimit
  • 开源限流组件分析(二):uber-go/ratelimit
  • 开源限流组件分析(三):golang-time/rate (本文)

前言

根据开源限流组件分析(一):juju/ratelimit的分析,令牌桶限流的实现没必要真的准备一个桶,定时往里塞令牌,然后每次获取令牌时从桶中弹出,但这样做对内存不友好,需要开辟桶最大容量大小的空间

最佳做法是利用 token 数可以和时间跨度相互转化的原理,只用维护一个桶中当前令牌数的变量,每次消费前才根据时间差计算并更新 Token 数目

本文将分析go官方库提供的限流工具的使用及实现细节,也是令牌桶算法。并对下面这个问题给出合理的解释:在CancelAt方法中,为啥归还令牌时不是归还所有令牌,而是要扣减一个值

本文基于源码:https://github.com/golang/time.git,版本:v0.7.0

提供获取令牌的API

  • Wait/WaitN:当没有足够的Token时,将在内部阻塞等待直到Token足够,或超时取消

    • 工程中推荐使用该方法,参数中有ctx,和ctx配合能优雅实现超时控制
  • Allow/AllowN:当前时刻没有足够的Token时,返回false。否则获取到令牌,返回成功

  • Reverse/ReverseN:当没有足够的Token时,返回 Reservation 对象,里面包含是否获取成功,以及要等多久才能放行请求

    • 可控性最好,可以选择等待到规定的时间然后放心请求,也可以调Reservation.Cancel() 取消等待,归还令牌

数据结构

type Limit float64type Limiter struct {mu sync.Mutex// 每秒产生多少个tokenlimit Limit// 桶大小burst int// 桶中剩余的token数,可以为负数,表示被某些请求预定了一些未来的令牌tokens float64// 最近一次推进令牌桶的时间last time.Time// 令牌可以满足最近一次请求的时间lastEvent time.Time
}

特别说明这两个字段:

  • last:表示最近一次推进令牌桶的时间,也就是每次调获取令牌api的时间,因此每次获取令牌都会推进令牌桶的时间

    • 这样每次获取令牌时,都会看从last到now这段时间之间产生了多少新的令牌
  • lastEvent:表示令牌桶中最近一次请求能放行的时间。如果当前时间调获取令牌api时就能满足,就是当前时间,否则是未来的某个时间

    • 只用于CancelAt计算归还多少个令牌,可以到后面再理解

基础方法

tokensFromDuration

计算在时长d内,可以生成多少个令牌

也就是 速度 * 时间 = 总量

func (limit Limit) tokensFromDuration(d time.Duration) float64 {if limit <= 0 {return 0}return d.Seconds() * float64(limit)
}

durationFromTokens

计算生成tokens个令牌,需要多少时间

也就是 总数 / 速度 = 时间

func (limit Limit) durationFromTokens(tokens float64) time.Duration {if limit <= 0 {return InfDuration}seconds := tokens / float64(limit)return time.Duration(float64(time.Second) * seconds)
}

advance

计算如果时间推进到t后,桶中可用的令牌个数

注意:该方法只计算,不会更新桶中的令牌数

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {last := lim.lastif t.Before(last) {last = t}// 过去了多长时间elapsed := t.Sub(last)// 计算过去这段时间,一共产生多少tokendelta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + delta// tokens不能超过最大容量if burst := float64(lim.burst); tokens > burst {tokens = burst}return t, tokens
}

获取令牌方法

reverseN

Wait系列,Allow系列,Reserve系列的方法,底层都调了reverseN,其流程如下:

  1. 计算如果时间推进到now,桶中可用的 token 数

  2. 计算本次消费后,桶还能剩能下多少token

  3. 如果 token < 0, 说明目前的token不够,需要等待一段时间

  4. 只有同时满足下面两个条件时,才获取令牌成功

    1. n <= lim.burst:申请的token数量没有超过了桶的容量大小
    2. waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间

注意如果桶中令牌数不够,需要等待直到桶中令牌产生为止

此时需要将桶中令牌数置为负值,表示有请求欠了账,占了位

如果接下来有别的请求来获取令牌,会在之前欠账的基础上继续欠账,让令牌数的负值更大,需要等待更长的时间

通过时间流逝来还这些债,这一点和另一个令牌桶开源限流组件分析(一):juju/ratelimit很类似

// 当前时t时刻,想获取n个令牌,最多等maxFutureReserve时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()defer lim.mu.Unlock()// 可以忽略这个分支,因为用了限流器就肯定要有速率限制if lim.limit == Inf {return Reservation{ok:        true,lim:       lim,tokens:    n,timeToAct: t,}}// 如果时间推进到now,可用的 token 数t, tokens := lim.advance(t)// 本次消费后,桶还能剩能下多少tokentokens -= float64(n)// 如果 token < 0, 说明目前的token不够,需要等待一段时间var waitDuration time.Durationif tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)}// n <= lim.burst:申请的 token 没有超过了桶的大小// waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间ok := n <= lim.burst && waitDuration <= maxFutureReserver := Reservation{ok:    ok,lim:   lim,limit: lim.limit,}// 能获取到令牌if ok {// 本次获取了多少tokenr.tokens = n// 表示需要等待到这个时刻才能获得期望数量的token(当然 waitDuration 有可能为 0,就是立即满足,timeToAct就是now)r.timeToAct = t.Add(waitDuration)// 更新推进令牌桶的时间lim.last = t// 就扣减令牌,更新桶中token的数量lim.tokens = tokens// 桶中可以满足最近一次请求的时间lim.lastEvent = r.timeToAct}return r
}

其他系列API

allowN:调reserveN时传maxFutureReserve=0,表示只有不等待就能获得时,才获取令牌

func (lim *Limiter) AllowN(t time.Time, n int) bool {return lim.reserveN(t, n, 0).ok
}

ReserveN:直接调reserveN

func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {r := lim.reserveN(t, n, InfDuration)return &r
}

WaitN:根据ctx计算要获取令牌最多等待多久,并以这个时间最为最大等待时间调reverseN。然后在内部sleep

  1. 先检查ctx是否已取消,如果已取消直接返回

  2. 根据ctx还剩多久过期,来决定在reverseN中最多等待多久

    1. 换句话说,如果判断直到ctx过期都无法获取令牌,就不等了
  3. 获取到Reservation后,看 1)根据Reservation计算还有多久能获取令牌的时间delay,和2) ctx被取消,这两件事谁先发生:

    1. 如果delay先过完:获取令牌成功,可以执行请求
    2. 如果ctx先被取消:本次获取令牌失败,需要尽可能归还桶中的令牌

正常来说,只要不是外部主动取消,都是delay先过完

func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// 其实就是:time.NewTimer(d)newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {timer := time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()// 不能超过桶容量if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// 先检查下ctx是否已取消select {case <-ctx.Done():return ctx.Err()default:}// 获取令牌的最大等待时间根据ctx计算waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(t)}r := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 该Reservation还需要等多久才能获得令牌delay := r.DelayFrom(t)if delay == 0 {return nil}ch, stop, _ := newTimer(delay)defer stop()select {case <-ch:// ctx超时或被取消之前,就获得令牌了return nilcase <-ctx.Done():// 在获得令牌之前,ctx就被取消了,需要归还令牌到桶中r.Cancel()return ctx.Err()}
}

令人费解的CancelAt

最后看看CancelAt:取消本次持有,尽可能归还持有的令牌

为啥需要归还令牌?因此本次实际上没有放行请求,而是被cancel掉了,可以把令牌放回桶中,供接下来的请求使用

// 参数t一般是当前时间
func (r *Reservation) CancelAt(t time.Time) {if !r.ok {return}r.lim.mu.Lock()defer r.lim.mu.Unlock()/**1.如果产生令牌的速率无限大,那没必要归还令牌2.自己没有拿到token,也不用归还3.预期获得时间比当前时间早,这里认为不用归还*/if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {return}// 如果只有有更新的请求请求了令牌,要归还的令牌数减去// r.lim.lastEvent.Sub(r.timeToAct) 这期间产生的令牌restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))if restoreTokens <= 0 {return}// 推进桶的时间t, tokens := r.lim.advance(t)tokens += restoreTokensif burst := float64(r.lim.burst); tokens > burst {tokens = burst}r.lim.last = t// 归还tokenr.lim.tokens = tokens// 如果相等,说明后面没有新的token消费if r.timeToAct == r.lim.lastEvent {// 上一次的lastEventprevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))if !prevEvent.Before(t) {// 恢复lastEvent为上一次的lastEventr.lim.lastEvent = prevEvent}}
}

是bug吗

其中比较费解的是这一行:为啥要多减去这一部分

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  • r.TimeToAct:本次Reservation可以放行请求的时间,也是能使用令牌的时间
  • r.lim.lastEvent:令牌桶中,最近一次请求的TimeToAct

在归还令牌时,要减掉这段时间范围能产生的令牌数

对应于下面这个场景:

在这里插入图片描述

这行代码有注释:

// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.

但只是解释了在干啥:在r之后获取的令牌不应该被重新放回桶中

而没有解释为啥要这么干


有人在stackoverflow提问,归还所有持有的token是否应该改成restoreTokens = float64(r.tokens)

在这里插入图片描述

也有人在官方仓库提issue,举了这样一个例子:

func TestLimit(t *testing.T) {t0 := time.Now()t1 := time.Now().Add(100 * time.Millisecond)t2 := time.Now().Add(200 * time.Millisecond)t3 := time.Now().Add(300 * time.Millisecond)// 桶每秒产生10个令牌,容量=20l := NewLimiter(10, 20)l.ReserveN(t0, 15) // 桶里还剩 5 个 tokenfmt.Printf("%+v\n", l)// t1:时间过去100ms,桶里增加1个,变成6个// 消费10个,变成-4个r := l.ReserveN(t1, 10)// 此时timeToAct:500msfmt.Printf("%+v\n", l)// t2:时间过去100ms,桶里增加1个,变成-3个// 消费2个, 桶里变成-5个l.ReserveN(t2, 2)// 此时timeToAct:700msfmt.Printf("%+v\n", l)// t3: 时间过去100ms,桶里增加1个,变成-4个// r在t3取消,变成4(归还8个)个,按常理是变成6(归还10个)个r.CancelAt(t3)fmt.Printf("%+v\n", l)
}

最后一行:

  • 按常理来说,应该把所有令牌都归还,也就是归还10个,这样桶中的令牌数变成6
  • 实际上只归还了8个,桶中的令牌数变成4。就是那行令人费解的代码造成的

有人在issue下面给出解释:

在这里插入图片描述

意思就是:如果把保留的令牌数全部还回去的话,会造成在某一时刻突破令牌桶的速率限制

拿上面例子说明:

如果在t3时刻取消r时,如果把所有的令牌都归还了,也就是归还10个,此时桶中的令牌数变成6

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有10个令牌!

如果此时有请求来获取这10个令牌,算上刚刚开始执行的r2,那就相当于同时有加起来需要消耗12个令牌的请求在执行!超过了最大容量10的限制


而按照源码中的逻辑扣减,在t3时刻取消r时,只归还8个,此时桶中的令牌数变成4

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有8个令牌

如果此时有请求来获取这8个令牌,算上刚刚开始执行的r2,那就会同时加起来需要消耗10个令牌的请求在执行,刚好不超过桶中的最大容量的限制

也就是说,如果r后面有更新的请求rnew,rnew.TimeToAct赋值给了r.limit.lastEvent

那么从r.TimeToActr.limit.lastEvent之间能产生的令牌是不能归还的,而是要等时间流逝自然填充

这样当rnew可以执行时,桶中的令牌数 + rnew获取的令牌数才不会超过桶的容量


取消后无法唤醒其他请求

这个库有个小问题:如果请求A在请求B之前发生,都需要等待一段时间。如果任务A提前cancel了,使得桶中的令牌满足请求B的需求,请求B也不会被唤醒,而是等待其预定的时间到了再执行

例如下面的例子:

func TestBug(t *testing.T) {// 每100ms产生一个令牌,最大容量=10l := NewLimiter(10, 10)t1 := time.Now()// 先把桶中的初始令牌用完l.ReserveN(t1, 10)var wg sync.WaitGroupctx, cancel := context.WithTimeout(context.TODO(), time.Hour)defer cancel()wg.Add(1)wg.Add(2)go func() {defer wg.Done()// 如果要等,这个要等 1s才能执行,但是我们的 ctx 200ms 就会取消l.WaitN(ctx, 10)fmt.Printf("[1] cost: %s\n", time.Since(t1))}()go func() {defer wg.Done()// 模拟出现问题, 200ms就取消了time.Sleep(200 * time.Millisecond)cancel()}()time.Sleep(100 * time.Millisecond)go func() {defer wg.Done()// 正常情况下,这个要等 1.2 s 才能执行,// 但是我们前面都取消了,按理说在400ms时,桶中令牌数就够了,这个可以执行// 但实际上没有唤醒,等到1.2才执行ctx, cancel := context.WithTimeout(context.Background(), time.Hour)defer cancel()l.WaitN(ctx, 2)fmt.Printf("[2] cost: %s\n", time.Since(t1))}()wg.Wait()
}
  1. 时刻0ms:请求A获取10个token,需要等1000ms,桶中tokens = -10
  2. 时刻100ms:请求B获取2个token,需要等1200ms,桶中tokens = -11
  3. 时刻200ms:请求A取消,往桶中归还8个token,桶中tokens = -2
  4. 时刻400ms:理想情况下,此时请求B可以执行,因为桶中容量够了
  5. 时刻1.2s:实际上,请求B此时才可以执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/456347.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能AI监测系统燃气安全改造方案的背景及应用价值

随着燃气行业的迅速发展和城市化进程的加快&#xff0c;燃气安全管理成为企业运营和城市管理中不可忽视的关键领域。燃气泄漏、管道破损等事故的发生不仅会造成严重的经济损失&#xff0c;还威胁到人民生命财产安全。传统的安全管理方法往往依赖人工巡检和手动监测&#xff0c;…

如何写一个视频编码器演示篇

先前写过《视频编码原理简介》&#xff0c;有朋友问光代码和文字不太真切&#xff0c;能否补充几张图片&#xff0c;今天我们演示一下&#xff1a; 这是第一帧画面&#xff1a;P1&#xff08;我们的参考帧&#xff09; 这是第二帧画面&#xff1a;P2&#xff08;需要编码的帧&…

C2W4.LAB.Word_Embedding.Part2

理论课&#xff1a;C2W4.Word Embeddings with Neural Networks 文章目录 Training the CBOW modelForward propagationInitialization of the weights and biasesTraining exampleValues of the hidden layerValues of the output layerCross-entropy loss BackpropagationGr…

大家都在用的HR招聘管理工具:国内Top5排名

招聘管理工具是专为HR及招聘团队设计的数字化助手&#xff0c;旨在简化招聘流程&#xff0c;提高效率。众所周知&#xff0c;招聘管理工具通常集成简历收集、筛选、面试安排、候选人跟踪等功能于一体&#xff0c;让招聘过程更加流畅。使用招聘管理工具&#xff0c;不仅能节省时…

高边坡稳定安全监测预警系统解决方案

一、项目背景 高边坡的滑坡和崩塌是一种常见的自然地质灾害&#xff0c;一但发生而没有提前预告将给人民的生命财产和社会危害产生严重影响。对高边坡可能产生的灾害提前预警、必将有利于决策者采取应对措施、减少和降低灾害造成的损失。现有的高边坡监测技术有人工巡查和利用测…

100个候选人,没一个能讲明白什么是自动化框架?

什么是自动化测试框架 01 什么是框架 框架是整个或部分系统的可重用设计&#xff0c;表现为一组抽象构件及构件实例间交互的方法。它规定了应用的体系结构&#xff0c;阐明了整个设计、协作构件之间的依赖关系、责任分配和控制流程&#xff0c;表现为一组抽象类以及其实例之间…

格姗知识圈博客网站开源了!

格姗知识圈博客 一个基于 Spring Boot、Spring Security、Vue3、Element Plus 的前后端分离的博客网站&#xff01;本项目基本上是小格子一个人开发&#xff0c;由于工作和个人能力原因&#xff0c;部分技术都是边学习边开发&#xff0c;特别是前端&#xff08;工作中是后端开…

MySQL~表的操作(创建表,查看表,修改表,删除表)

1.创建表 1.1.创建表 首先要选择需要操作的数据库&#xff0c;USE 数据库名&#xff0c;后续可以根据实际情况操作时添加。 USE fruitsales;建表语法&#xff1a; create table 表名( 字段名1 数据类型, 字段名2 数据类型, ); 实例&#xff1a;创建fruit_bak1表。 create t…

[linux]软件安装

安装方式 二进制发布包安装: 软件已经针对具体平台编译打包发布&#xff0c;只要解压修改配置即可 rpm安装: 软件已经按照redhat的包管理规范进行打包, 使用rpm命令进行安装&#xff0c;不能自行解决库依赖问题 yum安装: 一种在线软件安装方式, 本质上还是rpm安装, 自动下载…

【vim】手动安装 Leader-F

LeaderF 是一个功能强大的 Vim 插件&#xff0c;主要用于快速导航和搜索。它可以帮助用户在 Vim 中高效地查找文件、缓冲区、标签、函数等各种元素&#xff0c;极大地提高了编辑效率。 LeaderF 的安装如果按照仓库中的教程来的话可以很方便的实现安装&#xff0c;这里介绍一下…

【记录】VSCode|自用设置项

文章目录 1 基础配置1.1 自动保存1.2 编辑区自动换行1.3 选项卡换行1.4 空格代替制表符1.5 开启滚轮缩放 2 进阶设置2.1 选项卡不自我覆盖2.2 选项卡限制宽度2.3 选项卡组限制高度2.4 字体设置2.5 字体加粗2.6 侧边栏2.7 沉浸式代码模式 Zen Mode2.8 设置 Zen 模式的选项卡组 3…

家用wifi的ip地址固定吗?换wifi就是换ip地址吗

在探讨家用WiFi的IP地址是否固定&#xff0c;以及换WiFi是否就意味着换IP地址这两个问题时&#xff0c;我们首先需要明确几个关键概念&#xff1a;IP地址、家用WiFi网络、以及它们之间的相互作用。 一、家用WiFi的IP地址固定性 家用WiFi环境中的IP地址通常涉及两类&#xff1a…

文档透明加密系统怎么用?五款透明加密软件汇总!2024热门推荐,实测分享!

数据泄露事件频发&#xff0c;让无数企业谈之色变。 想要自动对存储在计算机上的文档进行加密吗&#xff1f; 怎么在不影响日常工作的前提&#xff0c;确保文档在存储和传输过程中的安全&#xff1f; 透明加密系统来助力&#xff01; 本文&#xff0c;将详细介绍文档透明加密…

解决vue使用pdfdist-mergeofd插件时报错polyfills

pdfdist-mergeofd 该插件主要是为了解决pdf-js和ofd-js共同使用时产生的依赖冲突问题&#xff0c;具体可看这位博主的文章同时使用ofdjs和pdfjs遇到的问题&#xff0c;和解决方法——懒加载 首先看下报错信息 ERROR in ./node_modules/.pnpm/pdfdist-mergeofd2.2.228_webpa…

人工智能算法之双倍体遗传算法(DGA)

人工智能算法之双倍体遗传算法&#xff08;DGA&#xff09; 双倍体遗传算法是一种改进的遗传算法&#xff0c;借鉴了生物中双倍体&#xff08;每个体细胞中具有两套染色体&#xff09;的遗传机制。传统遗传算法中的个体通常是单倍体&#xff08;单套基因&#xff09;&#xff0…

使用 v-html 指令渲染的标签, 标签内绑定的 click 事件不生效

背景 在项目开发中&#xff0c;实现用户友好的输入交互是提升用户体验的关键之一。例如&#xff0c;在客服对话框中&#xff0c;其中有包含多个快捷选项用于快速问答&#xff0c;每个快捷选项都是一个可点击的按钮&#xff0c;并需要绑定点击事件来执行相应操作。然而&#xf…

数据类型【MySQL】

文章目录 建立表查看表删除表数据类型floatcharvarcharchar&&varchar 时间日期类型enum和setenum和set查找 建立表 mysql> create table if not exists user1(-> id int ,-> name varchar (20) comment 用户名 ,-> password char (32) comment 用户名的…

软考(中级-软件设计师)算法分析篇(1024)

三、算法设计与分析 #1024程序员节|正文# 一、分治法 1.1 分而治之 对于一个规模为n的问题&#xff0c;若该问题可以容易的解决&#xff08;比如说规模较小&#xff0c;则直接解决&#xff0c;否则将其分解为k个规模较小的问题&#xff0c;这些子问题相互独立且与原问题形…

数组类型应用举例

在main.cpp里输入程序如下&#xff1a; #include "stdio.h" //使能printf()函数 #include <stdlib.h> //使能exit(); #define My_array_Size 10 //定义用My_array_Size代替 unsigned char My_array[My_array_Size]; //声明数组My_arra…

集群分发脚本

我的后端学习大纲 我的Linux环境搭建学习大纲 8.2.scp安全拷贝: 1.命令格式&#xff1a;scp -r $pdir/$fname $user$host:$pdir/$fname2.具体命令&#xff1a; scp -r jdk1.8.0_321/ rootHadoop104:/opt/module 3.实际操作&#xff1a; 3.1.在hadoop2和hadoop3&#xff0c;had…