33
33
import java .net .InetSocketAddress ;
34
34
import java .util .ArrayList ;
35
35
import java .util .Collections ;
36
+ import java .util .HashSet ;
36
37
import java .util .List ;
37
38
import java .util .Set ;
38
39
import java .util .concurrent .CopyOnWriteArraySet ;
43
44
import java .util .concurrent .atomic .AtomicInteger ;
44
45
import java .util .concurrent .atomic .AtomicLong ;
45
46
import java .util .concurrent .atomic .AtomicReference ;
47
+ import java .util .function .LongSupplier ;
46
48
47
49
public class RestCancellableNodeClientTests extends ESTestCase {
48
50
@@ -150,8 +152,42 @@ public void testChannelAlreadyClosed() {
150
152
}
151
153
}
152
154
155
+ public void testConcurrentExecuteAndClose () {
156
+ final TestClient testClient = new TestClient (Settings .EMPTY , threadPool , true );
157
+ int initialHttpChannels = RestCancellableNodeClient .getNumChannels ();
158
+ int numTasks = randomIntBetween (1 , 30 );
159
+ TestHttpChannel channel = new TestHttpChannel ();
160
+ final CountDownLatch startLatch = new CountDownLatch (1 );
161
+ final CountDownLatch doneLatch = new CountDownLatch (numTasks + 1 );
162
+ final Set <TaskId > expectedTasks = new HashSet <>(numTasks );
163
+ for (int j = 0 ; j < numTasks ; j ++) {
164
+ RestCancellableNodeClient client = new RestCancellableNodeClient (testClient , channel );
165
+ threadPool .generic ().execute (() -> {
166
+ client .execute (SearchAction .INSTANCE , new SearchRequest (), ActionListener .wrap (ESTestCase ::fail ));
167
+ startLatch .countDown ();
168
+ doneLatch .countDown ();
169
+ });
170
+ expectedTasks .add (new TaskId (testClient .getLocalNodeId (), j ));
171
+ }
172
+ threadPool .generic ().execute (() -> {
173
+ try {
174
+ safeAwait (startLatch );
175
+ channel .awaitClose ();
176
+ } catch (InterruptedException e ) {
177
+ Thread .currentThread ().interrupt ();
178
+ throw new AssertionError (e );
179
+ } finally {
180
+ doneLatch .countDown ();
181
+ }
182
+ });
183
+ safeAwait (doneLatch );
184
+ assertEquals (initialHttpChannels , RestCancellableNodeClient .getNumChannels ());
185
+ assertEquals (expectedTasks , testClient .cancelledTasks );
186
+ }
187
+
153
188
private static class TestClient extends NodeClient {
154
- private final AtomicLong counter = new AtomicLong (0 );
189
+ private final LongSupplier searchTaskIdGenerator = new AtomicLong (0 )::getAndIncrement ;
190
+ private final LongSupplier cancelTaskIdGenerator = new AtomicLong (1000 )::getAndIncrement ;
155
191
private final Set <TaskId > cancelledTasks = new CopyOnWriteArraySet <>();
156
192
private final AtomicInteger searchRequests = new AtomicInteger (0 );
157
193
private final boolean timeout ;
@@ -171,7 +207,13 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task exe
171
207
case CancelTasksAction .NAME :
172
208
CancelTasksRequest cancelTasksRequest = (CancelTasksRequest ) request ;
173
209
assertTrue ("tried to cancel the same task more than once" , cancelledTasks .add (cancelTasksRequest .getTaskId ()));
174
- Task task = request .createTask (counter .getAndIncrement (), "cancel_task" , action .name (), null , Collections .emptyMap ());
210
+ Task task = request .createTask (
211
+ cancelTaskIdGenerator .getAsLong (),
212
+ "cancel_task" ,
213
+ action .name (),
214
+ null ,
215
+ Collections .emptyMap ()
216
+ );
175
217
if (randomBoolean ()) {
176
218
listener .onResponse (null );
177
219
} else {
@@ -182,7 +224,13 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task exe
182
224
return task ;
183
225
case SearchAction .NAME :
184
226
searchRequests .incrementAndGet ();
185
- Task searchTask = request .createTask (counter .getAndIncrement (), "search" , action .name (), null , Collections .emptyMap ());
227
+ Task searchTask = request .createTask (
228
+ searchTaskIdGenerator .getAsLong (),
229
+ "search" ,
230
+ action .name (),
231
+ null ,
232
+ Collections .emptyMap ()
233
+ );
186
234
if (timeout == false ) {
187
235
if (rarely ()) {
188
236
// make sure that search is sometimes also called from the same thread before the task is returned
@@ -193,7 +241,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task exe
193
241
}
194
242
return searchTask ;
195
243
default :
196
- throw new UnsupportedOperationException ( );
244
+ throw new AssertionError ( "unexpected action " + action . name () );
197
245
}
198
246
199
247
}
@@ -224,9 +272,7 @@ public InetSocketAddress getRemoteAddress() {
224
272
225
273
@ Override
226
274
public void close () {
227
- if (open .compareAndSet (true , false ) == false ) {
228
- throw new IllegalStateException ("channel already closed!" );
229
- }
275
+ assertTrue ("HttpChannel is already closed" , open .compareAndSet (true , false ));
230
276
ActionListener <Void > listener = closeListener .get ();
231
277
if (listener != null ) {
232
278
boolean failure = randomBoolean ();
@@ -242,6 +288,7 @@ public void close() {
242
288
}
243
289
244
290
private void awaitClose () throws InterruptedException {
291
+ assertNotNull ("must set closeListener before calling awaitClose" , closeListener .get ());
245
292
close ();
246
293
closeLatch .await ();
247
294
}
@@ -258,7 +305,7 @@ public void addCloseListener(ActionListener<Void> listener) {
258
305
listener .onResponse (null );
259
306
} else {
260
307
if (closeListener .compareAndSet (null , listener ) == false ) {
261
- throw new IllegalStateException ("close listener already set, only one is allowed!" );
308
+ throw new AssertionError ("close listener already set, only one is allowed!" );
262
309
}
263
310
}
264
311
}
0 commit comments