本文分享自华为云社区《详解 Redisson 分布式限流的实现原理》,作者: xindoo。
我们目前在工作中遇到一个性能问题,我们有个定时任务需要处理大量的数据,为了提升吞吐量,所以部署了很多台机器,但这个任务在运行前需要从别的服务那拉取大量的数据,随着数据量的增大,如果同时多台机器并发拉取数据,会对下游服务产生非常大的压力。之前已经增加了单机限流,但无法解决问题,因为这个数据任务运行中只有不到 10% 的时间拉取数据,如果单机限流限制太狠,虽然集群总的请求量控制住了,但任务吞吐量又降下来。如果限流阈值太高,多机并发的时候,还是有可能压垮下游。 所以目前唯一可行的解决方案就是分布式限流。
我目前是选择直接使用 Redisson 库中的 RRateLimiter 实现了分布式限流,关于 Redission 可能很多人都有所耳闻,它其实是在 Redis 能力上构建的开发库,除了支持 Redis 的基础操作外,还封装了布隆过滤器、分布式锁、限流器…… 等工具。今天要说的 RRateLimiter 及时其实现的限流器。接下来本文将详细介绍下 RRateLimiter 的具体使用方式、实现原理还有一些注意事项,最后简单谈谈我对分布式限流底层原理的理解。
RRateLimiter 使用
RRateLimiter 的使用方式异常的简单,参数也不多。只要创建出 RedissonClient,就可以从 client 中获取到 RRateLimiter 对象,直接看代码示例。
RedissonClientredissonClient = Redisson.create();RRateLimiterrateLimiter = redissonClient.getRateLimiter("xindoo.limiter");rateLimiter.trySetRate(RateType.OVERALL,100, 1, RateIntervalUnit.HOURS);
rateLimiter.trySetRate 就是设置限流参数,RateType 有两种,OVERALL 是全局限流 ,PER_CLIENT 是单 Client 限流(可以认为就是单机限流),这里我们只讨论全局模式。而后面三个参数的作用就是设置在多长时间窗口内(rateInterval+IntervalUnit),许可总量不超过多少(rate),上面代码中我设置的值就是 1 小时内总许可数不超过 100 个。然后调用 rateLimiter 的 tryAcquire () 或者 acquire () 方法即可获取许可。
rateLimiter.acquire(1); // 申请1份许可,直到成功boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS); // 申请1份许可,如果5s内未申请到就放弃
使用起来还是很简单的嘛,以上代码中的两种方式都是同步调用,但 Redisson 还同样提供了异步方法 acquireAsync () 和 tryAcquireAsync (),使用其返回的 RFuture 就可以异步获取许可。
RRateLimiter 的实现
接下来我们顺着 tryAcquire () 方法来看下它的实现方式,在 RedissonRateLimiter 类中,我们可以看到最底层的 tryAcquireAsync () 方法。
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {byte[] random = newbyte[8];ThreadLocalRandom.current().nextBytes(random);return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"——————————————————————————————————————"+ "这里是一大段lua代码"+ "____________________________________",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),value, System.currentTimeMillis(), random);}
映入眼帘的就是一大段 lua 代码,其实这段 Lua 代码就是限流实现的核心,我把这段 lua 代码摘出来,并加了一些注释,我们来详细看下。
local rate = redis.call("hget", KEYS[1], "rate") # 100 localinterval = redis.call("hget", KEYS[1], "interval") # 3600000localtype = redis.call("hget", KEYS[1], "type") # 0
assert(rate ~= falseandinterval ~= falseandtype ~= false, "RateLimiter is not initialized")
local valueName = KEYS[2] # {xindoo.limiter}:value 用来存储剩余许可数量local permitsName = KEYS[4] # {xindoo.limiter}:permits 记录了所有许可发出的时间戳 # 如果是单实例模式,name信息后面就需要拼接上clientId来区分出来了iftype == "1"thenvalueName = KEYS[3] # {xindoo.limiter}:value:b474c7d5-862c-4be2-9656-f4011c269d54permitsName = KEYS[5] # {xindoo.limiter}:permits:b474c7d5-862c-4be2-9656-f4011c269d54end# 对参数校验
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
# 获取当前还有多少许可 local currentValue = redis.call("get", valueName)
local res
# 如果有记录当前还剩余多少许可 if currentValue ~= falsethen# 回收已过期的许可数量local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)local released = 0for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack("Bc0I", v)released = released + permitsend# 清理已过期的许可记录if released > 0thenredis.call("zremrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)if tonumber(currentValue) + released > tonumber(rate) thencurrentValue = tonumber(rate) - redis.call("zcard", permitsName)elsecurrentValue = tonumber(currentValue) + releasedendredis.call("set", valueName, currentValue)end# ARGV permit timestamp random, random是一个随机的8字节# 如果剩余许可不够,需要在res中返回下个许可需要等待多长时间 if tonumber(currentValue) < tonumber(ARGV[1]) thenlocal firstValue = redis.call("zrange", permitsName, 0, 0, "withscores")res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]))elseredis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))# 减小可用许可量 redis.call("decrby", valueName, ARGV[1])res = nilendelse# 反之,记录到还有多少许可,说明是初次使用或者之前已记录的信息已经过期了,就将配置rate写进去,并减少许可数 redis.call("set", valueName, rate)redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))redis.call("decrby", valueName, ARGV[1])res = nil
endlocal ttl = redis.call("pttl", KEYS[1])
# 重置if ttl > 0thenredis.call("pexpire", valueName, ttl)redis.call("pexpire", permitsName, ttl)
endreturn res
即便是加了注释,相信你还是很难一下子看懂这段代码的,接下来我就以其在 Redis 中的数据存储形式,然辅以流程图让大家彻底了解其实现实现原理。
首先用 RRateLimiter 有个 name,在我代码中就是 xindoo.limiter,用这个作为 KEY 你就可以在 Redis 中找到一个 map,里面存储了 limiter 的工作模式 (type)、可数量 (rate)、时间窗口大小 (interval),这些都是在 limiter 创建时写入到的 redis 中的,在上面的 lua 代码中也使用到了。
其次还俩很重要的 key,valueName 和 permitsName,其中在我的代码实现中 valueName 是 {xindoo.limiter}:value ,它存储的是当前可用的许可数量。我代码中 permitsName 的具体值是 {xindoo.limiter}:permits,它是一个 zset,其中存储了当前所有的许可授权记录(含有许可授权时间戳),其中 SCORE 直接使用了时间戳,而 VALUE 中包含了 8 字节的随机值和许可的数量,如下图:
{xindoo.limiter}:permits 这个 zset 中存储了所有的历史授权记录,直到了这些信息,相信你也就理解了 RRateLimiter 的实现原理,我们还是将上面的那大段 Lua 代码的流程图绘制出来,整个执行的流程会更直观。
看到这大家应该能理解这段 Lua 代码的逻辑了,可以看到 Redis 用了多个字段来存储限流的信息,也有各种各样的操作,那 Redis 是如何保证在分布式下这些限流信息数据的一致性的?答案是不需要保证,在这个场景下,信息天然就是一致性的。原因是 Redis 的单进程数据处理模型,在同一个 Key 下,所有的 eval 请求都是串行的,所有不需要考虑数据并发操作的问题。在这里,Redisson 也使用了 HashTag,保证所有的限流信息都存储在同一个 Redis 实例上。
RRateLimiter 使用时注意事项
了解了 RRateLimiter 的底层原理,再结合 Redis 自身的特性,我想到了 RRateLimiter 使用的几个局限点 (问题点)。
RRateLimiter 是非公平限流器
这个是我查阅资料得知,并且在自己代码实践的过程中也得到了验证,具体表现就是如果多个实例 (机器) 取竞争这些许可,很可能某些实例会获取到大部分,而另外一些实例可怜巴巴仅获取到少量的许可,也就是说容易出现旱的旱死 涝的涝死的情况。在使用过程中,你就必须考虑你能否接受这种情况,如果不能接受就得考虑用某些方式尽可能让其变公平。
Rate 不要设置太大
从 RRateLimiter 的实现原理你也看出了,它采用的是滑动窗口的模式来限流的,而且记录了所有的许可授权信息,所以如果你设置的 Rate 值过大,在 Redis 中存储的信息 (permitsName 对应的 zset) 也就越多,每次执行那段 lua 脚本的性能也就越差,这对 Redis 实例也是一种压力。个人建议如果你是想设置较大的限流阈值,倾向于小 Rate + 小时间窗口的方式,而且这种设置方式请求也会更均匀一些。
限流的上限取决于 Redis 单实例的性能
从原理上看,RRateLimiter 在 Redis 上所存储的信息都必须在一个 Redis 实例上,所以它的限流 QPS 的上限就是 Redis 单实例的上限,比如你 Redis 实例就是 1w QPS,你想用 RRateLimiter 实现一个 2w QPS 的限流器,必然实现不了。 那有没有突破 Redis 单实例性能上限的方式?单限流器肯定是实现不了的,我们可以拆分多个限流器,比如我搞 10 个限流器,名词用不一样的,然后每台机器随机使用一个限流器限流,实际的流量不就被分散到不同的限流器上了吗,总的限流上线不也就上来了。
分布式限流的本质
分布式限流的本质实际上就是协同,协同的本质就是信息交换,信息交换最重要的的就是信息的准确性和一致性。 更简单粗暴理解,分布式限流的本质原理其实还是分布式数据一致性的原理,而限流只是数据结果的一种决策。所以只要以任何方式能让信息同步,且保证信息的正确性就可以实现一个分布式限流器了,这就是我理解的本质思路。
其实从上面的 RRateLimiter 的实现原理也可以看出来,它不就是存储了一些信息吗! 那我不用 Redis,而是使用 mysql 行不行。实际肯定是可以的,只要将的上面 Lua 代码中的所有操作都放到一个事务里,且事务的级别改成串行化,依旧能实现 RRateLimiter 同样的功能。如果你具备 Mysql 相关知识的话,肯定也能基于 Mysql 将 RRateLimiter 的 API 封装出来,但是封装出来的限流器,其限流的上限就取决于 Mysql 实例的性能上限。
最近 chatGPT 比较火,我也问了下它对分布式限流本质原理的理解,下面是它的回答,大家觉得怎么样?
分布式限流的本质原理是通过在分布式系统中共享限流状态来限制系统中单位时间内的请求数量,从而避免系统因流量过大而崩溃。
这是通过使用一些共享的存储组件,如数据库,缓存,分布式锁等来实现的。在每次请求时,系统会检查当前的请求数量是否超过了预先设定的限制,如果超过了限制,请求就会被拒绝;如果未超过限制,请求就会被允许。
通过使用分布式限流技术,系统可以在高并发情况下保持稳定的性能,并避免因流量过大而导致的系统崩溃。