diff --git a/src/ReverseProxy/Service/Proxy/HttpProxy.cs b/src/ReverseProxy/Service/Proxy/HttpProxy.cs index 5bc73548e..44dcc9963 100644 --- a/src/ReverseProxy/Service/Proxy/HttpProxy.cs +++ b/src/ReverseProxy/Service/Proxy/HttpProxy.cs @@ -122,8 +122,8 @@ public async Task ProxyAsync( // :: Step 4: Send the outgoing request using HttpClient HttpResponseMessage destinationResponse; - var requestTimeoutSource = CancellationTokenSource.CreateLinkedTokenSource(requestAborted); - requestTimeoutSource.CancelAfter(requestOptions?.Timeout ?? DefaultTimeout); + + var requestTimeoutSource = PooledCTS.Rent(requestOptions?.Timeout ?? DefaultTimeout, requestAborted); var requestTimeoutToken = requestTimeoutSource.Token; try { @@ -151,7 +151,7 @@ public async Task ProxyAsync( } finally { - requestTimeoutSource.Dispose(); + requestTimeoutSource.Return(); } // Detect connection downgrade, which may be problematic for e.g. gRPC. diff --git a/src/ReverseProxy/Utilities/PooledCTS.cs b/src/ReverseProxy/Utilities/PooledCTS.cs new file mode 100644 index 000000000..3342cdac7 --- /dev/null +++ b/src/ReverseProxy/Utilities/PooledCTS.cs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#nullable enable + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading; + +namespace Yarp.ReverseProxy.Utilities +{ + internal sealed class PooledCTS : CancellationTokenSource + { + private static readonly ConcurrentQueue _sharedSources = new(); + + private static readonly Action _linkedTokenCancelDelegate = static s => + { + ((CancellationTokenSource)s!).Cancel(throwOnFirstException: false); + }; + + private const long SafeToReuseTicks = TimeSpan.TicksPerSecond * 10; + private static readonly double _stopwatchTicksPerTimeSpanTick = Stopwatch.Frequency / (double)TimeSpan.TicksPerSecond; + + private long _safeToReuseBeforeTimestamp; + private CancellationTokenRegistration _registration; + + public static PooledCTS Rent(TimeSpan timeout, CancellationToken linkedToken) + { + if (!_sharedSources.TryDequeue(out var cts)) + { + cts = new PooledCTS(); + } + + cts._registration = linkedToken.UnsafeRegister(_linkedTokenCancelDelegate, cts); + + cts._safeToReuseBeforeTimestamp = Stopwatch.GetTimestamp() + (long)((timeout.Ticks - SafeToReuseTicks) * _stopwatchTicksPerTimeSpanTick); + + cts.CancelAfter(timeout); + + return cts; + } + + public void Return() + { + _registration.Dispose(); + _registration = default; + + // TODO: Use TryReset in 6.0+ + CancelAfter(Timeout.Infinite); + + if (IsCancellationRequested || Stopwatch.GetTimestamp() > _safeToReuseBeforeTimestamp) + { + Dispose(); + } + else + { + _sharedSources.Enqueue(this); + } + } + } +}