diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs index 872ce2da1660..f3c56b4c8e07 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs @@ -228,13 +228,14 @@ private void Shutdown(Exception? shutdownReason) try { - // Try to gracefully close the socket even for aborts to match libuv behavior. + // Try to gracefully disconnect the pipe even for aborts to match other transport behavior. _stream.Disconnect(); _streamDisconnected = true; } catch { - // Ignore any errors from NamedPipeStream.Disconnect() since we're tearing down the connection anyway. + // Ignore any errors from NamedPipeServerStream.Disconnect() since we're tearing down the connection anyway. + // _streamDisconnected is not set to true so the stream won't be pooled for reuse. } } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index c908eebfe816..2968dcd50558 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -35,15 +35,17 @@ public NamedPipeConnectionListener( NamedPipeEndPoint endpoint, NamedPipeTransportOptions options, ILoggerFactory loggerFactory, + ObjectPoolProvider objectPoolProvider, Mutex mutex) { _log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes"); _endpoint = endpoint; _options = options; - _namedPipeServerStreamPool = new DefaultObjectPoolProvider().Create(new NamedPipeServerStreamPoolPolicy(this)); _mutex = mutex; _memoryPool = options.MemoryPoolFactory(); _listeningToken = _listeningTokenSource.Token; + // Have to create the pool here (instead of DI) because the pool is specific to an endpoint. + _namedPipeServerStreamPool = objectPoolProvider.Create(new NamedPipeServerStreamPoolPolicy(endpoint, options)); // The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection. // We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream @@ -57,10 +59,12 @@ public NamedPipeConnectionListener( _outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false); } - internal void ReturnStream(NamedPipeServerStream namedPipeServerStream) + internal void ReturnStream(NamedPipeServerStream stream) { + Debug.Assert(!stream.IsConnected, "Stream should have been successfully disconnected to reach this point."); + // The stream is automatically disposed if there isn't space in the pool. - _namedPipeServerStreamPool.Return(namedPipeServerStream); + _namedPipeServerStreamPool.Return(stream); } public void Start() @@ -174,43 +178,45 @@ public async ValueTask DisposeAsync() // Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed. // Important to dispose because this empties and disposes streams in the pool. - ((IDisposable)_namedPipeServerStreamPool).Dispose(); + (_namedPipeServerStreamPool as IDisposable)?.Dispose(); } private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy { - public NamedPipeConnectionListener _listener; + private readonly NamedPipeEndPoint _endpoint; + private readonly NamedPipeTransportOptions _options; - public NamedPipeServerStreamPoolPolicy(NamedPipeConnectionListener listener) + public NamedPipeServerStreamPoolPolicy(NamedPipeEndPoint endpoint, NamedPipeTransportOptions options) { - _listener = listener; + _endpoint = endpoint; + _options = options; } public NamedPipeServerStream Create() { NamedPipeServerStream stream; var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough; - if (_listener._options.CurrentUserOnly) + if (_options.CurrentUserOnly) { pipeOptions |= NamedPipeOptions.CurrentUserOnly; } - if (_listener._options.PipeSecurity != null) + if (_options.PipeSecurity != null) { stream = NamedPipeServerStreamAcl.Create( - _listener._endpoint.PipeName, + _endpoint.PipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances, PipeTransmissionMode.Byte, pipeOptions, inBufferSize: 0, // Buffer in System.IO.Pipelines outBufferSize: 0, // Buffer in System.IO.Pipelines - _listener._options.PipeSecurity); + _options.PipeSecurity); } else { stream = new NamedPipeServerStream( - _listener._endpoint.PipeName, + _endpoint.PipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances, PipeTransmissionMode.Byte, @@ -221,6 +227,6 @@ public NamedPipeServerStream Create() return stream; } - public bool Return(NamedPipeServerStream obj) => true; + public bool Return(NamedPipeServerStream obj) => !obj.IsConnected; } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeTransportFactory.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeTransportFactory.cs index 8c11a375690c..e8f8023a91bf 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeTransportFactory.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeTransportFactory.cs @@ -5,6 +5,7 @@ using System.Net; using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.ObjectPool; using Microsoft.Extensions.Options; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal; @@ -14,17 +15,20 @@ internal sealed class NamedPipeTransportFactory : IConnectionListenerFactory, IC private const string LocalComputerServerName = "."; private readonly ILoggerFactory _loggerFactory; + private readonly ObjectPoolProvider _objectPoolProvider; private readonly NamedPipeTransportOptions _options; public NamedPipeTransportFactory( ILoggerFactory loggerFactory, - IOptions options) + IOptions options, + ObjectPoolProvider objectPoolProvider) { ArgumentNullException.ThrowIfNull(loggerFactory); Debug.Assert(OperatingSystem.IsWindows(), "Named pipes transport requires a Windows operating system."); _loggerFactory = loggerFactory; + _objectPoolProvider = objectPoolProvider; _options = options.Value; } @@ -52,7 +56,7 @@ public ValueTask BindAsync(EndPoint endpoint, CancellationT throw new AddressInUseException($"Named pipe '{namedPipeEndPoint.PipeName}' is already in use by Kestrel."); } - var listener = new NamedPipeConnectionListener(namedPipeEndPoint, _options, _loggerFactory, mutex); + var listener = new NamedPipeConnectionListener(namedPipeEndPoint, _options, _loggerFactory, _objectPoolProvider, mutex); listener.Start(); return new ValueTask(listener); diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/WebHostBuilderNamedPipeExtensions.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/WebHostBuilderNamedPipeExtensions.cs index 6a7889151fb1..53f94f801e74 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/WebHostBuilderNamedPipeExtensions.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/WebHostBuilderNamedPipeExtensions.cs @@ -6,6 +6,8 @@ using Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes; using Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.ObjectPool; namespace Microsoft.AspNetCore.Hosting; @@ -29,6 +31,7 @@ public static IWebHostBuilder UseNamedPipes(this IWebHostBuilder hostBuilder) hostBuilder.ConfigureServices(services => { + services.TryAddSingleton(); services.AddSingleton(); }); return hostBuilder; diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index c98994df5c92..b9e4bd403cb1 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.ObjectPool; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Tests; @@ -24,11 +25,70 @@ public async Task AcceptAsync_AfterUnbind_ReturnNull() Assert.Null(await connectionListener.AcceptAsync().DefaultTimeout()); } + private class TestObjectPoolProvider : ObjectPoolProvider + { + public List Pools { get; } = new List(); + + public override ObjectPool Create(IPooledObjectPolicy policy) + { + var pool = new TestObjectPool(policy); + Pools.Add(pool); + + return pool; + } + + private class TestObjectPool : ObjectPool, ITestObjectPool where T : class + { + private readonly IPooledObjectPolicy _policy; + + public TestObjectPool(IPooledObjectPolicy policy) + { + _policy = policy; + } + + public int GetCount { get; private set; } + public int ReturnSuccessCount { get; private set; } + public int ReturnFailureCount { get; private set; } + + public override T Get() + { + GetCount++; + return _policy.Create(); + } + + public override void Return(T obj) + { + if (_policy.Return(obj)) + { + ReturnSuccessCount++; + } + else + { + ReturnFailureCount++; + } + } + } + } + + private interface ITestObjectPool + { + int GetCount { get; } + int ReturnSuccessCount { get; } + int ReturnFailureCount { get; } + } + [ConditionalFact] public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts() { // Arrange - await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory); + var testObjectPoolProvider = new TestObjectPoolProvider(); + var options = new NamedPipeTransportOptions + { + ListenerQueueCount = 2 + }; + await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options, objectPoolProvider: testObjectPoolProvider); + var pool = Assert.Single(testObjectPoolProvider.Pools); + Assert.Equal(options.ListenerQueueCount, pool.GetCount); // Stream 1 var acceptTask1 = connectionListener.AcceptAsync(); @@ -40,6 +100,10 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts() await serverConnection1.DisposeAsync().AsTask().DefaultTimeout(); Assert.True(serverConnection1.ConnectionClosed.IsCancellationRequested, "Connection 1 should be closed"); + Assert.Equal(options.ListenerQueueCount + 1, pool.GetCount); + Assert.Equal(1, pool.ReturnSuccessCount); + Assert.Equal(0, pool.ReturnFailureCount); + // Stream 2 var acceptTask2 = connectionListener.AcceptAsync(); await using var clientStream2 = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint); @@ -49,6 +113,10 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts() Assert.False(serverConnection2.ConnectionClosed.IsCancellationRequested, "Connection 2 should be open"); await serverConnection2.DisposeAsync().AsTask().DefaultTimeout(); Assert.True(serverConnection2.ConnectionClosed.IsCancellationRequested, "Connection 2 should be closed"); + + Assert.Equal(options.ListenerQueueCount + 2, pool.GetCount); + Assert.Equal(2, pool.ReturnSuccessCount); + Assert.Equal(0, pool.ReturnFailureCount); } [ConditionalFact] diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeTestHelpers.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeTestHelpers.cs index 66a58002f187..fcfcedab1e95 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeTestHelpers.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeTestHelpers.cs @@ -10,6 +10,7 @@ using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.ObjectPool; using Microsoft.Extensions.Options; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Tests; @@ -22,18 +23,20 @@ internal static class NamedPipeTestHelpers public static NamedPipeTransportFactory CreateTransportFactory( ILoggerFactory loggerFactory = null, - NamedPipeTransportOptions options = null) + NamedPipeTransportOptions options = null, + ObjectPoolProvider objectPoolProvider = null) { options ??= new NamedPipeTransportOptions(); - return new NamedPipeTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(options)); + return new NamedPipeTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(options), objectPoolProvider ?? new DefaultObjectPoolProvider()); } public static async Task CreateConnectionListenerFactory( ILoggerFactory loggerFactory = null, string pipeName = null, - NamedPipeTransportOptions options = null) + NamedPipeTransportOptions options = null, + ObjectPoolProvider objectPoolProvider = null) { - var transportFactory = CreateTransportFactory(loggerFactory, options); + var transportFactory = CreateTransportFactory(loggerFactory, options, objectPoolProvider); var endpoint = new NamedPipeEndPoint(pipeName ?? GetUniquePipeName());