|
2 | 2 | // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
3 | 3 |
|
4 | 4 | using System;
|
5 |
| -using System.Collections.Generic; |
6 | 5 | using System.IO;
|
7 |
| -using System.Linq; |
| 6 | +using System.IO.Pipelines; |
8 | 7 | using System.Runtime.InteropServices;
|
9 | 8 | using System.Text;
|
10 | 9 | using System.Threading;
|
|
13 | 12 | using Microsoft.AspNetCore.Http.Features;
|
14 | 13 | using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
15 | 14 | using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
| 15 | +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; |
16 | 16 | using Microsoft.AspNetCore.Testing;
|
17 | 17 | using Moq;
|
18 | 18 | using Xunit;
|
@@ -401,81 +401,77 @@ public async Task ConsumeAsyncConsumesAllRemainingInput()
|
401 | 401 | }
|
402 | 402 | }
|
403 | 403 |
|
404 |
| - public static IEnumerable<object[]> StreamData => new[] |
405 |
| - { |
406 |
| - new object[] { new ThrowOnWriteSynchronousStream() }, |
407 |
| - new object[] { new ThrowOnWriteAsynchronousStream() }, |
408 |
| - }; |
409 |
| - |
410 |
| - public static IEnumerable<object[]> RequestData => new[] |
411 |
| - { |
412 |
| - // Content-Length |
413 |
| - new object[] { new HttpRequestHeaders { HeaderContentLength = "12" }, new[] { "Hello ", "World!" } }, |
414 |
| - // Chunked |
415 |
| - new object[] { new HttpRequestHeaders { HeaderTransferEncoding = "chunked" }, new[] { "6\r\nHello \r\n", "6\r\nWorld!\r\n0\r\n\r\n" } }, |
416 |
| - }; |
417 |
| - |
418 |
| - public static IEnumerable<object[]> CombinedData => |
419 |
| - from stream in StreamData |
420 |
| - from request in RequestData |
421 |
| - select new[] { stream[0], request[0], request[1] }; |
422 |
| - |
423 |
| - [Theory] |
424 |
| - [MemberData(nameof(RequestData))] |
425 |
| - public async Task CopyToAsyncDoesNotCopyBlocks(HttpRequestHeaders headers, string[] data) |
| 404 | + [Fact] |
| 405 | + public async Task CopyToAsyncDoesNotCopyBlocks() |
426 | 406 | {
|
427 | 407 | var writeCount = 0;
|
428 |
| - var writeTcs = new TaskCompletionSource<byte[]>(); |
| 408 | + var writeTcs = new TaskCompletionSource<(byte[], int, int)>(); |
429 | 409 | var mockDestination = new Mock<Stream>() { CallBase = true };
|
430 | 410 |
|
431 | 411 | mockDestination
|
432 | 412 | .Setup(m => m.WriteAsync(It.IsAny<byte[]>(), It.IsAny<int>(), It.IsAny<int>(), CancellationToken.None))
|
433 | 413 | .Callback((byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
|
434 | 414 | {
|
435 |
| - writeTcs.SetResult(buffer); |
| 415 | + writeTcs.SetResult((buffer, offset, count)); |
436 | 416 | writeCount++;
|
437 | 417 | })
|
438 | 418 | .Returns(Task.CompletedTask);
|
439 | 419 |
|
440 |
| - using (var input = new TestInput()) |
| 420 | + using (var memoryPool = KestrelMemoryPool.Create()) |
441 | 421 | {
|
442 |
| - var body = Http1MessageBody.For(HttpVersion.Http11, headers, input.Http1Connection); |
| 422 | + var options = new PipeOptions(pool: memoryPool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false); |
| 423 | + var pair = DuplexPipe.CreateConnectionPair(options, options); |
| 424 | + var transport = pair.Transport; |
| 425 | + var application = pair.Application; |
| 426 | + var http1ConnectionContext = new Http1ConnectionContext |
| 427 | + { |
| 428 | + ServiceContext = new TestServiceContext(), |
| 429 | + ConnectionFeatures = new FeatureCollection(), |
| 430 | + Application = application, |
| 431 | + Transport = transport, |
| 432 | + MemoryPool = memoryPool, |
| 433 | + TimeoutControl = Mock.Of<ITimeoutControl>() |
| 434 | + }; |
| 435 | + var http1Connection = new Http1Connection(http1ConnectionContext) |
| 436 | + { |
| 437 | + HasStartedConsumingRequestBody = true |
| 438 | + }; |
| 439 | + |
| 440 | + var headers = new HttpRequestHeaders { HeaderContentLength = "12" }; |
| 441 | + var body = Http1MessageBody.For(HttpVersion.Http11, headers, http1Connection); |
443 | 442 |
|
444 | 443 | var copyToAsyncTask = body.CopyToAsync(mockDestination.Object);
|
445 | 444 |
|
446 |
| - // The block returned by IncomingStart always has at least 2048 available bytes, |
447 |
| - // so no need to bounds check in this test. |
448 |
| - var bytes = Encoding.ASCII.GetBytes(data[0]); |
449 |
| - var buffer = input.Application.Output.GetMemory(2028); |
450 |
| - ArraySegment<byte> block; |
451 |
| - Assert.True(MemoryMarshal.TryGetArray(buffer, out block)); |
452 |
| - Buffer.BlockCopy(bytes, 0, block.Array, block.Offset, bytes.Length); |
453 |
| - input.Application.Output.Advance(bytes.Length); |
454 |
| - await input.Application.Output.FlushAsync(); |
455 |
| - |
456 |
| - // Verify the block passed to WriteAsync is the same one incoming data was written into. |
457 |
| - Assert.Same(block.Array, await writeTcs.Task); |
458 |
| - |
459 |
| - writeTcs = new TaskCompletionSource<byte[]>(); |
460 |
| - bytes = Encoding.ASCII.GetBytes(data[1]); |
461 |
| - buffer = input.Application.Output.GetMemory(2048); |
462 |
| - Assert.True(MemoryMarshal.TryGetArray(buffer, out block)); |
463 |
| - Buffer.BlockCopy(bytes, 0, block.Array, block.Offset, bytes.Length); |
464 |
| - input.Application.Output.Advance(bytes.Length); |
465 |
| - await input.Application.Output.FlushAsync(); |
466 |
| - |
467 |
| - Assert.Same(block.Array, await writeTcs.Task); |
468 |
| - |
469 |
| - if (headers.HeaderConnection == "close") |
470 |
| - { |
471 |
| - input.Application.Output.Complete(); |
472 |
| - } |
| 445 | + var bytes = Encoding.ASCII.GetBytes("Hello "); |
| 446 | + var buffer = http1Connection.RequestBodyPipe.Writer.GetMemory(2048); |
| 447 | + ArraySegment<byte> segment; |
| 448 | + Assert.True(MemoryMarshal.TryGetArray(buffer, out segment)); |
| 449 | + Buffer.BlockCopy(bytes, 0, segment.Array, segment.Offset, bytes.Length); |
| 450 | + http1Connection.RequestBodyPipe.Writer.Advance(bytes.Length); |
| 451 | + await http1Connection.RequestBodyPipe.Writer.FlushAsync(); |
| 452 | + |
| 453 | + // Verify the block passed to Stream.WriteAsync() is the same one incoming data was written into. |
| 454 | + Assert.Equal((segment.Array, segment.Offset, bytes.Length), await writeTcs.Task); |
| 455 | + |
| 456 | + // Verify the again when GetMemory returns the tail space of the same block. |
| 457 | + writeTcs = new TaskCompletionSource<(byte[], int, int)>(); |
| 458 | + bytes = Encoding.ASCII.GetBytes("World!"); |
| 459 | + buffer = http1Connection.RequestBodyPipe.Writer.GetMemory(2048); |
| 460 | + Assert.True(MemoryMarshal.TryGetArray(buffer, out segment)); |
| 461 | + Buffer.BlockCopy(bytes, 0, segment.Array, segment.Offset, bytes.Length); |
| 462 | + http1Connection.RequestBodyPipe.Writer.Advance(bytes.Length); |
| 463 | + await http1Connection.RequestBodyPipe.Writer.FlushAsync(); |
| 464 | + |
| 465 | + Assert.Equal((segment.Array, segment.Offset, bytes.Length), await writeTcs.Task); |
| 466 | + |
| 467 | + http1Connection.RequestBodyPipe.Writer.Complete(); |
473 | 468 |
|
474 | 469 | await copyToAsyncTask;
|
475 | 470 |
|
476 | 471 | Assert.Equal(2, writeCount);
|
477 | 472 |
|
478 |
| - await body.StopAsync(); |
| 473 | + // Don't call body.StopAsync() because PumpAsync() was never called. |
| 474 | + http1Connection.RequestBodyPipe.Reader.Complete(); |
479 | 475 | }
|
480 | 476 | }
|
481 | 477 |
|
|
0 commit comments