我们很高兴地宣布作为 .NET 7 的一部分的内置速率限制支持。速率限制提供了一种保护资源的方法,以避免让您的应用程序不堪重负并将流量保持在安全水平。
什么是限速?
速率限制是限制可以访问多少资源的概念。例如,您知道您的应用程序访问的数据库每分钟可以安全地处理 1000 个请求,但不确定它可以处理更多的请求。您可以在应用程序中放置一个速率限制器,每分钟允许 1000 个请求,并在他们可以访问数据库之前拒绝更多请求。因此,限制您的数据库的速率并允许您的应用程序处理安全数量的请求,而不会出现来自数据库的潜在故障。
有多种不同的速率限制算法来控制请求流。我们将介绍 .NET 7 中提供的其中 4 个。
并发限制
并发限制器限制有多少并发请求可以访问资源。如果你的限制是 10,那么 10 个请求可以同时访问一个资源,第 11 个请求将不被允许。一旦请求完成,允许的请求数量增加到 1,当第二个请求完成时,数量增加到 2,等等。这是通过释放 RateLimitLease来完成的,我们稍后会讨论。
令牌桶限制
令牌桶是一种算法,它的名字来源于描述它的工作原理。想象有一个装满令牌的桶。当一个请求进来时,它会获取一个令牌并永远保存它。经过一段一致的时间后,有人将预定数量的令牌添加回桶中,永远不会超过桶可以容纳的数量。如果存储桶为空,则当请求进入时,该请求将被拒绝访问资源。
举一个更具体的例子,假设桶可以容纳 10 个令牌,每分钟有 2 个令牌被添加到桶中。当一个请求进来时,它需要一个令牌,所以我们剩下 9 个,另外 3 个请求进来,每个请求都接受一个令牌,给我们留下 6 个令牌,一分钟后我们得到 2 个新令牌,这使我们处于 8. 8请求进来并拿走剩余的令牌,给我们留下 0。如果另一个请求进来,则在我们获得更多令牌之前不允许访问资源,这每分钟都会发生一次。在 5 分钟没有请求后,存储桶将再次拥有所有 10 个令牌,并且在随后的几分钟内不会再添加任何令牌,除非请求需要更多令牌。
固定窗口限制
固定窗口算法使用窗口的概念,该窗口也将在下一个算法中使用。窗口是在我们进入下一个窗口之前应用我们的限制的时间量。在固定窗口情况下,移动到下一个窗口意味着将限制重置回其起点。假设有一个电影院,有一个单人间,可容纳 100 人,电影播放时长 2 小时。当电影开始时,我们让人们开始排队等待 2 小时后的下一场放映,最多允许 100 人排队,然后我们开始告诉他们改天再来。2 小时的电影结束后,0 到 100 人的队伍可以进入电影院,我们重新开始排队。这与固定窗口算法中的移动窗口相同。
滑动窗口限制
滑动窗口算法类似于固定窗口算法,但增加了段。一个片段是一个窗口的一部分,如果我们将前一个 2 小时的窗口分成 4 个片段,我们现在有 4 个 30 分钟的片段。还有一个当前段索引,它将始终指向窗口中的最新段。30 分钟内的请求进入当前段,并且每 30 分钟窗口滑动一个段。如果在窗口滑过的段期间有任何请求,这些请求现在会被刷新,我们的限制会增加该数量。如果没有任何请求,我们的限制保持不变。
例如,让我们使用具有 3 个 10 分钟分段和 100 个请求限制的滑动窗口算法。我们的初始状态是 3 个段,计数均为 0,我们当前的段索引指向第 3 个段。
滑动窗口,空段和段 3 处的当前段指针,窗口覆盖段 1-3
在前 10 分钟内,我们收到 50 个请求,所有这些请求都在第 3 段(我们当前的段索引)中进行跟踪。10 分钟过去后,我们将窗口滑动 1 段,同时将当前段索引移动到第 4 段。第 1 段中的任何使用请求现在都添加回我们的限制。由于没有,我们的限制是 50(因为 50 已经在第 3 段中使用)。
滑动窗口,段 3 中有 50 个请求,当前段指针位于段 4,窗口移动到覆盖段 2-4
在接下来的 10 分钟内,我们又收到了 20 个请求,所以现在第 3 段有 50 个,第 4 段有 20 个。同样,我们在 10 分钟过去后滑动窗口,因此我们当前的段索引指向 5,我们将来自段 2 的任何请求添加到我们的限制。
滑动窗口,段 3 和 4 中的 50 和 20 个请求,段 5 处的当前段指针,窗口覆盖段 3-5
10 分钟后,我们再次滑动窗口,这一次窗口滑动时,当前段索引为 6,段 3(有 50 个请求的段)现在位于窗口之外。所以我们取回了 50 个请求并将它们添加到我们的限制中,现在将是 80,因为段 4 仍有 20 个在使用。
滑动窗口,50 个请求在段 3 中划掉,当前段指针在段 6,窗口覆盖段 4-6
速率限制器 API
在 .NET 7 中引入新的 nuget 包System.Threading.RateLimiting!
这个包提供了编写速率限制器的原语,并提供了一些内置的常用算法。主要类型是抽象基类RateLimiter。
public abstract class RateLimiter : IAsyncDisposable, IDisposable
{public abstract int GetAvailablePermits();public abstract TimeSpan? IdleDuration { get; }public RateLimitLease Acquire(int permitCount = 1);public ValueTask<RateLimitLease> WaitAsync(int permitCount = 1, CancellationToken cancellationToken = default);public void Dispose();public ValueTask DisposeAsync();
}
RateLimiter包含Acquire并WaitAsync作为尝试获得受保护资源许可的核心方法。根据应用程序,受保护资源可能需要获得超过 1 个许可,因此Acquire两者WaitAsync都接受可选permitCount参数。Acquire是一种同步方法,它将检查是否有足够的许可可用并返回一个RateLimitLease包含有关您是否成功获得许可的信息的信息。WaitAsync类似于,Acquire除了它可以支持排队许可请求,当许可可用时,可以在将来的某个时间出队,这就是为什么它是异步的并且接受一个可选CancellationToken的以允许取消排队的请求。
RateLimitLease有一个IsAcquired属性,用于查看是否获得了许可证。此外,RateLimitLease如果租约失败,可能包含元数据,例如建议的重试期限(将在后面的示例中显示)。最后,它RateLimitLease是一次性的,应该在代码使用受保护的资源完成时被释放。处置将让RateLimiter知道根据获得的许可证数量更新其限制。下面是使用 1RateLimiter尝试获取具有 1 个许可证的资源的示例。
RateLimiter limiter = GetLimiter();
using RateLimitLease lease = limiter.Acquire(permitCount: 1);
if (lease.IsAcquired)
{// Do action that is protected by limiter
}
else
{// Error handling or add retry logic
}
在上面的示例中,我们尝试使用同步Acquire方法获取 1 个许可证。我们还用于using确保在使用完资源后处理租约。然后检查租约以查看我们请求的许可是否已获得,如果是,我们就可以使用受保护的资源,否则我们可能希望进行一些日志记录或错误处理以通知用户或应用程序该资源未被使用由于达到了速率限制。
尝试获得许可的另一种方法是WaitAsync。此方法允许排队许可并等待许可可用(如果它们不可用)。让我们用另一个例子来解释排队的概念。
RateLimiter limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2));// thread 1:
using RateLimitLease lease = limiter.Acquire(permitCount: 2);
if (lease.IsAcquired) { }// thread 2:
using RateLimitLease lease = await limiter.WaitAsync(permitCount: 2);
if (lease.IsAcquired) { }
在这里,我们展示了使用内置速率限制实现之一的第一个示例,ConcurrencyLimiter. 我们创建了最大许可限制为 2 和队列限制为 2 的限制器。这意味着在任何时候都可以获取最多 2 个许可,并且我们允许排队WaitAsync呼叫最多有 2 个许可请求。
该queueProcessingOrder参数决定队列中项目的处理顺序,它可以是(FIFO)或(LIFO)的值。需要注意的一个有趣行为是,在队列已满时使用将完成最旧的排队呼叫,但失败,直到队列中有空间容纳最新的队列项。QueueProcessingOrder.OldestFirstQueueProcessingOrder.NewestFirstQueueProcessingOrder.NewestFirstWaitAsyncRateLimitLease
在此示例中,有 2 个线程试图获取许可。如果线程 1 先运行,它将成功获得 2 个许可,并且WaitAsyncin 线程 2 将排队等待RateLimitLeasein 线程 1 被释放。此外,如果另一个线程尝试使用Acquireor获取许可WaitAsync,它将立即收到属性等于 false 的 a RateLimitLease,IsAcquired因为permitLimitandqueueLimit已经用完。
如果线程 2 首先运行,它将立即获得RateLimitLease等于IsAcquiredtrue 的 a,并且当线程 1 下一次运行时(假设线程 2 中的租约尚未释放),它将同步获得RateLimitLease属性IsAcquired等于 false 的 a,因为Acquire不queue 并且被调用permitLimit用完。WaitAsync
到目前为止,我们已经看到了ConcurrencyLimiter,我们还提供了 3 个其他限制器。TokenBucketRateLimiter, FixedWindowRateLimiter, 和所有这些都实现了自己实现SlidingWindowRateLimiter的抽象类。介绍了该方法以及用于观察限制器上的常见设置的几个属性。将在展示这些速率限制器的一些示例后进行解释。
ReplenishingRateLimiterRateLimiterReplenishingRateLimiterTryReplenishTryReplenishRateLimiter limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));using RateLimitLease lease = await limiter.WaitAsync(5);// will complete after ~5 seconds
using RateLimitLease lease2 = await limiter.WaitAsync();
在这里我们展示了TokenBucketRateLimiter,它比ConcurrencyLimiter. 这replenishmentPeriod是将新令牌(与许可的概念相同,只是令牌桶上下文中更好的名称)添加回限制的频率。在此示例tokensPerPeriod中为 1 和replenishmentPeriod5 秒,因此每 5 秒添加 1 个令牌tokenLimit,最多为 5。最后,autoReplenishment设置为 true,这意味着限制器将在Timer内部创建一个来处理令牌的补充每 5 秒。
如果autoReplenishment设置为 false,则由开发人员调用TryReplenish限制器。ReplenishingRateLimiter这在管理多个实例并希望通过创建单个实例并自己管理补充调用来降低开销时很有用Timer,而不是让每个限制器创建一个Timer.
ReplenishingRateLimiter[] limiters = GetLimiters();
Timer rateLimitTimer = new Timer(static state =>
{var replenishingLimiters = (ReplenishingRateLimiter[])state;foreach (var limiter in replenishingLimiters){limiter.TryReplenish();}
}, limiters, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
FixedWindowRateLimiter有一个window选项定义窗口更新需要多长时间。
new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(permitLimit: 2,queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 1, window: TimeSpan.FromSeconds(10), autoReplenishment: true));
除了指定有多少段以及窗口滑动的频率之外,还有SlidingWindowRateLimiter一个选项。
segmentsPerWindowwindownew SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(permitLimit: 2,queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 1, window: TimeSpan.FromSeconds(10), segmentsPerWindow: 5, autoReplenishment: true));
回到前面提到的元数据,让我们展示一个元数据可能有用的例子。
class RateLimitedHandler : DelegatingHandler
{private readonly RateLimiter _rateLimiter;public RateLimitedHandler(RateLimiter limiter) : base(new HttpClientHandler()){_rateLimiter = limiter;}protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken){using RateLimitLease lease = await _rateLimiter.WaitAsync(1, cancellationToken);if (lease.IsAcquired){return await base.SendAsync(request, cancellationToken);}var response = new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests);if (lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter)){response.Headers.Add(HeaderNames.RetryAfter, ((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo));}return response;}
}RateLimiter limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));;
HttpClient client = new HttpClient(new RateLimitedHandler(limiter));
await client.GetAsync("https://example.com");
在这个例子中,我们设置了一个速率限制HttpClient,如果我们未能获得请求的许可,我们希望返回一个失败的 HTTP 请求,并带有 429 状态代码(请求太多),而不是向我们的下游资源发出 HTTP 请求。此外,429 响应可以包含“Retry-After”标头,让消费者知道重试何时可能成功。我们通过在RateLimitLeaseusingTryGetMetadata和. 我们还使用 ,因为它能够计算所请求令牌数量何时可用的估计值,因为它知道它多久补充一次令牌。鉴于无法知道许可证何时可用,因此它不会提供任何元数据。MetadataName.RetryAfterTokenBucketRateLimiterConcurrencyLimiterRetryAfter
MetadataName是一个静态类,它提供了几个预先创建的实例,我们刚刚看到的,它的类型为,而,它的类型为. 还有一种静态方法可用于创建您自己的强类型命名元数据键。有 2 个重载,一个用于具有参数的强类型,另一个接受元数据名称的字符串并具有参数。MetadataName<T>MetadataName.RetryAfterMetadataName<TimeSpan>MetadataName.ReasonPhraseMetadataName<string>MetadataName.Create<T>(string name)RateLimitLease.TryGetMetadataMetadataName<T>out Tout object
现在让我们看看另一个被引入的 API 来帮助处理更复杂的场景,PartitionedRateLimiter!
分区速率限制器
System.Threading.RateLimiting nuget 包中还包含. 这是一个与类非常相似的抽象,除了它接受一个实例作为其方法的参数。例如现在:. 这对于您可能希望根据传入的值更改速率限制行为的场景很有用。这可以是诸如不同s 的独立并发限制或更复杂的场景(例如将 X 和 Y 分组在相同的并发限制下)之类的东西,但是W 和 Z 在令牌桶限制之下。PartitionedRateLimiter<TResource>RateLimiterTResourceAcquireAcquire(TResource resourceID, int permitCount = 1)TResourceTResource
为了帮助常见用法,我们提供了一种构建via的方法。
PartitionedRateLimiter<TResource>PartitionedRateLimiter.Create<TResource, TPartitionKey>(...)enum MyPolicyEnum
{One,Two,Admin,Default
}PartitionedRateLimiter<string> limiter = PartitionedRateLimiter.Create<string, MyPolicyEnum>(resource =>
{if (resource == "Policy1"){return RateLimitPartition.Create(MyPolicyEnum.One, key => new MyCustomLimiter());}else if (resource == "Policy2"){return RateLimitPartition.CreateConcurrencyLimiter(MyPolicyEnum.Two, key =>new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2));}else if (resource == "Admin"){return RateLimitPartition.CreateNoLimiter(MyPolicyEnum.Admin);}else{return RateLimitPartition.CreateTokenBucketLimiter(MyPolicyEnum.Default, key =>new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));}
});
RateLimitLease lease = limiter.Acquire(resourceID: "Policy1", permitCount: 1);// ...RateLimitLease lease = limiter.Acquire(resourceID: "Policy2", permitCount: 1);// ...RateLimitLease lease = limiter.Acquire(resourceID: "Admin", permitCount: 12345678);// ...RateLimitLease lease = limiter.Acquire(resourceID: "other value", permitCount: 1);
PartitionedRateLimiter.Create有 2 个泛型类型参数,第一个代表资源类型,它也将TResource在返回的. 第二种通用类型是分区键类型,在上面的示例中我们使用 作为我们的键类型。key 用于区分具有相同限制器的一组实例,也就是我们所说的分区。接受我们称之为分区器的 a。每次通过or与函数交互并从函数返回 a时都会调用此函数。包含一个方法,该方法是用户如何指定分区将具有的标识符以及将与该标识符关联的限制器。PartitionedRateLimiter<TResource>MyPolicyEnumTResourcePartitionedRateLimiter.CreateFunc<TResource, RateLimitPartition<TPartitionKey>>PartitionedRateLimiterAcquireWaitAsyncRateLimitPartition<TKey>RateLimitPartition<TKey>Create
在上面的第一个代码块中,我们正在检查资源是否与“Policy1”相等,如果它们匹配,我们将使用键创建一个分区并返回一个用于创建自定义的工厂。工厂被调用一次,然后速率限制器被缓存,因此未来对密钥的访问将使用相同的速率限制器实例。MyPolicyEnum.OneRateLimiterMyPolicyEnum.One
看第一个条件,当资源等于“Policy2”时,我们同样创建一个分区,这次我们使用便捷方法创建一个. 我们为此分区使用新的分区键,并指定将生成的选项。现在每个or for “Policy2” 将使用相同的.else ifCreateConcurrencyLimiterConcurrencyLimiterMyPolicyEnum.TwoConcurrencyLimiterAcquireWaitAsyncConcurrencyLimiter
我们的第三个条件是我们的“管理员”资源,我们不想限制我们的管理员,所以我们使用CreateNoLimiter不会应用任何限制。我们还为这个分区分配了分区键。MyPolicyEnum.Admin
最后,我们为所有其他资源使用TokenBucketLimiter实例提供了备用,我们将键分配给该分区。对我们条件未涵盖的资源的任何请求都将使用 this 。拥有一个非 noop 回退限制器通常是一个很好的做法,以防您将来没有涵盖所有条件或向您的应用程序添加新行为。MyPolicyEnum.DefaultifTokenBucketLimiter
在下一个示例中,让我们将PartitionedRateLimiter与我们HttpClient之前自定义的结合起来。我们将HttpRequestMessage用作 的资源类型PartitionedRateLimiter,这是我们在 的SendAsync方法中获得的类型DelegatingHandler。和 astring用于我们的分区键,因为我们将基于 url 路径进行分区。
PartitionedRateLimiter<HttpRequestMessage> limiter = PartitionedRateLimiter.Create<HttpRequestMessage, string>(resource =>
{if (resource.RequestUri?.IsLoopback){return RateLimitPartition.CreateNoLimiter("loopback");}string[]? segments = resource.RequestUri?.Segments;if (segments?.Length >= 2 && segments[1] == "api/"){// segments will be [] { "/", "api/", "next_path_segment", etc.. }return RateLimitPartition.CreateConcurrencyLimiter(segments[2].Trim('/'), key =>new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2));}return RateLimitPartition.Create("default", key => new MyCustomLimiter());
});class RateLimitedHandler : DelegatingHandler
{private readonly PartitionedRateLimiter<HttpRequestMessage> _rateLimiter;public RateLimitedHandler(PartitionedRateLimiter<HttpRequestMessage> limiter) : base(new HttpClientHandler()){_rateLimiter = limiter;}protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken){using RateLimitLease lease = await _rateLimiter.WaitAsync(request, 1, cancellationToken);if (lease.IsAcquired){return await base.SendAsync(request, cancellationToken);}var response = new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests);if (lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter)){response.Headers.Add(HeaderNames.RetryAfter, ((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo));}return response;}
}
仔细观察PartitionedRateLimiter上面的例子,我们的第一个检查是本地主机,我们已经决定如果用户在本地做事情我们不想限制他们,他们不会使用我们正在尝试的上游资源保护。下一个检查更有趣,我们正在查看 url 路径并查找对/api/端点的任何请求。如果请求匹配,我们会抓取路径的一部分并为该特定路径创建一个分区。这意味着任何请求都将使用我们的一个实例,而任何请求都将使用我们的不同实例/api/apple/*ConcurrencyLimiter/api/orange/ConcurrencyLimiter. 这是因为我们为这些请求使用了不同的分区键,因此我们的限制器工厂为不同的分区生成了一个新的限制器。最后,对于任何不是针对本地主机或端点的请求,我们都有一个回退限制。/api/
还显示了更新RateLimitedHandler,它现在接受 a而不是 a并传递给调用,否则其余代码保持不变。PartitionedRateLimiterRateLimiterrequestWaitAsync
在这个例子中有几件事值得指出。如果发出大量独特的请求,我们可能会创建许多分区,这将导致我们的. 返回的 from确实有一些逻辑可以在一段时间未使用限制器后删除限制器以帮助缓解这种情况,但应用程序开发人员也应该意识到创建无界分区并尽可能避免这种情况。此外,我们有我们的分区键,调用是为了避免在这种情况下使用不同的限制器,因为在使用./api/*PartitionedRateLimiterPartitionedRateLimiterPartitionedRateLimiter.Createsegments[2].Trim(‘/’)Trim/api/apple/api/apple/Uri.Segments
也可以在不使用该方法的情况下编写自定义实现。下面是使用每个资源的并发限制的自定义实现示例。所以资源有自己的限制,有自己的限制,等等。这具有更灵活和潜在更高效的优势,但代价是更高的维护成本。PartitionedRateLimiterPartitionedRateLimiter.Createint12
public sealed class PartitionedConcurrencyLimiter : PartitionedRateLimiter<int>
{private ConcurrentDictionary<int, int> _keyLimits = new();private int _permitLimit;private static readonly RateLimitLease FailedLease = new Lease(null, 0, 0);public PartitionedConcurrencyLimiter(int permitLimit){_permitLimit = permitLimit;}public override int GetAvailablePermits(int resourceID){if (_keyLimits.TryGetValue(resourceID, out int value)){return value;}return 0;}protected override RateLimitLease AcquireCore(int resourceID, int permitCount){if (_permitLimit < permitCount){return FailedLease;}bool wasUpdated = false;_keyLimits.AddOrUpdate(resourceID, (key) =>{wasUpdated = true;return _permitLimit - permitCount;}, (key, currentValue) =>{if (currentValue >= permitCount){wasUpdated = true;currentValue -= permitCount;}return currentValue;});if (wasUpdated){return new Lease(this, resourceID, permitCount);}return FailedLease;}protected override ValueTask<RateLimitLease> WaitAsyncCore(int resourceID, int permitCount, CancellationToken cancellationToken){return new ValueTask<RateLimitLease>(AcquireCore(resourceID, permitCount));}private void Release(int resourceID, int permitCount){_keyLimits.AddOrUpdate(resourceID, _permitLimit, (key, currentValue) =>{currentValue += permitCount;return currentValue;});}private sealed class Lease : RateLimitLease{private readonly int _permitCount;private readonly int _resourceId;private PartitionedConcurrencyLimiter? _limiter;public Lease(PartitionedConcurrencyLimiter? limiter, int resourceId, int permitCount){_limiter = limiter;_resourceId = resourceId;_permitCount = permitCount;}public override bool IsAcquired => _limiter is not null;public override IEnumerable<string> MetadataNames => throw new NotImplementedException();public override bool TryGetMetadata(string metadataName, out object? metadata){throw new NotImplementedException();}protected override void Dispose(bool disposing){if (_limiter is null){return;}_limiter.Release(_resourceId, _permitCount);_limiter = null;}}
}PartitionedRateLimiter<int> limiter = new PartitionedConcurrencyLimiter(permitLimit: 10);
// both will be successful acquisitions as they use different resource IDs
RateLimitLease lease = limiter.Acquire(resourceID: 1, permitCount: 10);
RateLimitLease lease2 = limiter.Acquire(resourceID: 2, permitCount: 7);
此实现确实存在一些问题,例如从不删除字典中的条目,不支持排队,以及在访问元数据时抛出,因此请将其作为实现自定义的灵感,不要在未修改代码的情况下复制。PartitionedRateLimiter
现在我们已经了解了主要的 API,让我们来看看 ASP.NET Core 中利用这些原语的 RateLimiting 中间件。
限速中间件
此中间件是通过Microsoft.AspNetCore.RateLimiting NuGet 包提供的。主要的使用模式是配置一些速率限制策略,然后将这些策略附加到您的端点。策略是一个命名的,它与方法所采用的相同,现在在哪里并且仍然是用户定义的密钥。当您想为策略配置单个限制器而不需要不同的分区时,还有 4 个内置速率限制器的扩展方法。Func<HttpContext,
RateLimitPartition<TPartitionKey>>PartitionedRateLimiter.CreateTResourceHttpContextTPartitionKeyvar app = WebApplication.Create(args);app.UseRateLimiter(new RateLimiterOptions().AddConcurrencyLimiter(policyName: "get", new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2)).AddNoLimiter(policyName: "admin").AddPolicy(policyName: "post", partitioner: httpContext =>{if (!StringValues.IsNullOrEmpty(httpContext.Request.Headers["token"])){return RateLimitPartition.CreateTokenBucketLimiter("token", key =>new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));}else{return RateLimitPartition.Create("default", key => new MyCustomLimiter());}}));app.MapGet("/get", context => context.Response.WriteAsync("get")).RequireRateLimiting("get");app.MapGet("/admin", context => context.Response.WriteAsync("admin")).RequireRateLimiting("admin").RequireAuthorization("admin");app.MapPost("/post", context => context.Response.WriteAsync("post")).RequireRateLimiting("post");app.Run();
这个例子展示了如何添加中间件,配置一些策略,并将不同的策略应用到不同的端点。从顶部开始,我们将中间件添加到我们的中间件管道中,使用UseRateLimiter. 接下来,我们使用便捷方法将一些策略添加到我们的选项中,AddConcurrencyLimiter并AddNoLimiter为其中的 2 个策略,分别命名为"get"和"admin"。然后我们使用AddPolicy允许根据传入的资源(HttpContext对于中间件)配置不同分区的方法。最后,我们RequireRateLimiting在各种端点上使用该方法让速率限制中间件知道在哪个端点上运行什么策略。(注意上面的RequireAuthorization用法/admin端点在这个最小示例中没有做任何事情,想象一下配置了身份验证和授权)
该AddPolicy方法还有另外 2 个使用. 该接口公开了一个回调,与我将在下面描述的相同,以及一个将.作为参数并返回. 第一个重载接受一个实例,第二个接受一个实现作为泛型参数。通用参数一将使用依赖注入来调用构造函数并为您实例化。
IRateLimiterPolicy<TPartitionKey>OnRejectedRateLimiterOptionsGetPartitionHttpContextRateLimitPartition<TPartitionKey>AddPolicyIRateLimiterPolicyIRateLimiterPolicyIRateLimiterPolicypublic class CustomRateLimiterPolicy<string> : IRateLimiterPolicy<string>
{private readonly ILogger _logger;public CustomRateLimiterPolicy(ILogger<CustomRateLimiterPolicy<string>> logger){_logger = logger;}public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected{get => (context, lease) =>{context.HttpContext.Response.StatusCode = 429;_logger.LogDebug("Request rejected");return new ValueTask();};}public RateLimitPartition<string> GetPartition(HttpContext context){if (!StringValues.IsNullOrEmpty(httpContext.Request.Headers["token"])){return RateLimitPartition.CreateTokenBucketLimiter("token", key =>new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));}else{return RateLimitPartition.Create("default", key => new MyCustomLimiter());}}
}var app = WebApplication.Create(args);
var logger = app.Services.GetRequiredService<ILogger<CustomRateLimiterPolicy<string>>>();app.UseRateLimiter(new RateLimitOptions().AddPolicy("a", new CustomRateLimiterPolicy<string>(logger)).AddPolicy<CustomRateLimiterPolicy<string>>("b"));
其他配置RateLimiterOptions包括RejectionStatusCode如果租约获取失败将返回的状态码,默认返回503。对于更高级的用法,还有一个在使用OnRejected后将被调用RejectionStatusCode并作为参数接收OnRejectedContext的函数。
new RateLimiterOptions()
{OnRejected = (context, cancellationToken) =>{context.HttpContext.StatusCode = StatusCodes.Status429TooManyRequests;return new ValueTask();}
};
最后但并非最不重要的一点是,RateLimiterOptions允许配置全局via 。如果提供了 a ,它将在端点上指定的任何策略之前运行。例如,如果您想限制您的应用程序处理 1000 个并发请求,无论指定什么端点策略,您都可以使用这些设置配置 a 并设置属性。PartitionedRateLimiterRateLimiterOptions.GlobalLimiterGlobalLimiterPartitionedRateLimiterGlobalLimiter
概括
请尝试速率限制,让我们知道您的想法!对于 System.Threading.RateLimiting 命名空间中的 RateLimiting API,请使用 nuget 包System.Threading.RateLimiting并在运行时GitHub存储库中提供反馈。对于 RateLimiting 中间件,请使用 nuget 包Microsoft.AspNetCore.RateLimiting并在AspNetCore GitHub存储库中提供反馈。