Skip to content

Named pipe caching test #46515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,14 @@ private void Shutdown(Exception? shutdownReason)

try
{
// Try to gracefully close the socket even for aborts to match libuv behavior.
// Try to gracefully disconnect the pipe even for aborts to match other transport behavior.
_stream.Disconnect();
_streamDisconnected = true;
}
catch
{
// Ignore any errors from NamedPipeStream.Disconnect() since we're tearing down the connection anyway.
// Ignore any errors from NamedPipeServerStream.Disconnect() since we're tearing down the connection anyway.
// _streamDisconnected is not set to true so the stream won't be pooled for reuse.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ public NamedPipeConnectionListener(
NamedPipeEndPoint endpoint,
NamedPipeTransportOptions options,
ILoggerFactory loggerFactory,
ObjectPoolProvider objectPoolProvider,
Mutex mutex)
{
_log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes");
_endpoint = endpoint;
_options = options;
_namedPipeServerStreamPool = new DefaultObjectPoolProvider().Create(new NamedPipeServerStreamPoolPolicy(this));
_mutex = mutex;
_memoryPool = options.MemoryPoolFactory();
_listeningToken = _listeningTokenSource.Token;
// Have to create the pool here (instead of DI) because the pool is specific to an endpoint.
_namedPipeServerStreamPool = objectPoolProvider.Create(new NamedPipeServerStreamPoolPolicy(endpoint, options));

// The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection.
// We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream
Expand All @@ -57,10 +59,12 @@ public NamedPipeConnectionListener(
_outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
}

internal void ReturnStream(NamedPipeServerStream namedPipeServerStream)
internal void ReturnStream(NamedPipeServerStream stream)
{
Debug.Assert(!stream.IsConnected, "Stream should have been successfully disconnected to reach this point.");

// The stream is automatically disposed if there isn't space in the pool.
_namedPipeServerStreamPool.Return(namedPipeServerStream);
_namedPipeServerStreamPool.Return(stream);
}

public void Start()
Expand Down Expand Up @@ -174,43 +178,45 @@ public async ValueTask DisposeAsync()

// Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed.
// Important to dispose because this empties and disposes streams in the pool.
((IDisposable)_namedPipeServerStreamPool).Dispose();
(_namedPipeServerStreamPool as IDisposable)?.Dispose();
}

private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy<NamedPipeServerStream>
{
public NamedPipeConnectionListener _listener;
private readonly NamedPipeEndPoint _endpoint;
private readonly NamedPipeTransportOptions _options;

public NamedPipeServerStreamPoolPolicy(NamedPipeConnectionListener listener)
public NamedPipeServerStreamPoolPolicy(NamedPipeEndPoint endpoint, NamedPipeTransportOptions options)
{
_listener = listener;
_endpoint = endpoint;
_options = options;
}

public NamedPipeServerStream Create()
{
NamedPipeServerStream stream;
var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
if (_listener._options.CurrentUserOnly)
if (_options.CurrentUserOnly)
{
pipeOptions |= NamedPipeOptions.CurrentUserOnly;
}

if (_listener._options.PipeSecurity != null)
if (_options.PipeSecurity != null)
{
stream = NamedPipeServerStreamAcl.Create(
_listener._endpoint.PipeName,
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0, // Buffer in System.IO.Pipelines
outBufferSize: 0, // Buffer in System.IO.Pipelines
_listener._options.PipeSecurity);
_options.PipeSecurity);
}
else
{
stream = new NamedPipeServerStream(
_listener._endpoint.PipeName,
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
Expand All @@ -221,6 +227,6 @@ public NamedPipeServerStream Create()
return stream;
}

public bool Return(NamedPipeServerStream obj) => true;
public bool Return(NamedPipeServerStream obj) => !obj.IsConnected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;
Expand All @@ -14,17 +15,20 @@ internal sealed class NamedPipeTransportFactory : IConnectionListenerFactory, IC
private const string LocalComputerServerName = ".";

private readonly ILoggerFactory _loggerFactory;
private readonly ObjectPoolProvider _objectPoolProvider;
private readonly NamedPipeTransportOptions _options;

public NamedPipeTransportFactory(
ILoggerFactory loggerFactory,
IOptions<NamedPipeTransportOptions> options)
IOptions<NamedPipeTransportOptions> options,
ObjectPoolProvider objectPoolProvider)
{
ArgumentNullException.ThrowIfNull(loggerFactory);

Debug.Assert(OperatingSystem.IsWindows(), "Named pipes transport requires a Windows operating system.");

_loggerFactory = loggerFactory;
_objectPoolProvider = objectPoolProvider;
_options = options.Value;
}

Expand Down Expand Up @@ -52,7 +56,7 @@ public ValueTask<IConnectionListener> BindAsync(EndPoint endpoint, CancellationT
throw new AddressInUseException($"Named pipe '{namedPipeEndPoint.PipeName}' is already in use by Kestrel.");
}

var listener = new NamedPipeConnectionListener(namedPipeEndPoint, _options, _loggerFactory, mutex);
var listener = new NamedPipeConnectionListener(namedPipeEndPoint, _options, _loggerFactory, _objectPoolProvider, mutex);
listener.Start();

return new ValueTask<IConnectionListener>(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes;
using Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.ObjectPool;

namespace Microsoft.AspNetCore.Hosting;

Expand All @@ -29,6 +31,7 @@ public static IWebHostBuilder UseNamedPipes(this IWebHostBuilder hostBuilder)

hostBuilder.ConfigureServices(services =>
{
services.TryAddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton<IConnectionListenerFactory, NamedPipeTransportFactory>();
});
return hostBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Tests;

Expand All @@ -24,11 +25,70 @@ public async Task AcceptAsync_AfterUnbind_ReturnNull()
Assert.Null(await connectionListener.AcceptAsync().DefaultTimeout());
}

private class TestObjectPoolProvider : ObjectPoolProvider
{
public List<ITestObjectPool> Pools { get; } = new List<ITestObjectPool>();

public override ObjectPool<T> Create<T>(IPooledObjectPolicy<T> policy)
{
var pool = new TestObjectPool<T>(policy);
Pools.Add(pool);

return pool;
}

private class TestObjectPool<T> : ObjectPool<T>, ITestObjectPool where T : class
{
private readonly IPooledObjectPolicy<T> _policy;

public TestObjectPool(IPooledObjectPolicy<T> policy)
{
_policy = policy;
}

public int GetCount { get; private set; }
public int ReturnSuccessCount { get; private set; }
public int ReturnFailureCount { get; private set; }

public override T Get()
{
GetCount++;
return _policy.Create();
}

public override void Return(T obj)
{
if (_policy.Return(obj))
{
ReturnSuccessCount++;
}
else
{
ReturnFailureCount++;
}
}
}
}

private interface ITestObjectPool
{
int GetCount { get; }
int ReturnSuccessCount { get; }
int ReturnFailureCount { get; }
}

[ConditionalFact]
public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
{
// Arrange
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory);
var testObjectPoolProvider = new TestObjectPoolProvider();
var options = new NamedPipeTransportOptions
{
ListenerQueueCount = 2
};
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options, objectPoolProvider: testObjectPoolProvider);
var pool = Assert.Single(testObjectPoolProvider.Pools);
Assert.Equal(options.ListenerQueueCount, pool.GetCount);

