2
2
// The .NET Foundation licenses this file to you under the MIT license.
3
3
4
4
using System . Buffers ;
5
- using System . Collections . Concurrent ;
6
5
using System . Diagnostics ;
7
6
using System . IO . Pipelines ;
8
7
using System . IO . Pipes ;
9
8
using System . Net ;
10
9
using System . Threading . Channels ;
11
10
using Microsoft . AspNetCore . Connections ;
12
11
using Microsoft . Extensions . Logging ;
12
+ using Microsoft . Extensions . ObjectPool ;
13
13
using NamedPipeOptions = System . IO . Pipes . PipeOptions ;
14
14
using PipeOptions = System . IO . Pipelines . PipeOptions ;
15
15
@@ -20,14 +20,14 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
20
20
private readonly ILogger _log ;
21
21
private readonly NamedPipeEndPoint _endpoint ;
22
22
private readonly NamedPipeTransportOptions _options ;
23
+ private readonly ObjectPool < NamedPipeServerStream > _namedPipeServerStreamPool ;
23
24
private readonly CancellationTokenSource _listeningTokenSource = new CancellationTokenSource ( ) ;
24
25
private readonly CancellationToken _listeningToken ;
25
26
private readonly Channel < ConnectionContext > _acceptedQueue ;
26
27
private readonly MemoryPool < byte > _memoryPool ;
27
28
private readonly PipeOptions _inputOptions ;
28
29
private readonly PipeOptions _outputOptions ;
29
30
private readonly Mutex _mutex ;
30
- private readonly ConcurrentQueue < NamedPipeServerStream > _streamsCache = new ConcurrentQueue < NamedPipeServerStream > ( ) ;
31
31
private Task [ ] ? _listeningTasks ;
32
32
private int _disposed ;
33
33
@@ -40,6 +40,7 @@ public NamedPipeConnectionListener(
40
40
_log = loggerFactory . CreateLogger ( "Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes" ) ;
41
41
_endpoint = endpoint ;
42
42
_options = options ;
43
+ _namedPipeServerStreamPool = new DefaultObjectPoolProvider ( ) . Create ( new NamedPipeServerStreamPoolPolicy ( this ) ) ;
43
44
_mutex = mutex ;
44
45
_memoryPool = options . MemoryPoolFactory ( ) ;
45
46
_listeningToken = _listeningTokenSource . Token ;
@@ -56,17 +57,10 @@ public NamedPipeConnectionListener(
56
57
_outputOptions = new PipeOptions ( _memoryPool , PipeScheduler . Inline , PipeScheduler . ThreadPool , maxWriteBufferSize , maxWriteBufferSize / 2 , useSynchronizationContext : false ) ;
57
58
}
58
59
59
- internal bool TryCacheStream ( NamedPipeServerStream namedPipeServerStream )
60
+ internal void ReturnStream ( NamedPipeServerStream namedPipeServerStream )
60
61
{
61
- // Limit the number of cached named pipe server streams.
62
- // This isn't thread safe so it's possible for Count and Enqueue to race and slightly exceed this limit.
63
- if ( _streamsCache . Count <= 50 )
64
- {
65
- _streamsCache . Enqueue ( namedPipeServerStream ) ;
66
- return true ;
67
- }
68
-
69
- return false ;
62
+ // The stream is automatically disposed if there isn't space in the pool.
63
+ _namedPipeServerStreamPool . Return ( namedPipeServerStream ) ;
70
64
}
71
65
72
66
public void Start ( )
@@ -78,7 +72,7 @@ public void Start()
78
72
for ( var i = 0 ; i < _listeningTasks . Length ; i ++ )
79
73
{
80
74
// Start first stream inline to catch creation errors.
81
- var initialStream = CreateServerStream ( ) ;
75
+ var initialStream = _namedPipeServerStreamPool . Get ( ) ;
82
76
83
77
_listeningTasks [ i ] = Task . Run ( ( ) => StartAsync ( initialStream ) ) ;
84
78
}
@@ -104,7 +98,7 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
104
98
// Create the next stream before writing connected stream to the channel.
105
99
// This ensures there is always a created stream and another process can't
106
100
// create a stream with the same name with different a access policy.
107
- nextStream = GetOrCreateServerStream ( ) ;
101
+ nextStream = _namedPipeServerStreamPool . Get ( ) ;
108
102
109
103
while ( ! _acceptedQueue . Writer . TryWrite ( connection ) )
110
104
{
@@ -121,7 +115,7 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
121
115
122
116
// Dispose existing pipe, create a new one and continue accepting.
123
117
nextStream . Dispose ( ) ;
124
- nextStream = GetOrCreateServerStream ( ) ;
118
+ nextStream = _namedPipeServerStreamPool . Get ( ) ;
125
119
}
126
120
catch ( OperationCanceledException ex ) when ( _listeningToken . IsCancellationRequested )
127
121
{
@@ -140,52 +134,6 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
140
134
}
141
135
}
142
136
143
- private NamedPipeServerStream GetOrCreateServerStream ( )
144
- {
145
- if ( ! _streamsCache . TryDequeue ( out var stream ) )
146
- {
147
- // Cache is empty. Create a new server stream.
148
- stream = CreateServerStream ( ) ;
149
- }
150
-
151
- return stream ;
152
- }
153
-
154
- private NamedPipeServerStream CreateServerStream ( )
155
- {
156
- NamedPipeServerStream stream ;
157
- var pipeOptions = NamedPipeOptions . Asynchronous | NamedPipeOptions . WriteThrough ;
158
- if ( _options . CurrentUserOnly )
159
- {
160
- pipeOptions |= NamedPipeOptions . CurrentUserOnly ;
161
- }
162
-
163
- if ( _options . PipeSecurity != null )
164
- {
165
- stream = NamedPipeServerStreamAcl . Create (
166
- _endpoint . PipeName ,
167
- PipeDirection . InOut ,
168
- NamedPipeServerStream . MaxAllowedServerInstances ,
169
- PipeTransmissionMode . Byte ,
170
- pipeOptions ,
171
- inBufferSize : 0 , // Buffer in System.IO.Pipelines
172
- outBufferSize : 0 , // Buffer in System.IO.Pipelines
173
- _options . PipeSecurity ) ;
174
- }
175
- else
176
- {
177
- stream = new NamedPipeServerStream (
178
- _endpoint . PipeName ,
179
- PipeDirection . InOut ,
180
- NamedPipeServerStream . MaxAllowedServerInstances ,
181
- PipeTransmissionMode . Byte ,
182
- pipeOptions ,
183
- inBufferSize : 0 ,
184
- outBufferSize : 0 ) ;
185
- }
186
- return stream ;
187
- }
188
-
189
137
public async ValueTask < ConnectionContext ? > AcceptAsync ( CancellationToken cancellationToken = default )
190
138
{
191
139
while ( await _acceptedQueue . Reader . WaitToReadAsync ( cancellationToken ) )
@@ -217,5 +165,56 @@ public async ValueTask DisposeAsync()
217
165
{
218
166
await Task . WhenAll ( _listeningTasks ) ;
219
167
}
168
+
169
+ // Dispose pool after listening tasks are complete so there is no chance a stream
170
+ // is fetched from the pool after the pool is disposed.
171
+ ( ( IDisposable ) _namedPipeServerStreamPool ) . Dispose ( ) ;
172
+ }
173
+
174
+ private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy < NamedPipeServerStream >
175
+ {
176
+ public NamedPipeConnectionListener _listener ;
177
+
178
+ public NamedPipeServerStreamPoolPolicy ( NamedPipeConnectionListener listener )
179
+ {
180
+ _listener = listener ;
181
+ }
182
+
183
+ public NamedPipeServerStream Create ( )
184
+ {
185
+ NamedPipeServerStream stream ;
186
+ var pipeOptions = NamedPipeOptions . Asynchronous | NamedPipeOptions . WriteThrough ;
187
+ if ( _listener . _options . CurrentUserOnly )
188
+ {
189
+ pipeOptions |= NamedPipeOptions . CurrentUserOnly ;
190
+ }
191
+
192
+ if ( _listener . _options . PipeSecurity != null )
193
+ {
194
+ stream = NamedPipeServerStreamAcl . Create (
195
+ _listener . _endpoint . PipeName ,
196
+ PipeDirection . InOut ,
197
+ NamedPipeServerStream . MaxAllowedServerInstances ,
198
+ PipeTransmissionMode . Byte ,
199
+ pipeOptions ,
200
+ inBufferSize : 0 , // Buffer in System.IO.Pipelines
201
+ outBufferSize : 0 , // Buffer in System.IO.Pipelines
202
+ _listener . _options . PipeSecurity ) ;
203
+ }
204
+ else
205
+ {
206
+ stream = new NamedPipeServerStream (
207
+ _listener . _endpoint . PipeName ,
208
+ PipeDirection . InOut ,
209
+ NamedPipeServerStream . MaxAllowedServerInstances ,
210
+ PipeTransmissionMode . Byte ,
211
+ pipeOptions ,
212
+ inBufferSize : 0 ,
213
+ outBufferSize : 0 ) ;
214
+ }
215
+ return stream ;
216
+ }
217
+
218
+ public bool Return ( NamedPipeServerStream obj ) => true ;
220
219
}
221
220
}
0 commit comments