Skip to content

Cache and reuse NamedPipeServerStream instances #46473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;

internal sealed class NamedPipeConnection : TransportConnection, IConnectionNamedPipeFeature
{
private static readonly ConnectionAbortedException SendGracefullyCompletedException = new ConnectionAbortedException("The named pipe transport's send loop completed gracefully.");
private const int MinAllocBufferSize = 4096;

private readonly NamedPipeConnectionListener _connectionListener;
private readonly NamedPipeServerStream _stream;
private readonly ILogger _log;
private readonly IDuplexPipe _originalTransport;

private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();
private bool _connectionClosed;
private bool _connectionDisposed;
private bool _connectionShutdown;
private bool _streamDisconnected;
private Exception? _shutdownReason;
private readonly object _shutdownLock = new object();

Expand All @@ -30,13 +32,15 @@ internal sealed class NamedPipeConnection : TransportConnection, IConnectionName
internal Task _sendingTask = Task.CompletedTask;

public NamedPipeConnection(
NamedPipeConnectionListener connectionListener,
NamedPipeServerStream stream,
NamedPipeEndPoint endPoint,
ILogger logger,
MemoryPool<byte> memoryPool,
PipeOptions inputOptions,
PipeOptions outputOptions)
{
_connectionListener = connectionListener;
_stream = stream;
_log = logger;
MemoryPool = memoryPool;
Expand Down Expand Up @@ -120,7 +124,7 @@ private async Task DoReceiveAsync()
// This exception should always be ignored because _shutdownReason should be set.
error = ex;

if (!_connectionDisposed)
if (!_connectionShutdown)
{
// This is unexpected if the socket hasn't been disposed yet.
NamedPipeLog.ConnectionError(_log, this, error);
Expand Down Expand Up @@ -206,33 +210,32 @@ private void Shutdown(Exception? shutdownReason)
{
lock (_shutdownLock)
{
if (_connectionDisposed)
if (_connectionShutdown)
{
return;
}

// Make sure to close the connection only after the _aborted flag is set.
// Without this, the RequestsCanBeAbortedMidRead test will sometimes fail when
// a BadHttpRequestException is thrown instead of a TaskCanceledException.
_connectionDisposed = true;
_connectionShutdown = true;

// shutdownReason should only be null if the output was completed gracefully, so no one should ever
// ever observe the nondescript ConnectionAbortedException except for connection middleware attempting
// to half close the connection which is currently unsupported.
_shutdownReason = shutdownReason ?? new ConnectionAbortedException("The Socket transport's send loop completed gracefully.");
_shutdownReason = shutdownReason ?? SendGracefullyCompletedException;
NamedPipeLog.ConnectionDisconnect(_log, this, _shutdownReason.Message);

try
{
// Try to gracefully close the socket even for aborts to match libuv behavior.
_stream.Disconnect();
_streamDisconnected = true;
}
catch
{
// Ignore any errors from NamedPipeStream.Disconnect() since we're tearing down the connection anyway.
}

_stream.Dispose();
}
}

Expand Down Expand Up @@ -287,8 +290,17 @@ public override async ValueTask DisposeAsync()
catch (Exception ex)
{
_log.LogError(0, ex, $"Unexpected exception in {nameof(NamedPipeConnection)}.{nameof(Start)}.");
_stream.Dispose();
return;
}

if (!_streamDisconnected)
{
_stream.Dispose();
}
else
{
_connectionListener.ReturnStream(_stream);
}

await _stream.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Channels;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using NamedPipeOptions = System.IO.Pipes.PipeOptions;
using PipeOptions = System.IO.Pipelines.PipeOptions;

Expand All @@ -19,6 +20,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
private readonly ILogger _log;
private readonly NamedPipeEndPoint _endpoint;
private readonly NamedPipeTransportOptions _options;
private readonly ObjectPool<NamedPipeServerStream> _namedPipeServerStreamPool;
private readonly CancellationTokenSource _listeningTokenSource = new CancellationTokenSource();
private readonly CancellationToken _listeningToken;
private readonly Channel<ConnectionContext> _acceptedQueue;
Expand All @@ -38,6 +40,7 @@ public NamedPipeConnectionListener(
_log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes");
_endpoint = endpoint;
_options = options;
_namedPipeServerStreamPool = new DefaultObjectPoolProvider().Create(new NamedPipeServerStreamPoolPolicy(this));
_mutex = mutex;
_memoryPool = options.MemoryPoolFactory();
_listeningToken = _listeningTokenSource.Token;
Expand All @@ -54,6 +57,12 @@ public NamedPipeConnectionListener(
_outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
}

internal void ReturnStream(NamedPipeServerStream namedPipeServerStream)
{
// The stream is automatically disposed if there isn't space in the pool.
_namedPipeServerStreamPool.Return(namedPipeServerStream);
}

public void Start()
{
Debug.Assert(_listeningTasks == null, "Already started");
Expand All @@ -63,7 +72,7 @@ public void Start()
for (var i = 0; i < _listeningTasks.Length; i++)
{
// Start first stream inline to catch creation errors.
var initialStream = CreateServerStream();
var initialStream = _namedPipeServerStreamPool.Get();

_listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
}
Expand All @@ -83,13 +92,13 @@ private async Task StartAsync(NamedPipeServerStream nextStream)

await stream.WaitForConnectionAsync(_listeningToken);

var connection = new NamedPipeConnection(stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
connection.Start();

// Create the next stream before writing connected stream to the channel.
// This ensures there is always a created stream and another process can't
// create a stream with the same name with different a access policy.
nextStream = CreateServerStream();
nextStream = _namedPipeServerStreamPool.Get();

while (!_acceptedQueue.Writer.TryWrite(connection))
{
Expand All @@ -106,7 +115,7 @@ private async Task StartAsync(NamedPipeServerStream nextStream)

// Dispose existing pipe, create a new one and continue accepting.
nextStream.Dispose();
nextStream = CreateServerStream();
nextStream = _namedPipeServerStreamPool.Get();
}
catch (OperationCanceledException ex) when (_listeningToken.IsCancellationRequested)
{
Expand All @@ -125,41 +134,6 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
}
}

private NamedPipeServerStream CreateServerStream()
{
NamedPipeServerStream stream;
var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
if (_options.CurrentUserOnly)
{
pipeOptions |= NamedPipeOptions.CurrentUserOnly;
}

if (_options.PipeSecurity != null)
{
stream = NamedPipeServerStreamAcl.Create(
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0, // Buffer in System.IO.Pipelines
outBufferSize: 0, // Buffer in System.IO.Pipelines
_options.PipeSecurity);
}
else
{
stream = new NamedPipeServerStream(
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0,
outBufferSize: 0);
}
return stream;
}

public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
{
while (await _acceptedQueue.Reader.WaitToReadAsync(cancellationToken))
Expand Down Expand Up @@ -191,5 +165,56 @@ public async ValueTask DisposeAsync()
{
await Task.WhenAll(_listeningTasks);
}

// Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed.
// Important to dispose because this empties and disposes streams in the pool.
((IDisposable)_namedPipeServerStreamPool).Dispose();
}

private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy<NamedPipeServerStream>
{
public NamedPipeConnectionListener _listener;

public NamedPipeServerStreamPoolPolicy(NamedPipeConnectionListener listener)
{
_listener = listener;
}

public NamedPipeServerStream Create()
{
NamedPipeServerStream stream;
var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
if (_listener._options.CurrentUserOnly)
{
pipeOptions |= NamedPipeOptions.CurrentUserOnly;
}

if (_listener._options.PipeSecurity != null)
{
stream = NamedPipeServerStreamAcl.Create(
_listener._endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0, // Buffer in System.IO.Pipelines
outBufferSize: 0, // Buffer in System.IO.Pipelines
_listener._options.PipeSecurity);
}
else
{
stream = new NamedPipeServerStream(
_listener._endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0,
outBufferSize: 0);
}
return stream;
}

public bool Return(NamedPipeServerStream obj) => true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<ItemGroup>
<Reference Include="Microsoft.AspNetCore.Hosting.Abstractions" />
<Reference Include="Microsoft.AspNetCore.Connections.Abstractions" />
<Reference Include="Microsoft.Extensions.ObjectPool" />
<Reference Include="Microsoft.Extensions.Options" />
</ItemGroup>

Expand Down