Skip to content

Commit 1c0014c

Browse files
authored
Convert TestServer to pipes #11598 (#11611)
1 parent 2508dfc commit 1c0014c

8 files changed

+442
-271
lines changed

src/Hosting/TestHost/src/ClientHandler.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ protected override async Task<HttpResponseMessage> SendAsync(
6464

6565
var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext);
6666

67-
Stream responseBody = null;
6867
var requestContent = request.Content ?? new StreamContent(Stream.Null);
6968
var body = await requestContent.ReadAsStreamAsync();
7069
contextBuilder.Configure(context =>
@@ -114,8 +113,6 @@ protected override async Task<HttpResponseMessage> SendAsync(
114113
body.Seek(0, SeekOrigin.Begin);
115114
}
116115
req.Body = new AsyncStreamWrapper(body, () => contextBuilder.AllowSynchronousIO);
117-
118-
responseBody = context.Response.Body;
119116
});
120117

121118
var response = new HttpResponseMessage();
@@ -138,7 +135,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
138135
response.ReasonPhrase = httpContext.Features.Get<IHttpResponseFeature>().ReasonPhrase;
139136
response.RequestMessage = request;
140137

141-
response.Content = new StreamContent(responseBody);
138+
response.Content = new StreamContent(httpContext.Response.Body);
142139

143140
foreach (var header in httpContext.Response.Headers)
144141
{

src/Hosting/TestHost/src/HttpContextBuilder.cs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.IO.Pipelines;
56
using System.Threading;
67
using System.Threading.Tasks;
78
using Microsoft.AspNetCore.Http;
@@ -16,7 +17,8 @@ internal class HttpContextBuilder : IHttpBodyControlFeature
1617
private readonly HttpContext _httpContext;
1718

1819
private readonly TaskCompletionSource<HttpContext> _responseTcs = new TaskCompletionSource<HttpContext>(TaskCreationOptions.RunContinuationsAsynchronously);
19-
private readonly ResponseStream _responseStream;
20+
private readonly ResponseBodyReaderStream _responseReaderStream;
21+
private readonly ResponseBodyPipeWriter _responsePipeWriter;
2022
private readonly ResponseFeature _responseFeature;
2123
private readonly RequestLifetimeFeature _requestLifetimeFeature = new RequestLifetimeFeature();
2224
private readonly ResponseTrailersFeature _responseTrailersFeature = new ResponseTrailersFeature();
@@ -37,14 +39,19 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou
3739
request.Protocol = "HTTP/1.1";
3840
request.Method = HttpMethods.Get;
3941

42+
var pipe = new Pipe();
43+
_responseReaderStream = new ResponseBodyReaderStream(pipe, AbortRequest, () => _responseReadCompleteCallback?.Invoke(_httpContext));
44+
_responsePipeWriter = new ResponseBodyPipeWriter(pipe, ReturnResponseMessageAsync);
45+
_responseFeature.Body = new ResponseBodyWriterStream(_responsePipeWriter, () => AllowSynchronousIO);
46+
_responseFeature.BodySnapshot = _responseFeature.Body;
47+
_responseFeature.BodyWriter = _responsePipeWriter;
48+
4049
_httpContext.Features.Set<IHttpBodyControlFeature>(this);
4150
_httpContext.Features.Set<IHttpResponseFeature>(_responseFeature);
4251
_httpContext.Features.Set<IHttpResponseStartFeature>(_responseFeature);
4352
_httpContext.Features.Set<IHttpRequestLifetimeFeature>(_requestLifetimeFeature);
4453
_httpContext.Features.Set<IHttpResponseTrailersFeature>(_responseTrailersFeature);
45-
46-
_responseStream = new ResponseStream(ReturnResponseMessageAsync, AbortRequest, () => AllowSynchronousIO, () => _responseReadCompleteCallback?.Invoke(_httpContext));
47-
_responseFeature.Body = _responseStream;
54+
_httpContext.Features.Set<IResponseBodyPipeFeature>(_responseFeature);
4855
}
4956

5057
public bool AllowSynchronousIO { get; set; }
@@ -119,14 +126,14 @@ internal void AbortRequest()
119126
{
120127
_requestLifetimeFeature.Abort();
121128
}
122-
_responseStream.CompleteWrites();
129+
_responsePipeWriter.Complete();
123130
}
124131

