@@ -28,6 +28,7 @@ public class SocketInput : ICriticalNotifyCompletion, IDisposable
28
28
private MemoryPoolBlock _pinned ;
29
29
30
30
private int _consumingState ;
31
+ private object _sync = new object ( ) ;
31
32
32
33
public SocketInput ( MemoryPool memory , IThreadPool threadPool )
33
34
{
@@ -58,64 +59,70 @@ public MemoryPoolBlock IncomingStart()
58
59
59
60
public void IncomingData ( byte [ ] buffer , int offset , int count )
60
61
{
61
- if ( count > 0 )
62
+ lock ( _sync )
62
63
{
63
- if ( _tail == null )
64
+ if ( count > 0 )
64
65
{
65
- _tail = _memory . Lease ( ) ;
66
- }
66
+ if ( _tail == null )
67
+ {
68
+ _tail = _memory . Lease ( ) ;
69
+ }
70
+
71
+ var iterator = new MemoryPoolIterator ( _tail , _tail . End ) ;
72
+ iterator . CopyFrom ( buffer , offset , count ) ;
67
73
68
- var iterator = new MemoryPoolIterator ( _tail , _tail . End ) ;
69
- iterator . CopyFrom ( buffer , offset , count ) ;
74
+ if ( _head == null )
75
+ {
76
+ _head = _tail ;
77
+ }
70
78
71
- if ( _head == null )
79
+ _tail = iterator . Block ;
80
+ }
81
+ else
72
82
{
73
- _head = _tail ;
83
+ RemoteIntakeFin = true ;
74
84
}
75
85
76
- _tail = iterator . Block ;
86
+ Complete ( ) ;
77
87
}
78
- else
79
- {
80
- RemoteIntakeFin = true ;
81
- }
82
-
83
- Complete ( ) ;
84
88
}
85
89
86
90
public void IncomingComplete ( int count , Exception error )
87
91
{
88
- if ( _pinned != null )
92
+ lock ( _sync )
89
93
{
90
- _pinned . End += count ;
91
-
92
- if ( _head == null )
94
+ if ( _pinned != null )
93
95
{
94
- _head = _tail = _pinned ;
96
+ _pinned . End += count ;
97
+
98
+ if ( _head == null )
99
+ {
100
+ _head = _tail = _pinned ;
101
+ }
102
+ else if ( _tail == _pinned )
103
+ {
104
+ // NO-OP: this was a read into unoccupied tail-space
105
+ }
106
+ else
107
+ {
108
+ _tail . Next = _pinned ;
109
+ _tail = _pinned ;
110
+ }
111
+
112
+ _pinned = null ;
95
113
}
96
- else if ( _tail == _pinned )
114
+
115
+ if ( count == 0 )
97
116
{
98
- // NO-OP: this was a read into unoccupied tail-space
117
+ RemoteIntakeFin = true ;
99
118
}
100
- else
119
+ if ( error != null )
101
120
{
102
- _tail . Next = _pinned ;
103
- _tail = _pinned ;
121
+ _awaitableError = error ;
104
122
}
105
123
106
- _pinned = null ;
107
- }
108
-
109
- if ( count == 0 )
110
- {
111
- RemoteIntakeFin = true ;
124
+ Complete ( ) ;
112
125
}
113
- if ( error != null )
114
- {
115
- _awaitableError = error ;
116
- }
117
-
118
- Complete ( ) ;
119
126
}
120
127
121
128
public void IncomingDeferred ( )
@@ -162,40 +169,43 @@ public void ConsumingComplete(
162
169
MemoryPoolIterator consumed ,
163
170
MemoryPoolIterator examined )
164
171
{
165
- MemoryPoolBlock returnStart = null ;
166
- MemoryPoolBlock returnEnd = null ;
167
-
168
- if ( ! consumed . IsDefault )
172
+ lock ( _sync )
169
173
{
170
- returnStart = _head ;
171
- returnEnd = consumed . Block ;
172
- _head = consumed . Block ;
173
- _head . Start = consumed . Index ;
174
- }
174
+ MemoryPoolBlock returnStart = null ;
175
+ MemoryPoolBlock returnEnd = null ;
175
176
176
- if ( ! examined . IsDefault &&
177
- examined . IsEnd &&
178
- RemoteIntakeFin == false &&
179
- _awaitableError == null )
180
- {
181
- _manualResetEvent . Reset ( ) ;
177
+ if ( ! consumed . IsDefault )
178
+ {
179
+ returnStart = _head ;
180
+ returnEnd = consumed . Block ;
181
+ _head = consumed . Block ;
182
+ _head . Start = consumed . Index ;
183
+ }
182
184
183
- Interlocked . CompareExchange (
184
- ref _awaitableState ,
185
- _awaitableIsNotCompleted ,
186
- _awaitableIsCompleted ) ;
187
- }
185
+ if ( ! examined . IsDefault &&
186
+ examined . IsEnd &&
187
+ RemoteIntakeFin == false &&
188
+ _awaitableError == null )
189
+ {
190
+ _manualResetEvent . Reset ( ) ;
188
191
189
- while ( returnStart != returnEnd )
190
- {
191
- var returnBlock = returnStart ;
192
- returnStart = returnStart . Next ;
193
- returnBlock . Pool . Return ( returnBlock ) ;
194
- }
192
+ Interlocked . CompareExchange (
193
+ ref _awaitableState ,
194
+ _awaitableIsNotCompleted ,
195
+ _awaitableIsCompleted ) ;
196
+ }
195
197
196
- if ( Interlocked . CompareExchange ( ref _consumingState , 0 , 1 ) != 1 )
197
- {
198
- throw new InvalidOperationException ( "No ongoing consuming operation to complete." ) ;
198
+ while ( returnStart != returnEnd )
199
+ {
200
+ var returnBlock = returnStart ;
201
+ returnStart = returnStart . Next ;
202
+ returnBlock . Pool . Return ( returnBlock ) ;
203
+ }
204
+
205
+ if ( Interlocked . CompareExchange ( ref _consumingState , 0 , 1 ) != 1 )
206
+ {
207
+ throw new InvalidOperationException ( "No ongoing consuming operation to complete." ) ;
208
+ }
199
209
}
200
210
}
201
211
0 commit comments