Skip to content

Commit 4ea74dd

Browse files
committed
Cache and reuse NamedPipeServerStream instances
1 parent 20c7ae1 commit 4ea74dd

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;
1313

1414
internal sealed class NamedPipeConnection : TransportConnection, IConnectionNamedPipeFeature
1515
{
16+
private static readonly ConnectionAbortedException SendGracefullyCompletedException = new ConnectionAbortedException("The named pipe transport's send loop completed gracefully.");
1617
private const int MinAllocBufferSize = 4096;
17-
18+
private readonly NamedPipeConnectionListener _connectionListener;
1819
private readonly NamedPipeServerStream _stream;
1920
private readonly ILogger _log;
2021
private readonly IDuplexPipe _originalTransport;
2122

2223
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();
2324
private bool _connectionClosed;
24-
private bool _connectionDisposed;
25+
private bool _connectionShutdown;
26+
private bool _streamDisconnected;
2527
private Exception? _shutdownReason;
2628
private readonly object _shutdownLock = new object();
2729

@@ -30,13 +32,15 @@ internal sealed class NamedPipeConnection : TransportConnection, IConnectionName
3032
internal Task _sendingTask = Task.CompletedTask;
3133

3234
public NamedPipeConnection(
35+
NamedPipeConnectionListener connectionListener,
3336
NamedPipeServerStream stream,
3437
NamedPipeEndPoint endPoint,
3538
ILogger logger,
3639
MemoryPool<byte> memoryPool,
3740
PipeOptions inputOptions,
3841
PipeOptions outputOptions)
3942
{
43+
_connectionListener = connectionListener;
4044
_stream = stream;
4145
_log = logger;
4246
MemoryPool = memoryPool;
@@ -120,7 +124,7 @@ private async Task DoReceiveAsync()
120124
// This exception should always be ignored because _shutdownReason should be set.
121125
error = ex;
122126

123-
if (!_connectionDisposed)
127+
if (!_connectionShutdown)
124128
{
125129
// This is unexpected if the socket hasn't been disposed yet.
126130
NamedPipeLog.ConnectionError(_log, this, error);
@@ -206,33 +210,32 @@ private void Shutdown(Exception? shutdownReason)
206210
{
207211
lock (_shutdownLock)
208212
{
209-
if (_connectionDisposed)
213+
if (_connectionShutdown)
210214
{
211215
return;
212216
}
213217

214218
// Make sure to close the connection only after the _aborted flag is set.
215219
// Without this, the RequestsCanBeAbortedMidRead test will sometimes fail when
216220
// a BadHttpRequestException is thrown instead of a TaskCanceledException.
217-
_connectionDisposed = true;
221+
_connectionShutdown = true;
218222

219223
// shutdownReason should only be null if the output was completed gracefully, so no one should ever
220224
// ever observe the nondescript ConnectionAbortedException except for connection middleware attempting
221225
// to half close the connection which is currently unsupported.
222-
_shutdownReason = shutdownReason ?? new ConnectionAbortedException("The Socket transport's send loop completed gracefully.");
226+
_shutdownReason = shutdownReason ?? SendGracefullyCompletedException;
223227
NamedPipeLog.ConnectionDisconnect(_log, this, _shutdownReason.Message);
224228

225229
try
226230
{
227231
// Try to gracefully close the socket even for aborts to match libuv behavior.
228232
_stream.Disconnect();
233+
_streamDisconnected = true;
229234
}
230235
catch
231236
{
232237
// Ignore any errors from NamedPipeStream.Disconnect() since we're tearing down the connection anyway.
233238
}
234-
235-
_stream.Dispose();
236239
}
237240
}
238241

@@ -287,8 +290,13 @@ public override async ValueTask DisposeAsync()
287290
catch (Exception ex)
288291
{
289292
_log.LogError(0, ex, $"Unexpected exception in {nameof(NamedPipeConnection)}.{nameof(Start)}.");
293+
_stream.Dispose();
294+
return;
295+
}
296+
297+
if (!_streamDisconnected || !_connectionListener.TryCacheStream(_stream))
298+
{
299+
_stream.Dispose();
290300
}
291-
292-
await _stream.DisposeAsync();
293301
}
294302
}

src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Buffers;
5+
using System.Collections.Concurrent;
56
using System.Diagnostics;
67
using System.IO.Pipelines;
78
using System.IO.Pipes;
@@ -26,6 +27,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
2627
private readonly PipeOptions _inputOptions;
2728
private readonly PipeOptions _outputOptions;
2829
private readonly Mutex _mutex;
30+
private readonly ConcurrentQueue<NamedPipeServerStream> _streamsCache = new ConcurrentQueue<NamedPipeServerStream>();
2931
private Task[]? _listeningTasks;
3032
private int _disposed;
3133

@@ -54,6 +56,19 @@ public NamedPipeConnectionListener(
5456
_outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
5557
}
5658

59+
internal bool TryCacheStream(NamedPipeServerStream namedPipeServerStream)
60+
{
61+
// Limit the number of cached named pipe server streams.
62+
// This isn't thread safe so it's possible for Count and Enqueue to race and slightly exceed this limit.
63+
if (_streamsCache.Count <= 50)
64+
{
65+
_streamsCache.Enqueue(namedPipeServerStream);
66+
return true;
67+
}
68+
69+
return false;
70+
}
71+
5772
public void Start()
5873
{
5974
Debug.Assert(_listeningTasks == null, "Already started");
@@ -83,13 +98,13 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
8398

8499
await stream.WaitForConnectionAsync(_listeningToken);
85100

86-
var connection = new NamedPipeConnection(stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
101+
var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
87102
connection.Start();
88103

89104
// Create the next stream before writing connected stream to the channel.
90105
// This ensures there is always a created stream and another process can't
91106
// create a stream with the same name with different a access policy.
92-
nextStream = CreateServerStream();
107+
nextStream = GetOrCreateServerStream();
93108

94109
while (!_acceptedQueue.Writer.TryWrite(connection))
95110
{
@@ -106,7 +121,7 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
106121

107122
// Dispose existing pipe, create a new one and continue accepting.
108123
nextStream.Dispose();
109-
nextStream = CreateServerStream();
124+
nextStream = GetOrCreateServerStream();
110125
}
111126
catch (OperationCanceledException ex) when (_listeningToken.IsCancellationRequested)
112127
{
@@ -125,6 +140,17 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
125140
}
126141
}
127142

143+
private NamedPipeServerStream GetOrCreateServerStream()
144+
{
145+
if (!_streamsCache.TryDequeue(out var stream))
146+
{
147+
// Cache is empty. Create a new server stream.
148+
stream = CreateServerStream();
149+
}
150+
151+
return stream;
152+
}
153+
128154
private NamedPipeServerStream CreateServerStream()
129155
{
130156
NamedPipeServerStream stream;

0 commit comments

Comments
 (0)