@@ -163,7 +163,7 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException {
163
163
c -> new Session (
164
164
new SessionId (UUID .randomUUID ()), sessionUri , stereotype , c , Instant .now ()));
165
165
166
- Distributor local = new LocalDistributor (
166
+ try ( LocalDistributor local = new LocalDistributor (
167
167
tracer ,
168
168
bus ,
169
169
new PassthroughHttpClient .Factory (node ),
@@ -174,16 +174,19 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException {
174
174
Duration .ofMinutes (5 ),
175
175
false ,
176
176
Duration .ofSeconds (5 ),
177
- newSessionThreadPoolSize );
177
+ newSessionThreadPoolSize )) {
178
178
179
- distributor = new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl , registrationSecret );
179
+ distributor =
180
+ new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl ,
181
+ registrationSecret );
180
182
181
- distributor .add (node );
183
+ distributor .add (node );
182
184
183
- wait .until (obj -> distributor .getStatus ().hasCapacity ());
185
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
184
186
185
- NodeStatus status = getOnlyElement (distributor .getStatus ().getNodes ());
186
- assertEquals (1 , getStereotypes (status ).get (CAPS ).intValue ());
187
+ NodeStatus status = getOnlyElement (distributor .getStatus ().getNodes ());
188
+ assertEquals (1 , getStereotypes (status ).get (CAPS ).intValue ());
189
+ }
187
190
}
188
191
189
192
@ Test
@@ -197,7 +200,7 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException
197
200
(id , caps ) -> new Session (id , sessionUri , stereotype , caps , Instant .now ())))
198
201
.build ();
199
202
200
- Distributor local = new LocalDistributor (
203
+ try ( LocalDistributor local = new LocalDistributor (
201
204
tracer ,
202
205
bus ,
203
206
new PassthroughHttpClient .Factory (node ),
@@ -208,16 +211,19 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException
208
211
Duration .ofMinutes (5 ),
209
212
false ,
210
213
Duration .ofSeconds (5 ),
211
- newSessionThreadPoolSize );
214
+ newSessionThreadPoolSize )) {
212
215
213
- distributor = new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl , registrationSecret );
216
+ distributor =
217
+ new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl ,
218
+ registrationSecret );
214
219
215
- bus .fire (new NodeStatusEvent (node .getStatus ()));
220
+ bus .fire (new NodeStatusEvent (node .getStatus ()));
216
221
217
- wait .until (obj -> distributor .getStatus ().hasCapacity ());
222
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
218
223
219
- NodeStatus status = getOnlyElement (distributor .getStatus ().getNodes ());
220
- assertEquals (1 , getStereotypes (status ).get (CAPS ).intValue ());
224
+ NodeStatus status = getOnlyElement (distributor .getStatus ().getNodes ());
225
+ assertEquals (1 , getStereotypes (status ).get (CAPS ).intValue ());
226
+ }
221
227
}
222
228
223
229
@ Test
@@ -241,7 +247,7 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents
241
247
handler .addHandler (firstNode );
242
248
handler .addHandler (secondNode );
243
249
244
- Distributor local = new LocalDistributor (
250
+ try ( LocalDistributor local = new LocalDistributor (
245
251
tracer ,
246
252
bus ,
247
253
new PassthroughHttpClient .Factory (handler ),
@@ -252,18 +258,21 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents
252
258
Duration .ofMinutes (5 ),
253
259
false ,
254
260
Duration .ofSeconds (5 ),
255
- newSessionThreadPoolSize );
261
+ newSessionThreadPoolSize )) {
256
262
257
- distributor = new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl , registrationSecret );
263
+ distributor =
264
+ new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl ,
265
+ registrationSecret );
258
266
259
- bus .fire (new NodeStatusEvent (firstNode .getStatus ()));
260
- bus .fire (new NodeStatusEvent (secondNode .getStatus ()));
267
+ bus .fire (new NodeStatusEvent (firstNode .getStatus ()));
268
+ bus .fire (new NodeStatusEvent (secondNode .getStatus ()));
261
269
262
- wait .until (obj -> distributor .getStatus ());
270
+ wait .until (obj -> distributor .getStatus ());
263
271
264
- Set <NodeStatus > nodes = distributor .getStatus ().getNodes ();
272
+ Set <NodeStatus > nodes = distributor .getStatus ().getNodes ();
265
273
266
- assertEquals (1 , nodes .size ());
274
+ assertEquals (1 , nodes .size ());
275
+ }
267
276
}
268
277
269
278
@ Test
@@ -278,7 +287,7 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange()
278
287
(id , caps ) -> new Session (id , sessionUri , stereotype , caps , Instant .now ())))
279
288
.build ();
280
289
281
- Distributor local = new LocalDistributor (
290
+ try ( LocalDistributor local = new LocalDistributor (
282
291
tracer ,
283
292
bus ,
284
293
new PassthroughHttpClient .Factory (node ),
@@ -289,40 +298,43 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange()
289
298
Duration .ofMinutes (5 ),
290
299
false ,
291
300
Duration .ofSeconds (5 ),
292
- newSessionThreadPoolSize );
301
+ newSessionThreadPoolSize )) {
293
302
294
- distributor = new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl , registrationSecret );
303
+ distributor =
304
+ new RemoteDistributor (tracer , new PassthroughHttpClient .Factory (local ), externalUrl ,
305
+ registrationSecret );
295
306
296
- bus .fire (new NodeStatusEvent (node .getStatus ()));
307
+ bus .fire (new NodeStatusEvent (node .getStatus ()));
297
308
298
- // Start empty
299
- wait .until (obj -> distributor .getStatus ().hasCapacity ());
309
+ // Start empty
310
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
300
311
301
- NodeStatus nodeStatus = getOnlyElement (distributor .getStatus ().getNodes ());
302
- assertEquals (1 , getStereotypes (nodeStatus ).get (CAPS ).intValue ());
303
-
304
- // Craft a status that makes it look like the node is busy, and post it on the bus.
305
- NodeStatus status = node .getStatus ();
306
- NodeStatus crafted = new NodeStatus (
307
- status .getNodeId (),
308
- status .getExternalUri (),
309
- status .getMaxSessionCount (),
310
- ImmutableSet .of (
311
- new Slot (
312
- new SlotId (status .getNodeId (), UUID .randomUUID ()),
313
- CAPS ,
314
- Instant .now (),
315
- new Session (
316
- new SessionId (UUID .randomUUID ()), sessionUri , CAPS , CAPS , Instant .now ()))),
317
- UP ,
318
- Duration .ofSeconds (10 ),
319
- status .getVersion (),
320
- status .getOsInfo ());
321
-
322
- bus .fire (new NodeStatusEvent (crafted ));
323
-
324
- // We claimed the only slot is filled. Life is good.
325
- wait .until (obj -> !distributor .getStatus ().hasCapacity ());
312
+ NodeStatus nodeStatus = getOnlyElement (distributor .getStatus ().getNodes ());
313
+ assertEquals (1 , getStereotypes (nodeStatus ).get (CAPS ).intValue ());
314
+
315
+ // Craft a status that makes it look like the node is busy, and post it on the bus.
316
+ NodeStatus status = node .getStatus ();
317
+ NodeStatus crafted = new NodeStatus (
318
+ status .getNodeId (),
319
+ status .getExternalUri (),
320
+ status .getMaxSessionCount (),
321
+ ImmutableSet .of (
322
+ new Slot (
323
+ new SlotId (status .getNodeId (), UUID .randomUUID ()),
324
+ CAPS ,
325
+ Instant .now (),
326
+ new Session (
327
+ new SessionId (UUID .randomUUID ()), sessionUri , CAPS , CAPS , Instant .now ()))),
328
+ UP ,
329
+ Duration .ofSeconds (10 ),
330
+ status .getVersion (),
331
+ status .getOsInfo ());
332
+
333
+ bus .fire (new NodeStatusEvent (crafted ));
334
+
335
+ // We claimed the only slot is filled. Life is good.
336
+ wait .until (obj -> !distributor .getStatus ().hasCapacity ());
337
+ }
326
338
}
327
339
328
340
private Map <Capabilities , Integer > getStereotypes (NodeStatus status ) {
0 commit comments