22// The .NET Foundation licenses this file to you under the MIT license.
33
44using System . Buffers ;
5+ using System . Diagnostics ;
56using System . IO . Pipelines ;
67using System . IO . Pipes ;
78using System . Net ;
89using System . Threading . Channels ;
910using Microsoft . AspNetCore . Connections ;
1011using Microsoft . Extensions . Logging ;
12+ using NamedPipeOptions = System . IO . Pipes . PipeOptions ;
1113using PipeOptions = System . IO . Pipelines . PipeOptions ;
1214
1315namespace Microsoft . AspNetCore . Server . Kestrel . Transport . NamedPipes . Internal ;
@@ -20,21 +22,24 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
2022 private readonly CancellationTokenSource _listeningTokenSource = new CancellationTokenSource ( ) ;
2123 private readonly CancellationToken _listeningToken ;
2224 private readonly Channel < ConnectionContext > _acceptedQueue ;
23- private readonly Task _listeningTask ;
2425 private readonly MemoryPool < byte > _memoryPool ;
2526 private readonly PipeOptions _inputOptions ;
2627 private readonly PipeOptions _outputOptions ;
28+ private readonly Mutex _mutex ;
29+ private Task ? _listeningTask ;
2730 private int _disposed ;
2831
2932 public NamedPipeConnectionListener (
3033 NamedPipeEndPoint endpoint ,
3134 NamedPipeTransportOptions options ,
32- ILoggerFactory loggerFactory )
35+ ILoggerFactory loggerFactory ,
36+ Mutex mutex )
3337 {
3438 _log = loggerFactory . CreateLogger ( "Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes" ) ;
3539 _endpoint = endpoint ;
3640 _options = options ;
37- _acceptedQueue = Channel . CreateBounded < ConnectionContext > ( new BoundedChannelOptions ( options . Backlog ) ) ;
41+ _mutex = mutex ;
42+ _acceptedQueue = Channel . CreateBounded < ConnectionContext > ( new BoundedChannelOptions ( options . Backlog ) { SingleWriter = true } ) ;
3843 _memoryPool = options . MemoryPoolFactory ( ) ;
3944 _listeningToken = _listeningTokenSource . Token ;
4045
@@ -43,50 +48,57 @@ public NamedPipeConnectionListener(
4348
4449 _inputOptions = new PipeOptions ( _memoryPool , PipeScheduler . ThreadPool , PipeScheduler . Inline , maxReadBufferSize , maxReadBufferSize / 2 , useSynchronizationContext : false ) ;
4550 _outputOptions = new PipeOptions ( _memoryPool , PipeScheduler . Inline , PipeScheduler . ThreadPool , maxWriteBufferSize , maxWriteBufferSize / 2 , useSynchronizationContext : false ) ;
51+ }
52+
53+ public void Start ( )
54+ {
55+ Debug . Assert ( _listeningTask == null , "Already started" ) ;
4656
47- // Start after all fields are initialized.
48- _listeningTask = StartAsync ( ) ;
57+ // Start first stream inline to catch creation errors.
58+ var initialStream = CreateServerStream ( ) ;
59+
60+ _listeningTask = StartAsync ( initialStream ) ;
4961 }
5062
5163 public EndPoint EndPoint => _endpoint ;
5264
53- private async Task StartAsync ( )
65+ private async Task StartAsync ( NamedPipeServerStream nextStream )
5466 {
5567 try
5668 {
5769 while ( true )
5870 {
59- NamedPipeServerStream stream ;
60-
6171 try
6272 {
63- _listeningToken . ThrowIfCancellationRequested ( ) ;
64-
65- stream = NamedPipeServerStreamAcl . Create (
66- _endpoint . PipeName ,
67- PipeDirection . InOut ,
68- NamedPipeServerStream . MaxAllowedServerInstances ,
69- PipeTransmissionMode . Byte ,
70- _endpoint . PipeOptions ,
71- inBufferSize : 0 , // Buffer in System.IO.Pipelines
72- outBufferSize : 0 , // Buffer in System.IO.Pipelines
73- _options . PipeSecurity ) ;
73+ var stream = nextStream ;
7474
7575 await stream . WaitForConnectionAsync ( _listeningToken ) ;
76+
77+ var connection = new NamedPipeConnection ( stream , _endpoint , _log , _memoryPool , _inputOptions , _outputOptions ) ;
78+ connection . Start ( ) ;
79+
80+ // Create the next stream before writing connected stream to the channel.
81+ // This ensures there is always a created stream and another process can't
82+ // create a stream with the same name with different a access policy.
83+ nextStream = CreateServerStream ( ) ;
84+
85+ while ( ! _acceptedQueue . Writer . TryWrite ( connection ) )
86+ {
87+ if ( ! await _acceptedQueue . Writer . WaitToWriteAsync ( _listeningToken ) )
88+ {
89+ throw new InvalidOperationException ( "Accept queue writer was unexpectedly closed." ) ;
90+ }
91+ }
7692 }
7793 catch ( OperationCanceledException ex ) when ( _listeningToken . IsCancellationRequested )
7894 {
7995 // Cancelled the current token
8096 NamedPipeLog . ConnectionListenerAborted ( _log , ex ) ;
8197 break ;
8298 }
83-
84- var connection = new NamedPipeConnection ( stream , _endpoint , _log , _memoryPool , _inputOptions , _outputOptions ) ;
85- connection . Start ( ) ;
86-
87- _acceptedQueue . Writer . TryWrite ( connection ) ;
8899 }
89100
101+ nextStream . Dispose ( ) ;
90102 _acceptedQueue . Writer . TryComplete ( ) ;
91103 }
92104 catch ( Exception ex )
@@ -95,6 +107,41 @@ private async Task StartAsync()
95107 }
96108 }
97109
110+ private NamedPipeServerStream CreateServerStream ( )
111+ {
112+ NamedPipeServerStream stream ;
113+ var pipeOptions = NamedPipeOptions . Asynchronous | NamedPipeOptions . WriteThrough ;
114+ if ( _options . CurrentUserOnly )
115+ {
116+ pipeOptions |= NamedPipeOptions . CurrentUserOnly ;
117+ }
118+
119+ if ( _options . PipeSecurity != null )
120+ {
121+ stream = NamedPipeServerStreamAcl . Create (
122+ _endpoint . PipeName ,
123+ PipeDirection . InOut ,
124+ NamedPipeServerStream . MaxAllowedServerInstances ,
125+ PipeTransmissionMode . Byte ,
126+ pipeOptions ,
127+ inBufferSize : 0 , // Buffer in System.IO.Pipelines
128+ outBufferSize : 0 , // Buffer in System.IO.Pipelines
129+ _options . PipeSecurity ) ;
130+ }
131+ else
132+ {
133+ stream = new NamedPipeServerStream (
134+ _endpoint . PipeName ,
135+ PipeDirection . InOut ,
136+ NamedPipeServerStream . MaxAllowedServerInstances ,
137+ PipeTransmissionMode . Byte ,
138+ pipeOptions ,
139+ inBufferSize : 0 ,
140+ outBufferSize : 0 ) ;
141+ }
142+ return stream ;
143+ }
144+
98145 public async ValueTask < ConnectionContext ? > AcceptAsync ( CancellationToken cancellationToken = default )
99146 {
100147 while ( await _acceptedQueue . Reader . WaitToReadAsync ( cancellationToken ) )
@@ -109,6 +156,8 @@ private async Task StartAsync()
109156 return null ;
110157 }
111158
159+ public ValueTask UnbindAsync ( CancellationToken cancellationToken = default ) => DisposeAsync ( ) ;
160+
112161 public async ValueTask DisposeAsync ( )
113162 {
114163 // A stream may be waiting on WaitForConnectionAsync when dispose happens.
@@ -119,12 +168,10 @@ public async ValueTask DisposeAsync()
119168 }
120169
121170 _listeningTokenSource . Dispose ( ) ;
122- await _listeningTask ;
123- }
124-
125- public async ValueTask UnbindAsync ( CancellationToken cancellationToken = default )
126- {
127- _listeningTokenSource . Cancel ( ) ;
128- await _listeningTask ;
171+ _mutex . Dispose ( ) ;
172+ if ( _listeningTask != null )
173+ {
174+ await _listeningTask ;
175+ }
129176 }
130177}
0 commit comments