Skip to content

Commit 15a73e6

Browse files
committed
Convert TestServer to pipes. #11598
1 parent 15fb5b9 commit 15a73e6

7 files changed

+448
-270
lines changed

src/Hosting/TestHost/src/ClientHandler.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ protected override async Task<HttpResponseMessage> SendAsync(
6464

6565
var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext);
6666

67-
Stream responseBody = null;
6867
var requestContent = request.Content ?? new StreamContent(Stream.Null);
6968
var body = await requestContent.ReadAsStreamAsync();
7069
contextBuilder.Configure(context =>
@@ -114,8 +113,6 @@ protected override async Task<HttpResponseMessage> SendAsync(
114113
body.Seek(0, SeekOrigin.Begin);
115114
}
116115
req.Body = new AsyncStreamWrapper(body, () => contextBuilder.AllowSynchronousIO);
117-
118-
responseBody = context.Response.Body;
119116
});
120117

121118
var response = new HttpResponseMessage();
@@ -138,7 +135,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
138135
response.ReasonPhrase = httpContext.Features.Get<IHttpResponseFeature>().ReasonPhrase;
139136
response.RequestMessage = request;
140137

141-
response.Content = new StreamContent(responseBody);
138+
response.Content = new StreamContent(httpContext.Response.Body);
142139

143140
foreach (var header in httpContext.Response.Headers)
144141
{

src/Hosting/TestHost/src/HttpContextBuilder.cs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.IO.Pipelines;
56
using System.Threading;
67
using System.Threading.Tasks;
78
using Microsoft.AspNetCore.Http;
@@ -16,7 +17,8 @@ internal class HttpContextBuilder : IHttpBodyControlFeature
1617
private readonly HttpContext _httpContext;
1718

1819
private readonly TaskCompletionSource<HttpContext> _responseTcs = new TaskCompletionSource<HttpContext>(TaskCreationOptions.RunContinuationsAsynchronously);
19-
private readonly ResponseStream _responseStream;
20+
private readonly ResponseBodyReaderStream _responseReaderStream;
21+
private readonly ResponseBodyPipeWriter _responsePipeWriter;
2022
private readonly ResponseFeature _responseFeature;
2123
private readonly RequestLifetimeFeature _requestLifetimeFeature = new RequestLifetimeFeature();
2224
private readonly ResponseTrailersFeature _responseTrailersFeature = new ResponseTrailersFeature();
@@ -37,14 +39,17 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou
3739
request.Protocol = "HTTP/1.1";
3840
request.Method = HttpMethods.Get;
3941

42+
var pipe = new Pipe();
43+
_responseReaderStream = new ResponseBodyReaderStream(pipe, AbortRequest, () => _responseReadCompleteCallback?.Invoke(_httpContext));
44+
_responsePipeWriter = new ResponseBodyPipeWriter(pipe, ReturnResponseMessageAsync);
45+
_responseFeature.Body = new ResponseBodyWriterStream(_responsePipeWriter, () => AllowSynchronousIO);
46+
4047
_httpContext.Features.Set<IHttpBodyControlFeature>(this);
4148
_httpContext.Features.Set<IHttpResponseFeature>(_responseFeature);
4249
_httpContext.Features.Set<IHttpResponseStartFeature>(_responseFeature);
4350
_httpContext.Features.Set<IHttpRequestLifetimeFeature>(_requestLifetimeFeature);
4451
_httpContext.Features.Set<IHttpResponseTrailersFeature>(_responseTrailersFeature);
45-
46-
_responseStream = new ResponseStream(ReturnResponseMessageAsync, AbortRequest, () => AllowSynchronousIO, () => _responseReadCompleteCallback?.Invoke(_httpContext));
47-
_responseFeature.Body = _responseStream;
52+
_httpContext.Features.Set<IResponseBodyPipeFeature>(_responsePipeWriter);
4853
}
4954

5055
public bool AllowSynchronousIO { get; set; }
@@ -119,14 +124,14 @@ internal void AbortRequest()
119124
{
120125
_requestLifetimeFeature.Abort();
121126
}
122-
_responseStream.CompleteWrites();
127+
_responsePipeWriter.CompleteWrites();
123128
}
124129

125130
internal async Task CompleteResponseAsync()
126131
{
127132
_pipelineFinished = true;
128133
await ReturnResponseMessageAsync();
129-
_responseStream.CompleteWrites();
134+
_responsePipeWriter.CompleteWrites();
130135
await _responseFeature.FireOnResponseCompletedAsync();
131136
}
132137

