Skip to content

Commit 81dff04

Browse files
authored
Add CreateChained to RateLimiter (#107230)
1 parent 343b034 commit 81dff04

File tree

10 files changed

+2089
-807
lines changed

10 files changed

+2089
-807
lines changed

src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public enum QueueProcessingOrder
8888
}
8989
public abstract partial class RateLimiter : System.IAsyncDisposable, System.IDisposable
9090
{
91+
public static System.Threading.RateLimiting.RateLimiter CreateChained(params System.Threading.RateLimiting.RateLimiter[] limiters) { throw null; }
9192
protected RateLimiter() { }
9293
public abstract System.TimeSpan? IdleDuration { get; }
9394
public System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> AcquireAsync(int permitCount = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }

src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
1313
</PropertyGroup>
1414

1515
<ItemGroup>
16+
<Compile Include="System\Threading\RateLimiting\ChainedRateLimiter.cs" />
1617
<Compile Include="System\Threading\RateLimiting\ChainedPartitionedRateLimiter.cs" />
18+
<Compile Include="System\Threading\RateLimiting\CombinedRateLimitLease.cs" />
1719
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiter.cs" />
1820
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiterOptions.cs" />
1921
<Compile Include="System\Threading\RateLimiting\DefaultPartitionedRateLimiter.cs" />

src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs

Lines changed: 17 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,18 @@ internal sealed class ChainedPartitionedRateLimiter<TResource> : PartitionedRate
2020

2121
public ChainedPartitionedRateLimiter(PartitionedRateLimiter<TResource>[] limiters)
2222
{
23-
_limiters = limiters;
23+
_limiters = (PartitionedRateLimiter<TResource>[])limiters.Clone();
2424
}
2525

2626
public override RateLimiterStatistics? GetStatistics(TResource resource)
2727
{
2828
ThrowIfDisposed();
29+
2930
long lowestAvailablePermits = long.MaxValue;
3031
long currentQueuedCount = 0;
3132
long totalFailedLeases = 0;
3233
long innerMostSuccessfulLeases = 0;
34+
3335
foreach (PartitionedRateLimiter<TResource> limiter in _limiters)
3436
{
3537
if (limiter.GetStatistics(resource) is { } statistics)
@@ -38,11 +40,13 @@ public ChainedPartitionedRateLimiter(PartitionedRateLimiter<TResource>[] limiter
3840
{
3941
lowestAvailablePermits = statistics.CurrentAvailablePermits;
4042
}
43+
4144
currentQueuedCount += statistics.CurrentQueuedCount;
4245
totalFailedLeases += statistics.TotalFailedLeases;
4346
innerMostSuccessfulLeases = statistics.TotalSuccessfulLeases;
4447
}
4548
}
49+
4650
return new RateLimiterStatistics()
4751
{
4852
CurrentAvailablePermits = lowestAvailablePermits,
@@ -55,11 +59,14 @@ public ChainedPartitionedRateLimiter(PartitionedRateLimiter<TResource>[] limiter
5559
protected override RateLimitLease AttemptAcquireCore(TResource resource, int permitCount)
5660
{
5761
ThrowIfDisposed();
62+
5863
RateLimitLease[]? leases = null;
64+
5965
for (int i = 0; i < _limiters.Length; i++)
6066
{
6167
RateLimitLease? lease = null;
6268
Exception? exception = null;
69+
6370
try
6471
{
6572
lease = _limiters[i].AttemptAcquire(resource, permitCount);
@@ -68,7 +75,9 @@ protected override RateLimitLease AttemptAcquireCore(TResource resource, int per
6875
{
6976
exception = ex;
7077
}
71-
RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);
78+
79+
RateLimitLease? notAcquiredLease = ChainedRateLimiter.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);
80+
7281
if (notAcquiredLease is not null)
7382
{
7483
return notAcquiredLease;
@@ -81,11 +90,14 @@ protected override RateLimitLease AttemptAcquireCore(TResource resource, int per
8190
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(TResource resource, int permitCount, CancellationToken cancellationToken)
8291
{
8392
ThrowIfDisposed();
93+
8494
RateLimitLease[]? leases = null;
95+
8596
for (int i = 0; i < _limiters.Length; i++)
8697
{
8798
RateLimitLease? lease = null;
8899
Exception? exception = null;
100+
89101
try
90102
{
91103
lease = await _limiters[i].AcquireAsync(resource, permitCount, cancellationToken).ConfigureAwait(false);
@@ -94,7 +106,9 @@ protected override async ValueTask<RateLimitLease> AcquireAsyncCore(TResource re
94106
{
95107
exception = ex;
96108
}
97-
RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);
109+
110+
RateLimitLease? notAcquiredLease = ChainedRateLimiter.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);
111+
98112
if (notAcquiredLease is not null)
99113
{
100114
return notAcquiredLease;
@@ -116,145 +130,5 @@ private void ThrowIfDisposed()
116130
throw new ObjectDisposedException(nameof(ChainedPartitionedRateLimiter<TResource>));
117131
}
118132
}
119-
120-
private static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length)
121-
{
122-
if (ex is not null)
123-
{
124-
AggregateException? innerEx = CommonDispose(leases, index);
125-
if (innerEx is not null)
126-
{
127-
Exception[] exceptions = new Exception[innerEx.InnerExceptions.Count + 1];
128-
innerEx.InnerExceptions.CopyTo(exceptions, 0);
129-
exceptions[exceptions.Length - 1] = ex;
130-
throw new AggregateException(exceptions);
131-
}
132-
throw ex;
133-
}
134-
135-
if (!lease!.IsAcquired)
136-
{
137-
AggregateException? innerEx = CommonDispose(leases, index);
138-
return innerEx is not null ? throw innerEx : lease;
139-
}
140-
141-
leases ??= new RateLimitLease[length];
142-
leases[index] = lease;
143-
return null;
144-
}
145-
146-
private static AggregateException? CommonDispose(RateLimitLease[]? leases, int i)
147-
{
148-
List<Exception>? exceptions = null;
149-
while (i > 0)
150-
{
151-
i--;
152-
try
153-
{
154-
leases![i].Dispose();
155-
}
156-
catch (Exception ex)
157-
{
158-
exceptions ??= new List<Exception>();
159-
exceptions.Add(ex);
160-
}
161-
}
162-
163-
if (exceptions is not null)
164-
{
165-
return new AggregateException(exceptions);
166-
}
167-
168-
return null;
169-
}
170-
171-
private sealed class CombinedRateLimitLease : RateLimitLease
172-
{
173-
private RateLimitLease[]? _leases;
174-
private HashSet<string>? _metadataNames;
175-
176-
public CombinedRateLimitLease(RateLimitLease[] leases)
177-
{
178-
_leases = leases;
179-
}
180-
181-
public override bool IsAcquired => true;
182-
183-
public override IEnumerable<string> MetadataNames
184-
{
185-
get
186-
{
187-
if (_leases is null)
188-
{
189-
return Enumerable.Empty<string>();
190-
}
191-
192-
if (_metadataNames is null)
193-
{
194-
_metadataNames = new HashSet<string>();
195-
foreach (RateLimitLease lease in _leases)
196-
{
197-
foreach (string metadataName in lease.MetadataNames)
198-
{
199-
_metadataNames.Add(metadataName);
200-
}
201-
}
202-
}
203-
return _metadataNames;
204-
}
205-
}
206-
207-
public override bool TryGetMetadata(string metadataName, out object? metadata)
208-
{
209-
if (_leases is not null)
210-
{
211-
foreach (RateLimitLease lease in _leases)
212-
{
213-
// Use the first metadata item of a given name, ignore duplicates, we can't reliably merge arbitrary metadata
214-
// Creating an object[] if there are multiple of the same metadataName could work, but makes consumption of metadata messy
215-
// and makes MetadataName.Create<T>(...) uses no longer work
216-
if (lease.TryGetMetadata(metadataName, out metadata))
217-
{
218-
return true;
219-
}
220-
}
221-
}
222-
223-
metadata = null;
224-
return false;
225-
}
226-
227-
protected override void Dispose(bool disposing)
228-
{
229-
if (_leases is null)
230-
{
231-
return;
232-
}
233-
234-
List<Exception>? exceptions = null;
235-
// Dispose in reverse order
236-
// Avoids issues where dispose might unblock a queued acquire and then the acquire fails when acquiring the next limiter.
237-
// When disposing in reverse order there wont be any issues of unblocking an acquire that affects acquires on limiters in the chain after it
238-
for (int i = _leases.Length - 1; i >= 0; i--)
239-
{
240-
try
241-
{
242-
_leases[i].Dispose();
243-
}
244-
catch (Exception ex)
245-
{
246-
exceptions ??= new List<Exception>();
247-
exceptions.Add(ex);
248-
}
249-
}
250-
251-
_leases = null;
252-
253-
if (exceptions is not null)
254-
{
255-
throw new AggregateException(exceptions);
256-
}
257-
}
258-
}
259133
}
260134
}

0 commit comments

Comments
 (0)