目录
- 该项目已经实现的限流方法
- 限流流程图
- 限流规则存储
- 针对接口限流
- 限流算法实现主要代码部分
- 固定窗口
- 令牌桶
- 限制并发数
- 注意点
前言: 上一篇文章介绍了OpenResty的安装和lua的基本语法学习,这一篇介绍一下使用OpenResty进行限流。本文只对限流功能实现做简单介绍,不对限流算法原理做过多解释。
该项目已经实现的限流方法
- 固定窗口(限制单位时间内接口可以请求的数量)
- 令牌桶
- 计数器(限制接口并发连接数)
限流流程图
限流规则存储
针对接口限流
限流算法实现主要代码部分
固定窗口
time_range_request_count(rule_detail)local request_uri = ngx.var.urilocal limitTimeRange = rule_detail.limitTimeRangelocal limitNum = rule_detail.limitNumif not limitTimeRange or not limitNum thenngx.log(ngx.ERR, "limitTimeRange or limitNum is nil")returnend-- 检查 limitTimeRange 和 limitNum 的类型if type(limitTimeRange) ~= "number" or type(limitNum) ~= "number" thenngx.log(ngx.ERR, "limitTimeRange 或 limitNum 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))returnendlocal lim, err = limit_count.new("api_rate_limit_req_store", limitNum, limitTimeRange)if not lim thenngx.log(ngx.ERR, "创建限流器失败 err: ", err.." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)--return ngx.exit(500)baseUtil.api_rate_limit_res(500,500,"服务器异常")endlocal delay, response = lim:incoming(request_uri, true)if not delay thenif response == "rejected" thenngx.log(ngx.ERR, "限制请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)--return ngx.exit(429)baseUtil.api_rate_limit_res(429,429,"服务器异常")endngx.log(ngx.ERR, "failed to limit count: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)--return ngx.exit(500)baseUtil.api_rate_limit_res(500,500,"服务器异常")end
endreturn {time_range_request_count = time_range_request_count
}
令牌桶
local limit_req = require "resty.limit.req"
local cjson = require "cjson"
local baseUtil = require "BaseUtil"-- 令牌桶算法限流
local function token_bucket_algorithm(rule_detail)local request_uri = ngx.var.uri-- 限流速率 r/slocal rate = rule_detail.rate-- 令牌数量local burst = rule_detail.burst-- 检查 rate 和 burst 的类型if type(rate) ~= "number" or type(burst) ~= "number" thenngx.log(ngx.ERR, "rate 或 burst 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))returnendlocal lim, err = limit_req.new("api_rate_limit_req_store", rate, burst)if not lim thenngx.log(ngx.ERR, "令牌桶限流,创建限流器失败", err)--return ngx.exit(500)baseUtil.api_rate_limit_res(500,500,"服务器异常")end-- 调用限流器local delay, response = lim:incoming(request_uri, true)if not delay thenif response == "rejected" thenngx.log(ngx.ERR, "限制请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)--return ngx.exit(429) -- 返回 HTTP 429 状态码baseUtil.api_rate_limit_res(429,429,"服务器异常")endngx.log(ngx.ERR, "调用限流器失败: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)--return ngx.exit(500)baseUtil.api_rate_limit_res(500,500,"服务器异常")end-- 如果请求没有超过限制,正常处理请求if delay >= 0 thenngx.sleep(delay)end
endreturn {token_bucket_algorithm = token_bucket_algorithm
}
限制并发数
local limit_conn = require "resty.limit.conn"
local cjson = require "cjson"
local baseUtil = require "BaseUtil"-- 限制最大并发连接数
local function concurrent_connections(rule_detail)local key = ngx.var.urilocal concurrentNum = rule_detail.concurrentNumlocal concurrentExtraNum = rule_detail.concurrentExtraNumlocal concurrentTime = rule_detail.concurrentTime-- 检查 concurrentNum、concurrentExtraNum、concurrentTime 的类型if type(concurrentNum) ~= "number" or type(concurrentExtraNum) ~= "number" or type(concurrentTime) ~= "number"thenngx.log(ngx.ERR, "concurrentNum 或 concurrentExtraNum 或 concurrentTime 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))returnendlocal lim, err = limit_conn.new("api_rate_limit_req_store", concurrentNum, concurrentExtraNum, concurrentTime)if not lim thenngx.log(ngx.ERR, "限制最大并发连接数,创建限流器失败", err)baseUtil.api_rate_limit_res(500,500,"服务器异常")endlocal delay, response = lim:incoming(key, true)if not delay thenif response == "rejected" thenngx.log(ngx.ERR, "限制请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..key)baseUtil.api_rate_limit_res(429,429,"服务器异常")endngx.log(ngx.ERR, "failed to limit req: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..key)baseUtil.api_rate_limit_res(500,500,"服务器异常")end-- 如果请求没有超过并发限制,那么将连接信息添加到ngx.ctx中if lim:is_committed() thenlocal ctx = ngx.ctxctx.limit_type = "concurrent_connections"ctx.limit_conn = limctx.limit_conn_key = keyctx.limit_conn_delay = delayendif delay >= 0.001 thenngx.log(ngx.WARN, "delaying conn, excess ", delay, "s per binary_remote_addr by limit_conn_store")ngx.sleep(delay)end
end-- 请求离开,减少数量
local function concurrent_connections_leaving()local ctx = ngx.ctxlocal lim = ctx.limit_connif lim thenlocal key = ctx.limit_conn_keylocal latency = ctx.limit_conn_delaylocal limit_type = ctx.limit_typeif not limit_type thenreturnendif limit_type ~= "concurrent_connections" thenreturnend--ngx.log(ngx.ERR, "释放连接", "request: ", key.." latency: "..latency)local conn, err = lim:leaving(key, latency)if not conn thenngx.log(ngx.ERR, "failed to record the connection leaving ", "request: ", err.." key: "..key)returnendend
endreturn {concurrent_connections = concurrent_connections,concurrent_connections_leaving = concurrent_connections_leaving
}
注意点
- 实际情况下,生产nginx会存在多台,所以这里将规则存储在redis当中
- OpenResty本身只支持单机的redis连接,所以这里需要单独引入redis集群模块。参考地址:https://developer.aliyun.com/article/1334650
上面就是几种限流算法用lua实现的关键部分逻辑,代码仅供参考学习,欢迎留言讨论。后续会将完整的项目代码更新到远程仓库,欢迎关注我的公众号,后续会在公众号提供获取地址。下一篇会继续更新使用redis来实现上面几种限流逻辑,进行集群限流