// Stream 1
var acceptTask1 = connectionListener.AcceptAsync();
Expand All @@ -40,6 +100,10 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
await serverConnection1.DisposeAsync().AsTask().DefaultTimeout();
Assert.True(serverConnection1.ConnectionClosed.IsCancellationRequested, "Connection 1 should be closed");

Assert.Equal(options.ListenerQueueCount + 1, pool.GetCount);
Assert.Equal(1, pool.ReturnSuccessCount);
Assert.Equal(0, pool.ReturnFailureCount);

// Stream 2
var acceptTask2 = connectionListener.AcceptAsync();
await using var clientStream2 = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint);
Expand All @@ -49,6 +113,10 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
Assert.False(serverConnection2.ConnectionClosed.IsCancellationRequested, "Connection 2 should be open");
await serverConnection2.DisposeAsync().AsTask().DefaultTimeout();
Assert.True(serverConnection2.ConnectionClosed.IsCancellationRequested, "Connection 2 should be closed");

Assert.Equal(options.ListenerQueueCount + 2, pool.GetCount);
Assert.Equal(2, pool.ReturnSuccessCount);
Assert.Equal(0, pool.ReturnFailureCount);
}

[ConditionalFact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Tests;
Expand All @@ -22,18 +23,20 @@ internal static class NamedPipeTestHelpers

public static NamedPipeTransportFactory CreateTransportFactory(
ILoggerFactory loggerFactory = null,
NamedPipeTransportOptions options = null)
NamedPipeTransportOptions options = null,
ObjectPoolProvider objectPoolProvider = null)
{
options ??= new NamedPipeTransportOptions();
return new NamedPipeTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(options));
return new NamedPipeTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(options), objectPoolProvider ?? new DefaultObjectPoolProvider());
}

public static async Task<NamedPipeConnectionListener> CreateConnectionListenerFactory(
ILoggerFactory loggerFactory = null,
string pipeName = null,
NamedPipeTransportOptions options = null)
NamedPipeTransportOptions options = null,
ObjectPoolProvider objectPoolProvider = null)
{
var transportFactory = CreateTransportFactory(loggerFactory, options);
var transportFactory = CreateTransportFactory(loggerFactory, options, objectPoolProvider);

var endpoint = new NamedPipeEndPoint(pipeName ?? GetUniquePipeName());

Expand Down