125132
internal async Task CompleteResponseAsync()
126133
{
127134
_pipelineFinished = true;
128135
await ReturnResponseMessageAsync();
129-
_responseStream.CompleteWrites();
136+
_responsePipeWriter.Complete();
130137
await _responseFeature.FireOnResponseCompletedAsync();
131138
}
132139

@@ -155,14 +162,25 @@ internal async Task ReturnResponseMessageAsync()
155162
{
156163
newFeatures[pair.Key] = pair.Value;
157164
}
165+
var serverResponseFeature = _httpContext.Features.Get<IHttpResponseFeature>();
166+
// The client gets a deep copy of this so they can interact with the body stream independently of the server.
167+
var clientResponseFeature = new HttpResponseFeature()
168+
{
169+
StatusCode = serverResponseFeature.StatusCode,
170+
ReasonPhrase = serverResponseFeature.ReasonPhrase,
171+
Headers = serverResponseFeature.Headers,
172+
Body = _responseReaderStream
173+
};
174+
newFeatures.Set<IHttpResponseFeature>(clientResponseFeature);
158175
_responseTcs.TrySetResult(new DefaultHttpContext(newFeatures));
159176
}
160177
}
161178

162179
internal void Abort(Exception exception)
163180
{
164181
_pipelineFinished = true;
165-
_responseStream.Abort(exception);
182+
_responsePipeWriter.Abort(exception);
183+
_responseReaderStream.Abort(exception);
166184
_responseTcs.TrySetException(exception);
167185
}
168186
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Diagnostics.Contracts;
6+
using System.IO;
7+
using System.IO.Pipelines;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.AspNetCore.Http.Features;
11+
12+
namespace Microsoft.AspNetCore.TestHost
13+
{
14+
internal class ResponseBodyPipeWriter : PipeWriter
15+
{
16+
private readonly Func<Task> _onFirstWriteAsync;
17+
private readonly Pipe _pipe;
18+
19+
private bool _firstWrite;
20+
private bool _complete;
21+
22+
internal ResponseBodyPipeWriter(Pipe pipe, Func<Task> onFirstWriteAsync)
23+
{
24+
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
25+
_onFirstWriteAsync = onFirstWriteAsync ?? throw new ArgumentNullException(nameof(onFirstWriteAsync));
26+
_firstWrite = true;
27+
}
28+
29+
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
30+
{
31+
cancellationToken.ThrowIfCancellationRequested();
32+
CheckNotComplete();
33+
34+
await FirstWriteAsync();
35+
return await _pipe.Writer.FlushAsync(cancellationToken);
36+
}
37+
38+
private Task FirstWriteAsync()
39+
{
40+
if (_firstWrite)
41+
{
42+
_firstWrite = false;
43+
return _onFirstWriteAsync();
44+
}
45+
return Task.CompletedTask;
46+
}
47+
48+
internal void Abort(Exception innerException)
49+
{
50+
Contract.Requires(innerException != null);
51+
_complete = true;
52+
_pipe.Writer.Complete(new IOException(string.Empty, innerException));
53+
}
54+
55+
internal void Complete()
56+
{
57+
if (_complete)
58+
{
59+
return;
60+
}
61+
62+
// Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads.
63+
_complete = true;
64+
_pipe.Writer.Complete();
65+
}
66+
67+
private void CheckNotComplete()
68+
{
69+
if (_complete)
70+
{
71+
throw new IOException("The request was aborted or the pipeline has finished.");
72+
}
73+
}
74+
75+
public override void Complete(Exception exception = null)
76+
{
77+
// No-op in the non-error case
78+
if (exception != null)
79+
{
80+
Abort(exception);
81+
}
82+
}
83+
84+
public override void CancelPendingFlush() => _pipe.Writer.CancelPendingFlush();
85+
86+
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
87+
=> _pipe.Writer.OnReaderCompleted(callback, state);
88+
89+
public override void Advance(int bytes)
90+
{
91+
CheckNotComplete();
92+
_pipe.Writer.Advance(bytes);
93+
}
94+
95+
public override Memory<byte> GetMemory(int sizeHint = 0)
96+
{
97+
CheckNotComplete();
98+
return _pipe.Writer.GetMemory(sizeHint);
99+
}
100+
101+
public override Span<byte> GetSpan(int sizeHint = 0)
102+
{
103+
CheckNotComplete();
104+
return _pipe.Writer.GetSpan(sizeHint);
105+
}
106+
}
107+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.Diagnostics.Contracts;
7+
using System.IO;
8+
using System.IO.Pipelines;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace Microsoft.AspNetCore.TestHost
13+
{
14+
/// <summary>
15+
/// The client's view of the response body.
16+
/// </summary>
17+
internal class ResponseBodyReaderStream : Stream
18+
{
19+
private bool _readerComplete;
20+
private bool _aborted;
21+
private Exception _abortException;
22+
23+
private readonly Action _abortRequest;
24+
private readonly Action _readComplete;
25+
private readonly Pipe _pipe;
26+
27+
internal ResponseBodyReaderStream(Pipe pipe, Action abortRequest, Action readComplete)
28+
{
29+
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
30+
_abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest));
31+
_readComplete = readComplete;
32+
}
33+
34+
public override bool CanRead => true;
35+
36+
public override bool CanSeek => false;
37+
38+
public override bool CanWrite => false;
39+
40+
#region NotSupported
41+
42+
public override long Length => throw new NotSupportedException();
43+
44+
public override long Position
45+
{
46+
get => throw new NotSupportedException();
47+
set => throw new NotSupportedException();
48+
}
49+
50+
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
51+
52+
public override void SetLength(long value) => throw new NotSupportedException();
53+
54+
public override void Flush() => throw new NotSupportedException();
55+
56+
public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException();
57+
58+
// Write with count 0 will still trigger OnFirstWrite
59+
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
60+
61+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => throw new NotSupportedException();
62+
63+
#endregion NotSupported
64+
65+
public override int Read(byte[] buffer, int offset, int count)
66+
{
67+
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
68+
}
69+
70+
public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
71+
{
72+
VerifyBuffer(buffer, offset, count);
73+
CheckAborted();
74+
75+
if (_readerComplete)
76+
{
77+
return 0;
78+
}
79+
80+
using var registration = cancellationToken.Register(Cancel);
81+
var result = await _pipe.Reader.ReadAsync(cancellationToken);
82+
83+
if (result.Buffer.IsEmpty && result.IsCompleted)
84+
{
85+
_pipe.Reader.Complete();
86+
_readComplete();
87+
_readerComplete = true;
88+
return 0;
89+
}
90+
91+
var readableBuffer = result.Buffer;
92+
var actual = Math.Min(readableBuffer.Length, count);
93+
readableBuffer = readableBuffer.Slice(0, actual);
94+
readableBuffer.CopyTo(new Span<byte>(buffer, offset, count));
95+
_pipe.Reader.AdvanceTo(readableBuffer.End);
96+
return (int)actual;
97+
}
98+
99+
private static void VerifyBuffer(byte[] buffer, int offset, int count)
100+
{
101+
if (buffer == null)
102+
{
103+
throw new ArgumentNullException("buffer");
104+
}
105+
if (offset < 0 || offset > buffer.Length)
106+
{
107+
throw new ArgumentOutOfRangeException("offset", offset, string.Empty);
108+
}
109+
if (count <= 0 || count > buffer.Length - offset)
110+
{
111+
throw new ArgumentOutOfRangeException("count", count, string.Empty);
112+
}
113+
}
114+
115+
internal void Cancel()
116+
{
117+
_aborted = true;
118+
_abortException = new OperationCanceledException();
119+
_pipe.Writer.Complete(_abortException);
120+
}
121+
122+
internal void Abort(Exception innerException)
123+
{
124+
Contract.Requires(innerException != null);
125+
_aborted = true;
126+
_abortException = innerException;
127+
}
128+
129+
private void CheckAborted()
130+
{
131+
if (_aborted)
132+
{
133+
throw new IOException(string.Empty, _abortException);
134+
}
135+
}
136+
137+
protected override void Dispose(bool disposing)
138+
{
139+
if (disposing)
140+
{
141+
_abortRequest();
142+
}
143+
base.Dispose(disposing);
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)