Skip to content

Commit be58d84

Browse files
authored
Using new TimeProvider #2084 (#2108)
1 parent 444005a commit be58d84

File tree

53 files changed

+541
-743
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+541
-743
lines changed

eng/Versions.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
</PropertyGroup>
1313
<!--Package versions-->
1414
<PropertyGroup>
15+
<MicrosoftBclTimeProviderVersion>8.0.0-preview.4.23220.7</MicrosoftBclTimeProviderVersion>
1516
<MicrosoftDotNetXUnitExtensionsPackageVersion>8.0.0-beta.23224.1</MicrosoftDotNetXUnitExtensionsPackageVersion>
1617
</PropertyGroup>
1718
</Project>

global.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
{
22
"sdk": {
3-
"version": "8.0.100-preview.3.23178.7"
3+
"version": "8.0.100-preview.4.23221.1"
44
},
55
"tools": {
6-
"dotnet": "8.0.100-preview.3.23178.7",
6+
"dotnet": "8.0.100-preview.4.23221.1",
77
"runtimes": {
88
"dotnet": [
99
"6.0.14",

samples/KubernetesIngress.Sample/Combined/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
1818
COPY ["src/Directory.Build.props", "src/"]
1919
COPY ["Directory.Build.*", "./"]
2020
COPY ["NuGet.config", ""]
21+
COPY ["eng/Versions.props", "eng/"]
2122

2223
# Build a cache layer with all of the nuget packages
2324
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Combined/Yarp.Kubernetes.IngressController.csproj

samples/KubernetesIngress.Sample/Ingress/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
1818
COPY ["src/Directory.Build.props", "src/"]
1919
COPY ["Directory.Build.*", "./"]
2020
COPY ["NuGet.config", ""]
21+
COPY ["eng/Versions.props", "eng/"]
2122

2223
# Build a cache layer with all of the nuget packages
2324
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Ingress/Yarp.Kubernetes.Ingress.csproj

samples/KubernetesIngress.Sample/Monitor/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
1818
COPY ["src/Directory.Build.props", "src/"]
1919
COPY ["Directory.Build.*", "./"]
2020
COPY ["NuGet.config", ""]
21+
COPY ["eng/Versions.props", "eng/"]
2122

2223
# Build a cache layer with all of the nuget packages
2324
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Monitor/Yarp.Kubernetes.Monitor.csproj

src/Kubernetes.Controller/Rate/Limiter.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class Limiter
4242
{
4343
private readonly object _sync = new object();
4444
private readonly Limit _limit;
45-
private readonly ISystemClock _clock;
45+
private readonly TimeProvider _timeProvider;
4646
private readonly int _burst;
4747
private double _tokens;
4848

@@ -63,12 +63,12 @@ public class Limiter
6363
/// </summary>
6464
/// <param name="limit">The count per second which is allowed.</param>
6565
/// <param name="burst">The burst.</param>
66-
/// <param name="systemClock">Accessor for the current UTC time.</param>
67-
public Limiter(Limit limit, int burst, ISystemClock systemClock = default)
66+
/// <param name="timeProvider">Accessor for the current UTC time.</param>
67+
public Limiter(Limit limit, int burst, TimeProvider timeProvider = default)
6868
{
6969
_limit = limit;
7070
_burst = burst;
71-
_clock = systemClock ?? new SystemClock();
71+
_timeProvider = timeProvider ?? TimeProvider.System;
7272
}
7373

7474
/// <summary>
@@ -77,7 +77,7 @@ public Limiter(Limit limit, int burst, ISystemClock systemClock = default)
7777
/// <returns><c>true</c> if a token is available and used, <c>false</c> otherwise.</returns>
7878
public bool Allow()
7979
{
80-
return AllowN(_clock.UtcNow, 1);
80+
return AllowN(_timeProvider.GetUtcNow(), 1);
8181
}
8282

8383
/// <summary>
@@ -98,7 +98,7 @@ public bool AllowN(DateTimeOffset now, int number)
9898
/// <returns>Reservation.</returns>
9999
public Reservation Reserve()
100100
{
101-
return Reserve(_clock.UtcNow, 1);
101+
return Reserve(_timeProvider.GetUtcNow(), 1);
102102
}
103103

104104
/// <summary>
@@ -165,7 +165,7 @@ public async Task WaitAsync(int count, CancellationToken cancellationToken)
165165

166166
while (true)
167167
{
168-
var now = _clock.UtcNow;
168+
var now = _timeProvider.GetUtcNow();
169169
var r = ReserveImpl(now, count, waitLimit);
170170
if (r.Ok)
171171
{
@@ -198,7 +198,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
198198
if (_limit == Limit.Max)
199199
{
200200
return new Reservation(
201-
clock: _clock,
201+
timeProvider: _timeProvider,
202202
limiter: this,
203203
ok: true,
204204
tokens: number,
@@ -225,7 +225,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
225225
if (ok)
226226
{
227227
var reservation = new Reservation(
228-
clock: _clock,
228+
timeProvider: _timeProvider,
229229
limiter: this,
230230
ok: true,
231231
tokens: number,
@@ -241,7 +241,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
241241
else
242242
{
243243
var reservation = new Reservation(
244-
clock: _clock,
244+
timeProvider: _timeProvider,
245245
limiter: this,
246246
ok: false,
247247
limit: _limit);

src/Kubernetes.Controller/Rate/Reservation.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,29 @@ namespace Yarp.Kubernetes.Controller.Rate;
1515
/// </summary>
1616
public class Reservation
1717
{
18-
private readonly ISystemClock _clock;
18+
private readonly TimeProvider _timeProvider;
1919
private readonly Limiter _limiter;
2020
private readonly Limit _limit;
2121
private readonly double _tokens;
2222

2323
/// <summary>
2424
/// Initializes a new instance of the <see cref="Reservation"/> class.
2525
/// </summary>
26-
/// <param name="clock">A system clock.</param>
26+
/// <param name="timeProvider">Gets the system time.</param>
2727
/// <param name="limiter">The limiter.</param>
2828
/// <param name="ok">if set to <c>true</c> [ok].</param>
2929
/// <param name="tokens">The tokens.</param>
3030
/// <param name="timeToAct">The time to act.</param>
3131
/// <param name="limit">The limit.</param>
3232
public Reservation(
33-
ISystemClock clock,
33+
TimeProvider timeProvider,
3434
Limiter limiter,
3535
bool ok,
3636
double tokens = default,
3737
DateTimeOffset timeToAct = default,
3838
Limit limit = default)
3939
{
40-
_clock = clock;
40+
_timeProvider = timeProvider;
4141
_limiter = limiter;
4242
Ok = ok;
4343
_tokens = tokens;
@@ -63,7 +63,7 @@ public Reservation(
6363
/// <returns>TimeSpanOffset.</returns>
6464
public TimeSpan Delay()
6565
{
66-
return DelayFrom(_clock.UtcNow);
66+
return DelayFrom(_timeProvider.GetUtcNow());
6767
}
6868

6969
/// <summary>

src/ReverseProxy/Forwarder/HttpForwarder.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ internal sealed class HttpForwarder : IHttpForwarder
3030
private static readonly Version DefaultVersion = HttpVersion.Version20;
3131
private static readonly HttpVersionPolicy DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
3232
private readonly ILogger _logger;
33-
private readonly IClock _clock;
33+
private readonly TimeProvider _timeProvider;
3434

35-
public HttpForwarder(ILogger<HttpForwarder> logger, IClock clock)
35+
public HttpForwarder(ILogger<HttpForwarder> logger, TimeProvider timeProvider)
3636
{
3737
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
38-
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
38+
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
3939
}
4040

4141
/// <summary>
@@ -577,7 +577,7 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
577577
return new StreamCopyHttpContent(
578578
request: request,
579579
autoFlushHttpClientOutgoingStream: isStreamingRequest,
580-
clock: _clock,
580+
timeProvider: _timeProvider,
581581
activityToken);
582582
}
583583

@@ -737,10 +737,10 @@ private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext conte
737737
// :: Step 7-A-2: Copy duplex streams
738738
using var destinationStream = await destinationResponse.Content.ReadAsStreamAsync(activityCancellationSource.Token);
739739

740-
var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, StreamCopier.UnknownLength, _clock, activityCancellationSource,
740+
var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource,
741741
// HTTP/2 HttpClient request streams buffer by default.
742742
autoFlush: destinationResponse.Version == HttpVersion.Version20, activityCancellationSource.Token).AsTask();
743-
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, StreamCopier.UnknownLength, _clock, activityCancellationSource, activityCancellationSource.Token).AsTask();
743+
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource, activityCancellationSource.Token).AsTask();
744744

745745
// Make sure we report the first failure.
746746
var firstTask = await Task.WhenAny(requestTask, responseTask);
@@ -864,7 +864,7 @@ private ForwarderError FixupUpgradeResponseHeaders(HttpContext context, HttpResp
864864
{
865865
using var destinationResponseStream = await destinationResponseContent.ReadAsStreamAsync(activityCancellationSource.Token);
866866
// The response content-length is enforced by the server.
867-
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, StreamCopier.UnknownLength, _clock, activityCancellationSource, activityCancellationSource.Token);
867+
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource, activityCancellationSource.Token);
868868
}
869869

870870
return (StreamCopyResult.Success, null);

src/ReverseProxy/Forwarder/StreamCopier.cs

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,19 @@ internal static class StreamCopier
2121
private const int DefaultBufferSize = 65536;
2222
public const long UnknownLength = -1;
2323

24-
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
25-
=> CopyAsync(isRequest, input, output, promisedContentLength, clock, activityToken, autoFlush: false, cancellation);
24+
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
25+
=> CopyAsync(isRequest, input, output, promisedContentLength, timeProvider, activityToken, autoFlush: false, cancellation);
2626

27-
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
27+
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
2828
{
2929
Debug.Assert(input is not null);
3030
Debug.Assert(output is not null);
31-
Debug.Assert(clock is not null);
31+
Debug.Assert(timeProvider is not null);
3232
Debug.Assert(activityToken is not null);
3333

34-
// Avoid capturing 'isRequest' and 'clock' in the state machine when telemetry is disabled
34+
// Avoid capturing 'isRequest' and 'timeProvider' in the state machine when telemetry is disabled
3535
var telemetry = ForwarderTelemetry.Log.IsEnabled(EventLevel.Informational, EventKeywords.All)
36-
? new StreamCopierTelemetry(isRequest, clock)
36+
? new StreamCopierTelemetry(isRequest, timeProvider)
3737
: null;
3838

3939
return CopyAsync(input, output, promisedContentLength, telemetry, activityToken, autoFlush, cancellation);
@@ -142,48 +142,46 @@ internal static class StreamCopier
142142

143143
private sealed class StreamCopierTelemetry
144144
{
145-
private static readonly TimeSpan _timeBetweenTransferringEvents = TimeSpan.FromSeconds(1);
146-
147145
private readonly bool _isRequest;
148-
private readonly IClock _clock;
146+
private readonly TimeProvider _timeProvider;
149147
private long _contentLength;
150148
private long _iops;
151-
private TimeSpan _readTime;
152-
private TimeSpan _writeTime;
153-
private TimeSpan _firstReadTime;
154-
private TimeSpan _lastTime;
155-
private TimeSpan _nextTransferringEvent;
149+
private long _readTime;
150+
private long _writeTime;
151+
private long _firstReadTime;
152+
private long _lastTime;
153+
private long _nextTransferringEvent;
156154

157-
public StreamCopierTelemetry(bool isRequest, IClock clock)
155+
public StreamCopierTelemetry(bool isRequest, TimeProvider timeProvider)
158156
{
159157
_isRequest = isRequest;
160-
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
161-
_firstReadTime = new TimeSpan(-1);
158+
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
159+
_firstReadTime = -1;
162160

163161
ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);
164162

165-
_lastTime = clock.GetStopwatchTime();
166-
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
163+
_lastTime = timeProvider.GetTimestamp();
164+
_nextTransferringEvent = _lastTime + _timeProvider.TimestampFrequency;
167165
}
168166

169167
public void AfterRead(long contentLength)
170168
{
171169
_contentLength = contentLength;
172170
_iops++;
173171

174-
var readStop = _clock.GetStopwatchTime();
172+
var readStop = _timeProvider.GetTimestamp();
175173
var currentReadTime = readStop - _lastTime;
176174
_lastTime = readStop;
177175
_readTime += currentReadTime;
178-
if (_firstReadTime.Ticks < 0)
176+
if (_firstReadTime < 0)
179177
{
180178
_firstReadTime = currentReadTime;
181179
}
182180
}
183181

184182
public void AfterWrite()
185183
{
186-
var writeStop = _clock.GetStopwatchTime();
184+
var writeStop = _timeProvider.GetTimestamp();
187185
_writeTime += writeStop - _lastTime;
188186
_lastTime = writeStop;
189187

@@ -193,12 +191,12 @@ public void AfterWrite()
193191
_isRequest,
194192
_contentLength,
195193
_iops,
196-
_readTime.Ticks,
197-
_writeTime.Ticks);
194+
_timeProvider.GetElapsedTime(0, _readTime).Ticks,
195+
_timeProvider.GetElapsedTime(0, _writeTime).Ticks);
198196

199197
// Avoid attributing the time taken by logging ContentTransferring to the next read call
200-
_lastTime = _clock.GetStopwatchTime();
201-
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
198+
_lastTime = _timeProvider.GetTimestamp();
199+
_nextTransferringEvent = _lastTime + _timeProvider.TimestampFrequency;
202200
}
203201
}
204202

@@ -208,9 +206,9 @@ public void Stop()
208206
_isRequest,
209207
_contentLength,
210208
_iops,
211-
_readTime.Ticks,
212-
_writeTime.Ticks,
213-
Math.Max(0, _firstReadTime.Ticks));
209+
_timeProvider.GetElapsedTime(0, _readTime).Ticks,
210+
_timeProvider.GetElapsedTime(0, _writeTime).Ticks,
211+
_timeProvider.GetElapsedTime(0, Math.Max(0, _firstReadTime)).Ticks);
214212
}
215213
}
216214
}

src/ReverseProxy/Forwarder/StreamCopyHttpContent.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,16 @@ internal sealed class StreamCopyHttpContent : HttpContent
4343
// HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write.
4444
// Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it.
4545
private readonly bool _autoFlushHttpClientOutgoingStream;
46-
private readonly IClock _clock;
46+
private readonly TimeProvider _timeProvider;
4747
private readonly ActivityCancellationTokenSource _activityToken;
4848
private readonly TaskCompletionSource<(StreamCopyResult, Exception?)> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
4949
private int _started;
5050

51-
public StreamCopyHttpContent(HttpRequest request, bool autoFlushHttpClientOutgoingStream, IClock clock, ActivityCancellationTokenSource activityToken)
51+
public StreamCopyHttpContent(HttpRequest request, bool autoFlushHttpClientOutgoingStream, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken)
5252
{
5353
_request = request ?? throw new ArgumentNullException(nameof(request));
5454
_autoFlushHttpClientOutgoingStream = autoFlushHttpClientOutgoingStream;
55-
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
55+
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
5656

5757
_activityToken = activityToken;
5858
}
@@ -164,7 +164,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
164164
}
165165

166166
// Check that the content-length matches the request body size. This can be removed in .NET 7 now that SocketsHttpHandler enforces this: https://github.com/dotnet/runtime/issues/62258.
167-
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _clock, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
167+
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _timeProvider, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
168168
_tcs.TrySetResult((result, error));
169169

170170
// Check for errors that weren't the result of the destination failing.

0 commit comments

Comments
 (0)