@@ -155,14 +160,25 @@ internal async Task ReturnResponseMessageAsync()
155160
{
156161
newFeatures[pair.Key] = pair.Value;
157162
}
163+
var serverResponseFeature = _httpContext.Features.Get<IHttpResponseFeature>();
164+
// The client gets a deep copy of this so they can interact with the body stream independently of the server.
165+
var clientResponseFeature = new HttpResponseFeature()
166+
{
167+
StatusCode = serverResponseFeature.StatusCode,
168+
ReasonPhrase = serverResponseFeature.ReasonPhrase,
169+
Headers = serverResponseFeature.Headers,
170+
Body = _responseReaderStream
171+
};
172+
newFeatures.Set<IHttpResponseFeature>(clientResponseFeature);
158173
_responseTcs.TrySetResult(new DefaultHttpContext(newFeatures));
159174
}
160175
}
161176

162177
internal void Abort(Exception exception)
163178
{
164179
_pipelineFinished = true;
165-
_responseStream.Abort(exception);
180+
_responsePipeWriter.Abort(exception);
181+
_responseReaderStream.Abort(exception);
166182
_responseTcs.TrySetException(exception);
167183
}
168184
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Diagnostics.Contracts;
6+
using System.IO;
7+
using System.IO.Pipelines;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.AspNetCore.Http.Features;
11+
12+
namespace Microsoft.AspNetCore.TestHost
13+
{
14+
internal class ResponseBodyPipeWriter : PipeWriter, IResponseBodyPipeFeature
15+
{
16+
private readonly Func<Task> _onFirstWriteAsync;
17+
private readonly Pipe _pipe;
18+
19+
private bool _firstWrite;
20+
private bool _complete;
21+
22+
PipeWriter IResponseBodyPipeFeature.Writer => this;
23+
24+
internal ResponseBodyPipeWriter(Pipe pipe, Func<Task> onFirstWriteAsync)
25+
{
26+
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
27+
_onFirstWriteAsync = onFirstWriteAsync ?? throw new ArgumentNullException(nameof(onFirstWriteAsync));
28+
_firstWrite = true;
29+
}
30+
31+
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
32+
{
33+
cancellationToken.ThrowIfCancellationRequested();
34+
CheckNotComplete();
35+
36+
await FirstWriteAsync();
37+
return await _pipe.Writer.FlushAsync(cancellationToken);
38+
}
39+
40+
private Task FirstWriteAsync()
41+
{
42+
if (_firstWrite)
43+
{
44+
_firstWrite = false;
45+
return _onFirstWriteAsync();
46+
}
47+
return Task.CompletedTask;
48+
}
49+
50+
internal void Abort(Exception innerException)
51+
{
52+
Contract.Requires(innerException != null);
53+
_complete = true;
54+
_pipe.Writer.Complete(new IOException(string.Empty, innerException));
55+
}
56+
57+
internal void CompleteWrites()
58+
{
59+
if (_complete)
60+
{
61+
return;
62+
}
63+
64+
// Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads.
65+
_complete = true;
66+
_pipe.Writer.Complete();
67+
}
68+
69+
private void CheckNotComplete()
70+
{
71+
if (_complete)
72+
{
73+
throw new IOException("The request was aborted or the pipeline has finished");
74+
}
75+
}
76+
77+
public override void Complete(Exception exception = null)
78+
{
79+
// No-op in the non-error case
80+
if (exception != null)
81+
{
82+
Abort(exception);
83+
}
84+
}
85+
86+
public override void CancelPendingFlush() => _pipe.Writer.CancelPendingFlush();
87+
88+
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
89+
=> _pipe.Writer.OnReaderCompleted(callback, state);
90+
91+
public override void Advance(int bytes)
92+
{
93+
CheckNotComplete();
94+
_pipe.Writer.Advance(bytes);
95+
}
96+
97+
public override Memory<byte> GetMemory(int sizeHint = 0)
98+
{
99+
CheckNotComplete();
100+
return _pipe.Writer.GetMemory(sizeHint);
101+
}
102+
103+
public override Span<byte> GetSpan(int sizeHint = 0)
104+
{
105+
CheckNotComplete();
106+
return _pipe.Writer.GetSpan(sizeHint);
107+
}
108+
}
109+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.Diagnostics;
7+
using System.Diagnostics.Contracts;
8+
using System.IO;
9+
using System.IO.Pipelines;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
13+
namespace Microsoft.AspNetCore.TestHost
14+
{
15+
/// <summary>
16+
/// The client's view of the response body.
17+
/// </summary>
18+
internal class ResponseBodyReaderStream : Stream
19+
{
20+
private bool _readerComplete;
21+
private bool _aborted;
22+
private Exception _abortException;
23+
24+
private readonly Action _abortRequest;
25+
private readonly Action _readComplete;
26+
private readonly Pipe _pipe;
27+
28+
internal ResponseBodyReaderStream(Pipe pipe, Action abortRequest, Action readComplete)
29+
{
30+
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
31+
_abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest));
32+
_readComplete = readComplete;
33+
}
34+
35+
public override bool CanRead
36+
{
37+
get { return true; }
38+
}
39+
40+
public override bool CanSeek
41+
{
42+
get { return false; }
43+
}
44+
45+
public override bool CanWrite
46+
{
47+
get { return false; }
48+
}
49+
50+
#region NotSupported
51+
52+
public override long Length
53+
{
54+
get { throw new NotSupportedException(); }
55+
}
56+
57+
public override long Position
58+
{
59+
get { throw new NotSupportedException(); }
60+
set { throw new NotSupportedException(); }
61+
}
62+
63+
public override long Seek(long offset, SeekOrigin origin)
64+
{
65+
throw new NotSupportedException();
66+
}
67+
68+
public override void SetLength(long value)
69+
{
70+
throw new NotSupportedException();
71+
}
72+
73+
public override void Flush()
74+
{
75+
throw new NotSupportedException();
76+
}
77+
78+
public override Task FlushAsync(CancellationToken cancellationToken)
79+
{
80+
throw new NotSupportedException();
81+
}
82+
83+
// Write with count 0 will still trigger OnFirstWrite
84+
public override void Write(byte[] buffer, int offset, int count)
85+
{
86+
throw new NotSupportedException();
87+
}
88+
89+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
90+
{
91+
throw new NotSupportedException();
92+
}
93+
94+
#endregion NotSupported
95+
96+
public override int Read(byte[] buffer, int offset, int count)
97+
{
98+
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
99+
}
100+
101+
public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
102+
{
103+
VerifyBuffer(buffer, offset, count);
104+
CheckAborted();
105+
106+
if (_readerComplete)
107+
{
108+
return 0;
109+
}
110+
111+
using var registration = cancellationToken.Register(Cancel);
112+
var result = await _pipe.Reader.ReadAsync(cancellationToken);
113+
114+
if (result.Buffer.IsEmpty && result.IsCompleted)
115+
{
116+
_pipe.Reader.Complete();
117+
_readComplete();
118+
_readerComplete = true;
119+
return 0;
120+
}
121+
122+
var readableBuffer = result.Buffer;
123+
var actual = Math.Min(readableBuffer.Length, count);
124+
readableBuffer = readableBuffer.Slice(0, actual);
125+
readableBuffer.CopyTo(new Span<byte>(buffer, offset, count));
126+
_pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
127+
return (int)actual;
128+
}
129+
130+
private static void VerifyBuffer(byte[] buffer, int offset, int count)
131+
{
132+
if (buffer == null)
133+
{
134+
throw new ArgumentNullException("buffer");
135+
}
136+
if (offset < 0 || offset > buffer.Length)
137+
{
138+
throw new ArgumentOutOfRangeException("offset", offset, string.Empty);
139+
}
140+
if (count <= 0 || count > buffer.Length - offset)
141+
{
142+
throw new ArgumentOutOfRangeException("count", count, string.Empty);
143+
}
144+
}
145+
146+
internal void Cancel()
147+
{
148+
_aborted = true;
149+
_abortException = new OperationCanceledException();
150+
_pipe.Writer.Complete(_abortException);
151+
}
152+
153+
internal void Abort(Exception innerException)
154+
{
155+
Contract.Requires(innerException != null);
156+
_aborted = true;
157+
_abortException = innerException;
158+
}
159+
160+
private void CheckAborted()
161+
{
162+
if (_aborted)
163+
{
164+
throw new IOException(string.Empty, _abortException);
165+
}
166+
}
167+
168+
protected override void Dispose(bool disposing)
169+
{
170+
if (disposing)
171+
{
172+
_abortRequest();
173+
}
174+
base.Dispose(disposing);
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)