go-zero 是如何实现令牌桶限流的?

原文链接:

上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。

但是采用固定窗口实现的限流器会有两个问题:

  1. 会出现请求量超出限制值两倍的情况
  2. 无法很好处理流量突增问题

这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题。

工作原理

算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。

源码实现

源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现。

// core/limit/tokenlimit.go// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 当前时间戳
local now = tonumber(ARGV[3])
// 请求数量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填满
local fill_time = capacity/rate
// 向下取整,ttl 为填满时间 2 倍
local ttl = math.floor(fill_time*2)
// 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil thenlast_tokens = capacity
end// 上次请求时间戳,如果为 nil 则赋值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil thenlast_refreshed = 0
end// 距离上一次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
// 与桶容量比较,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判断请求数量和桶内 token 数量的大小
local allowed = filled_tokens >= requested
// 被请求消耗掉之后,更新剩余 token 数量
local new_tokens = filled_tokens
if allowed thennew_tokens = filled_tokens - requested
end// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新时间
redis.call("setex", KEYS[2], ttl, now)return allowed`

Redis 中主要保存两个 key,分别是 token 数量和刷新时间。

核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许。

限流器初始化:

// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {// 生成 token 速率rate           int// 桶容量burst          intstore          *redis.Redis// 桶 keytokenKey       string// 桶刷新时间 keytimestampKey   stringrescueLock     sync.Mutex// redis 健康标识redisAlive     uint32// redis 健康监控启动状态monitorStarted bool// 内置单机限流器rescueLimiter  *xrate.Limiter
}// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {tokenKey := fmt.Sprintf(tokenFormat, key)timestampKey := fmt.Sprintf(timestampFormat, key)return &TokenLimiter{rate:          rate,burst:         burst,store:         store,tokenKey:      tokenKey,timestampKey:  timestampKey,redisAlive:    1,rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),}
}

其中有一个变量 rescueLimiter,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮。

提供了四个可调用方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {return lim.AllowN(time.Now(), 1)
}// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {return lim.AllowNCtx(ctx, time.Now(), 1)
}// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {return lim.reserveN(context.Background(), now, n)
}// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {return lim.reserveN(ctx, now, n)
}

最终调用的都是 reverveN 方法:

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {// 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器if atomic.LoadUint32(&lim.redisAlive) == 0 {return lim.rescueLimiter.AllowN(now, n)}// 执行限流脚本resp, err := lim.store.EvalCtx(ctx,script,[]string{lim.tokenKey,lim.timestampKey,},[]string{strconv.Itoa(lim.rate),strconv.Itoa(lim.burst),strconv.FormatInt(now.Unix(), 10),strconv.Itoa(n),})// redis allowed == false// Lua boolean false -> r Nil bulk replyif err == redis.Nil {return false}if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {logx.Errorf("fail to use rate limiter: %s", err)return false}if err != nil {logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)// 如果有异常的话,会启动进程内限流lim.startMonitor()return lim.rescueLimiter.AllowN(now, n)}code, ok := resp.(int64)if !ok {logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)lim.startMonitor()return lim.rescueLimiter.AllowN(now, n)}// redis allowed == true// Lua boolean true -> r integer reply with value of 1return code == 1
}

最后看一下进程内限流的启动与恢复:

func (lim *TokenLimiter) startMonitor() {lim.rescueLock.Lock()defer lim.rescueLock.Unlock()// 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动if lim.monitorStarted {return}lim.monitorStarted = trueatomic.StoreUint32(&lim.redisAlive, 0)go lim.waitForRedis()
}func (lim *TokenLimiter) waitForRedis() {ticker := time.NewTicker(pingInterval)// 更新监控进程的状态defer func() {ticker.Stop()lim.rescueLock.Lock()lim.monitorStarted = falselim.rescueLock.Unlock()}()for range ticker.C {// 对 redis 进行健康监测,如果 redis 服务恢复了// 则更新 redisAlive 标识,并退出 goroutineif lim.store.Ping() {atomic.StoreUint32(&lim.redisAlive, 1)return}}
}

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

  • https://juejin.cn/post/7052171117116522504
  • https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673

推荐阅读:

  • 如何实现计数器限流?
  • go-zero 是如何做路由管理的?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/84250.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apoll 多项式规划求解

一、纵向规划 void QuarticPolynomialCurve1d::ComputeCoefficients(const float x0, const float dx0, const float ddx0, const float dx1,const float ddx1, const float p) {if (p < 0.0) {std::cout << "p should be greater than 0 at line 140." &…

MySQL中基础查询语句

用户表user数据如下&#xff1a; iddevice_idgenderageuniversityprovince12138male21北京大学Beijing23214male复旦大学Shanghai36543famale20北京大学Deijing42315female 23 浙江大学ZheJiang55432male25山东大学Shandong 1&#xff0c;写出ddl语句创建如上表&#xff0c;…

解密Redis:应对面试中的缓存相关问题2

面试官&#xff1a;Redis集群有哪些方案&#xff0c;知道嘛&#xff1f; 候选人&#xff1a;嗯~~&#xff0c;在Redis中提供的集群方案总共有三种&#xff1a;主从复制、哨兵模式、Redis分片集群。 面试官&#xff1a;那你来介绍一下主从同步。 候选人&#xff1a;嗯&#xff…

设计模式行为型——模板模式

目录 模板模式的定义 模板模式的实现 模板模式角色 模板模式类图 模板模式举例 模板模式代码实现 模板模式的特点 优点 缺点 使用场景 注意事项 实际应用 模板模式的定义 模板模式&#xff08;Template Pattern&#xff09;属于行为型设计模式&#xff0c;又叫模版…

第7章 通过内网本机IP获取微信code值及其对code值的回调。

在第5章中讲述了怎样通过内网穿透外外网从而获取微信code值&#xff0c;实际上微信测试帐号管理页中也支持通过内网本机IP获取微信code值。 1 重构launchSettings.json "https": { "commandName": "Project", "dotnetRunMessages": t…

数据结构——空间复杂度

3.空间复杂度 空间复杂度也是一个数学表达式&#xff0c;是对一个算法在运行过程中临时占用存储空间大小的量度 。 空间复杂度不是程序占用了多少bytes的空间&#xff0c;因为这个也没太大意义&#xff0c;所以空间复杂度算的是变量的个数。 空间复杂度计算规则基本跟实践复杂…

kafka partition的数据文件(offffset,MessageSize,data)

partition中的每条Message包含了以下三个属性&#xff1a; offset&#xff0c;MessageSize&#xff0c;data&#xff0c;其中offset表示Message在这个partition中的偏移量&#xff0c;offset不是该Message在partition数据文件中的实际存储位置&#xff0c;而是逻辑上一个值&…

【刻削生千变,丹青图“万相”】阿里云AI绘画创作模型 “通义万相”测评

刻削生千变&#xff0c;丹青图“万相 4月7日&#xff0c;阿里大模型“通义千问”开始邀请用户测试体验。现阶段该模型主要定向邀请企业用户进行体验测试&#xff0c;用户可通过官网申请&#xff08;tongyi.aliyun.com&#xff09;&#xff0c;符合条件的用户可参与体验。 随…

RabbitMQ(一) - 基本结构、SpringBoot整合RabbitMQ、工作队列、发布订阅、直接、主题交换机模式

RabbitMQ结构 Publisher &#xff1a; 生产者 Queue: 存储消息的容器队列&#xff1b; Consumer:消费者 Connection&#xff1a;消费者与消息服务的TCP连接 Channel:信道&#xff0c;是TCP里面的虚拟连接。例如&#xff1a;电缆相当于TCP&#xff0c;信道是一条独立光纤束&…

在vue中Antv G2 折线图如何添加点击事件获取折线上点的值

在项目中有个需求是点击折线图的点&#xff0c;获取当前点的信息&#xff0c;其它图形都可以参考相关的API获取到&#xff0c;但area做的折线图怎么都获取不到点击的信息&#xff0c;只能获取全部的信息&#xff0c;最终解决如下&#xff1a; 实现思路 用户的鼠标在折线图上移…

基于低代码和数字孪生技术的电力运维平台设计

电力能源服务商在为用能企业提供线上服务的时候&#xff0c;不可避免要面对用能企业的各种个性化需求。如果这些需求和想法都要靠平台厂家研发人员来实现&#xff0c;那在周期、成本、效果上都将是无法满足服务运营需要的&#xff0c;这也是目前很多线上能源云平台应用效果不理…

单例模式-java实现

介绍 单例模式的意图&#xff1a;保证某个类在系统中有且仅有一个实例。 我们可以看到下面的类图&#xff1a;一般的单例的实现&#xff0c;是属性中保持着一个自己的私有静态实例引用&#xff0c;还有一个私有的构造方法&#xff0c;然后再开放一个静态的获取实例的方法给外界…

快速修复应用程序中的问题的利器—— Android热修复

热修复技术在Android开发中扮演着重要的角色&#xff0c;它可以帮助开发者在不需要重新发布应用程序的情况下修复已经上线的应用程序中的bug或者添加新的功能。 一、热修复是什么&#xff1f; 热修复&#xff08;HotFix&#xff09;是一种在运行时修复应用程序中的问题的技术…

树结构转换

思路&#xff1a; 先把数组转化成一个对象&#xff08;map&#xff09;&#xff0c;对象的key值是对象的id 遍历对象&#xff1b;map[map[k].pid].children.push(map[k]),【k代表索引】&#xff0c;pid等于0代表是根节点 // 数结构转换let arr [{id: 1,pid: 0,name: "b…

【OpenVINOSharp】 基于C#和OpenVINO2023.0部署Yolov8全系列模型

基于C#和OpenVINO2023.0部署Yolov8全系列模型 1 项目简介1.1 OpenVINOTM 2 OpenVinoSharp2.1 OpenVINOTM 2023.0安装配置2.2 C 动态链接库2.3 C#构建Core推理类2.4 NuGet安装OpenVinoSharp 3 获取和转换Yolov8模型3.1 安装ultralytics3.2 导出yolov8模型3.3 安装OpenVINOTM Pyt…

python——案例15:判断奇数还是偶数

案例15&#xff1a;判断奇数还是偶数numint(input(输入数值&#xff1a;))if(num%2)0: #通过if语句判断print("{0}是偶数".format(num))else: #通过else语句判断print("{0}是奇数".format(num))

flask-migrate使用

1.介绍 # 表,字段发生变化&#xff0c;都会有记录&#xff0c;自动同步到数据库中--》django支持这种操作 # 原生的sqlalchemy&#xff0c;不支持修改表的 # flask-migrate可以实现类似于django的 python manage.py makemigrations #记录 python manage.py migrate …

Docker之jenkins部署harbor在harbor中完成部署

Docker之jenkins部署harbor在harbor中完成部署 1、harbor作用 Harbor允许用户用命令行工具对容器镜像及其他Artifact进行推送和拉取&#xff0c;并提供了图形管理界面帮助用户查阅和删除这些Artifact。在Harbor 2.0版本中&#xff0c;除容器镜像外&#xff0c;Harbor对符合OCI…

爬虫程序中使用爬虫ip的优势

作为一名爬虫技术员&#xff0c;我发现在爬虫程序中使用代理IP可以提升爬取效率和匿名性。今天&#xff0c;我就来详细讲解一下代理IP在爬虫程序中的工作原理及应用。 首先&#xff0c;我们来了解一下代理IP在爬虫程序中的工作原理。当我们使用爬虫程序进行数据采集时&#xf…

计算机的构造和原理

本资料转载于B站up主芯片超人-花 仅用于学习和讨论&#xff0c;如有侵权请联系 计算机工作原理之3D动画揭秘&#xff1a;计算机内部如何工作_哔哩哔哩_bilibili 1.CPU的部分 1.1 CPU放大看 1.2 一个芯片中&#xff0c;有80亿至100亿晶体管 1.3 放大磁道 1.4 共享3级缓存 1.5 …