@@ -25,6 +25,7 @@ public class SocketOutput : ISocketOutput
25
25
26
26
// This locks access to to all of the below fields
27
27
private readonly object _lockObj = new object ( ) ;
28
+ private bool _isDisposed ;
28
29
29
30
// The number of write operations that have been scheduled so far
30
31
// but have not completed.
@@ -236,11 +237,16 @@ private void OnWriteCompleted(WriteContext write)
236
237
}
237
238
238
239
if ( _writeContexts . Count < _maxPooledWriteContexts
239
- && write . Buffers . Count <= _maxPooledBufferQueues )
240
+ && write . Buffers . Count <= _maxPooledBufferQueues
241
+ && ! _isDisposed )
240
242
{
241
243
write . Reset ( ) ;
242
244
_writeContexts . Enqueue ( write ) ;
243
245
}
246
+ else
247
+ {
248
+ write . Dispose ( ) ;
249
+ }
244
250
245
251
// Now that the while loop has completed the following invariants should hold true:
246
252
Debug . Assert ( _numBytesPreCompleted >= 0 ) ;
@@ -268,7 +274,21 @@ Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, Cancell
268
274
return WriteAsync ( buffer , immediate ) ;
269
275
}
270
276
271
- private class WriteContext
277
+ private void Dispose ( )
278
+ {
279
+ lock ( _lockObj )
280
+ {
281
+ _isDisposed = true ;
282
+
283
+ while ( _writeContexts . Count > 0 )
284
+ {
285
+ _writeContexts . Dequeue ( ) . Dispose ( ) ;
286
+ }
287
+ }
288
+
289
+ }
290
+
291
+ private class WriteContext : IDisposable
272
292
{
273
293
public SocketOutput Self ;
274
294
@@ -278,13 +298,16 @@ private class WriteContext
278
298
279
299
public int WriteStatus ;
280
300
public Exception WriteError ;
301
+ private UvWriteReq _writeReq ;
281
302
282
303
public int ShutdownSendStatus ;
283
304
284
305
public WriteContext ( SocketOutput self )
285
306
{
286
307
Self = self ;
287
308
Buffers = new Queue < ArraySegment < byte > > ( _maxPooledBufferQueues ) ;
309
+ _writeReq = new UvWriteReq ( Self . _log ) ;
310
+ _writeReq . Init ( Self . _thread . Loop ) ;
288
311
}
289
312
290
313
/// <summary>
@@ -305,12 +328,9 @@ public void DoWriteIfNeeded()
305
328
{
306
329
buffers [ i ++ ] = buffer ;
307
330
}
308
-
309
- var writeReq = new UvWriteReq ( Self . _log ) ;
310
- writeReq . Init ( Self . _thread . Loop ) ;
311
- writeReq . Write ( Self . _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( _writeReq , status , error , state ) =>
331
+
332
+ _writeReq . Write ( Self . _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( _writeReq , status , error , state ) =>
312
333
{
313
- _writeReq . Dispose ( ) ;
314
334
var _this = ( WriteContext ) state ;
315
335
_this . WriteStatus = status ;
316
336
_this . WriteError = error ;
@@ -348,11 +368,17 @@ public void DoShutdownIfNeeded()
348
368
/// </summary>
349
369
public void DoDisconnectIfNeeded ( )
350
370
{
351
- if ( SocketDisconnect == false || Self . _socket . IsClosed )
371
+ if ( SocketDisconnect == false )
352
372
{
353
373
Complete ( ) ;
354
374
return ;
355
375
}
376
+ else if ( Self . _socket . IsClosed )
377
+ {
378
+ Self . Dispose ( ) ;
379
+ Complete ( ) ;
380
+ return ;
381
+ }
356
382
357
383
Self . _socket . Dispose ( ) ;
358
384
Self . _log . ConnectionStop ( Self . _connectionId ) ;
@@ -373,6 +399,11 @@ public void Reset()
373
399
WriteError = null ;
374
400
ShutdownSendStatus = 0 ;
375
401
}
402
+
403
+ public void Dispose ( )
404
+ {
405
+ _writeReq . Dispose ( ) ;
406
+ }
376
407
}
377
408
}
378
409
}
0 commit comments