diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index 00f7ed1fe5a2f..fc4df8ea77288 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java @@ -122,7 +122,7 @@ @ManagedService(objectName = "org.seleniumhq.grid:type=Distributor,name=LocalDistributor", description = "Grid 4 node distributor") -public class LocalDistributor extends Distributor implements Closeable { +public class LocalDistributor extends Distributor implements AutoCloseable { private static final Logger LOG = Logger.getLogger(LocalDistributor.class.getName()); diff --git a/java/test/org/openqa/selenium/events/ZeroMqEventBusTest.java b/java/test/org/openqa/selenium/events/ZeroMqEventBusTest.java index 812e2b5239101..0b08ed5081ab1 100644 --- a/java/test/org/openqa/selenium/events/ZeroMqEventBusTest.java +++ b/java/test/org/openqa/selenium/events/ZeroMqEventBusTest.java @@ -33,31 +33,37 @@ class ZeroMqEventBusTest { @Test - void shouldEnsureMessagesRequireSecret() throws InterruptedException, ExecutionException, TimeoutException { + void shouldEnsureMessagesRequireSecret() + throws InterruptedException, ExecutionException, TimeoutException { String publish = "inproc://zmqebt-publish"; String subscribe = "inproc://zmqebt-subscribe"; ZContext context = new ZContext(); - EventBus good = ZeroMqEventBus.create(context, publish, subscribe, true, new Secret("cheese")); - EventBus alsoGood = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("cheese")); - EventBus bad = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("peas")); + try ( + EventBus good = ZeroMqEventBus.create(context, publish, subscribe, true, new Secret("cheese")); + EventBus alsoGood = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("cheese")); + EventBus bad = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("peas"))) { - RuntimeException errorException = new RuntimeException("oh noes!"); - EventName eventName = new EventName("evt"); - CompletableFuture future = new CompletableFuture<>(); - good.addListener(new EventListener<>(eventName, String.class, future::complete)); - good.addListener(ZeroMqEventBus.onRejectedEvent(evt -> future.completeExceptionally(errorException))); + RuntimeException errorException = new RuntimeException("oh noes!"); + EventName eventName = new EventName("evt"); + CompletableFuture future = new CompletableFuture<>(); + good.addListener(new EventListener<>(eventName, String.class, future::complete)); + good.addListener( + ZeroMqEventBus.onRejectedEvent(evt -> future.completeExceptionally(errorException))); - alsoGood.fire(new Event(eventName, "tasty")); + alsoGood.fire(new Event(eventName, "tasty")); - String value = future.get(5, SECONDS); - assertThat(value).isEqualTo("tasty"); + String value = future.get(5, SECONDS); + assertThat(value).isEqualTo("tasty"); - CompletableFuture badFuture = new CompletableFuture<>(); - good.addListener(new EventListener<>(eventName, String.class, badFuture::complete)); - good.addListener(ZeroMqEventBus.onRejectedEvent(evt -> badFuture.completeExceptionally(errorException))); - bad.fire(new Event(eventName, "not tasty")); + CompletableFuture badFuture = new CompletableFuture<>(); + good.addListener(new EventListener<>(eventName, String.class, badFuture::complete)); + good.addListener( + ZeroMqEventBus.onRejectedEvent(evt -> badFuture.completeExceptionally(errorException))); + bad.fire(new Event(eventName, "not tasty")); - Assertions.assertThatThrownBy(() -> badFuture.get(5, SECONDS)).getCause().isSameAs(errorException); + Assertions.assertThatThrownBy(() -> badFuture.get(5, SECONDS)).getCause() + .isSameAs(errorException); + } } } diff --git a/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java b/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java index 66b8674da3dfc..5d6cb34f27198 100644 --- a/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java +++ b/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java @@ -163,7 +163,7 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException { c -> new Session( new SessionId(UUID.randomUUID()), sessionUri, stereotype, c, Instant.now())); - Distributor local = new LocalDistributor( + try (LocalDistributor local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -174,16 +174,19 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException { Duration.ofMinutes(5), false, Duration.ofSeconds(5), - newSessionThreadPoolSize); + newSessionThreadPoolSize)) { - distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret); + distributor = + new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, + registrationSecret); - distributor.add(node); + distributor.add(node); - wait.until(obj -> distributor.getStatus().hasCapacity()); + wait.until(obj -> distributor.getStatus().hasCapacity()); - NodeStatus status = getOnlyElement(distributor.getStatus().getNodes()); - assertEquals(1, getStereotypes(status).get(CAPS).intValue()); + NodeStatus status = getOnlyElement(distributor.getStatus().getNodes()); + assertEquals(1, getStereotypes(status).get(CAPS).intValue()); + } } @Test @@ -197,7 +200,7 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException (id, caps) -> new Session(id, sessionUri, stereotype, caps, Instant.now()))) .build(); - Distributor local = new LocalDistributor( + try (LocalDistributor local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -208,16 +211,19 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException Duration.ofMinutes(5), false, Duration.ofSeconds(5), - newSessionThreadPoolSize); + newSessionThreadPoolSize)) { - distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret); + distributor = + new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, + registrationSecret); - bus.fire(new NodeStatusEvent(node.getStatus())); + bus.fire(new NodeStatusEvent(node.getStatus())); - wait.until(obj -> distributor.getStatus().hasCapacity()); + wait.until(obj -> distributor.getStatus().hasCapacity()); - NodeStatus status = getOnlyElement(distributor.getStatus().getNodes()); - assertEquals(1, getStereotypes(status).get(CAPS).intValue()); + NodeStatus status = getOnlyElement(distributor.getStatus().getNodes()); + assertEquals(1, getStereotypes(status).get(CAPS).intValue()); + } } @Test @@ -241,7 +247,7 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents handler.addHandler(firstNode); handler.addHandler(secondNode); - Distributor local = new LocalDistributor( + try (LocalDistributor local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -252,18 +258,21 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents Duration.ofMinutes(5), false, Duration.ofSeconds(5), - newSessionThreadPoolSize); + newSessionThreadPoolSize)) { - distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret); + distributor = + new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, + registrationSecret); - bus.fire(new NodeStatusEvent(firstNode.getStatus())); - bus.fire(new NodeStatusEvent(secondNode.getStatus())); + bus.fire(new NodeStatusEvent(firstNode.getStatus())); + bus.fire(new NodeStatusEvent(secondNode.getStatus())); - wait.until(obj -> distributor.getStatus()); + wait.until(obj -> distributor.getStatus()); - Set nodes = distributor.getStatus().getNodes(); + Set nodes = distributor.getStatus().getNodes(); - assertEquals(1, nodes.size()); + assertEquals(1, nodes.size()); + } } @Test @@ -278,7 +287,7 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange() (id, caps) -> new Session(id, sessionUri, stereotype, caps, Instant.now()))) .build(); - Distributor local = new LocalDistributor( + try (LocalDistributor local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -289,40 +298,43 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange() Duration.ofMinutes(5), false, Duration.ofSeconds(5), - newSessionThreadPoolSize); + newSessionThreadPoolSize)) { - distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret); + distributor = + new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, + registrationSecret); - bus.fire(new NodeStatusEvent(node.getStatus())); + bus.fire(new NodeStatusEvent(node.getStatus())); - // Start empty - wait.until(obj -> distributor.getStatus().hasCapacity()); + // Start empty + wait.until(obj -> distributor.getStatus().hasCapacity()); - NodeStatus nodeStatus = getOnlyElement(distributor.getStatus().getNodes()); - assertEquals(1, getStereotypes(nodeStatus).get(CAPS).intValue()); - - // Craft a status that makes it look like the node is busy, and post it on the bus. - NodeStatus status = node.getStatus(); - NodeStatus crafted = new NodeStatus( - status.getNodeId(), - status.getExternalUri(), - status.getMaxSessionCount(), - ImmutableSet.of( - new Slot( - new SlotId(status.getNodeId(), UUID.randomUUID()), - CAPS, - Instant.now(), - new Session( - new SessionId(UUID.randomUUID()), sessionUri, CAPS, CAPS, Instant.now()))), - UP, - Duration.ofSeconds(10), - status.getVersion(), - status.getOsInfo()); - - bus.fire(new NodeStatusEvent(crafted)); - - // We claimed the only slot is filled. Life is good. - wait.until(obj -> !distributor.getStatus().hasCapacity()); + NodeStatus nodeStatus = getOnlyElement(distributor.getStatus().getNodes()); + assertEquals(1, getStereotypes(nodeStatus).get(CAPS).intValue()); + + // Craft a status that makes it look like the node is busy, and post it on the bus. + NodeStatus status = node.getStatus(); + NodeStatus crafted = new NodeStatus( + status.getNodeId(), + status.getExternalUri(), + status.getMaxSessionCount(), + ImmutableSet.of( + new Slot( + new SlotId(status.getNodeId(), UUID.randomUUID()), + CAPS, + Instant.now(), + new Session( + new SessionId(UUID.randomUUID()), sessionUri, CAPS, CAPS, Instant.now()))), + UP, + Duration.ofSeconds(10), + status.getVersion(), + status.getOsInfo()); + + bus.fire(new NodeStatusEvent(crafted)); + + // We claimed the only slot is filled. Life is good. + wait.until(obj -> !distributor.getStatus().hasCapacity()); + } } private Map getStereotypes(NodeStatus status) { diff --git a/java/test/org/openqa/selenium/grid/distributor/DistributorTest.java b/java/test/org/openqa/selenium/grid/distributor/DistributorTest.java index 65d7251fae087..ff8de65e48fbb 100644 --- a/java/test/org/openqa/selenium/grid/distributor/DistributorTest.java +++ b/java/test/org/openqa/selenium/grid/distributor/DistributorTest.java @@ -107,7 +107,7 @@ class DistributorTest { private final Wait wait = new FluentWait<>(new Object()).withTimeout(Duration.ofSeconds(5)); private Tracer tracer; private EventBus bus; - private Distributor local; + private LocalDistributor local; private Capabilities stereotype; private Capabilities caps; private URI nodeUri; @@ -147,6 +147,9 @@ public void setUp() throws URISyntaxException { @AfterEach public void cleanUp() { bus.close(); + if (local !=null) { + local.close(); + } } @Test @@ -215,7 +218,7 @@ void shouldBeAbleToAddANodeAndCreateASession() { new TestSessionFactory((id, c) -> new Session(id, nodeUri, stereotype, c, Instant.now()))) .build(); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -227,14 +230,14 @@ void shouldBeAbleToAddANodeAndCreateASession() { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); MutableCapabilities sessionCaps = new MutableCapabilities(caps); sessionCaps.setCapability("sausages", "gravy"); Either result = - distributor.newSession(createRequest(sessionCaps)); + local.newSession(createRequest(sessionCaps)); assertThatEither(result).isRight(); Session session = result.right().getSession(); assertThat(session.getCapabilities().getCapability("sausages")) @@ -259,7 +262,7 @@ void creatingASessionAddsItToTheSessionMap() { new TestSessionFactory((id, c) -> new Session(id, nodeUri, stereotype, c, Instant.now()))) .build(); - LocalDistributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -271,14 +274,14 @@ void creatingASessionAddsItToTheSessionMap() { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); MutableCapabilities sessionCaps = new MutableCapabilities(caps); sessionCaps.setCapability("sausages", "gravy"); Either result = - distributor.newSession(createRequest(sessionCaps)); + local.newSession(createRequest(sessionCaps)); assertThatEither(result).isRight(); Session returned = result.right().getSession(); Session session = sessions.get(returned.getId()); @@ -304,7 +307,7 @@ void shouldBeAbleToRemoveANode() throws MalformedURLException { new TestSessionFactory((id, c) -> new Session(id, nodeUri, stereotype, c, Instant.now()))) .build(); - Distributor local = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -345,7 +348,7 @@ void testDrainingNodeDoesNotAcceptNewSessions() { new TestSessionFactory((id, c) -> new Session(id, nodeUri, stereotype, c, Instant.now()))) .build(); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -357,12 +360,12 @@ void testDrainingNodeDoesNotAcceptNewSessions() { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - distributor.drain(node.getId()); + local.add(node); + local.drain(node.getId()); assertTrue(node.isDraining()); - Either result = distributor.newSession(createRequest(caps)); + Either result = local.newSession(createRequest(caps)); assertThatEither(result).isLeft(); } @@ -385,7 +388,7 @@ void testDrainedNodeShutsDownOnceEmpty() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); bus.addListener(NodeDrainComplete.listener(ignored -> latch.countDown())); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -397,19 +400,19 @@ void testDrainedNodeShutsDownOnceEmpty() throws InterruptedException { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); - distributor.drain(node.getId()); + local.drain(node.getId()); latch.await(5, TimeUnit.SECONDS); assertThat(latch.getCount()).isZero(); - assertThat(distributor.getStatus().getNodes()).isEmpty(); + assertThat(local.getStatus().getNodes()).isEmpty(); Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isLeft(); } @@ -432,7 +435,7 @@ void drainedNodeDoesNotShutDownIfNotEmpty() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); bus.addListener(NodeDrainComplete.listener(ignored -> latch.countDown())); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -444,20 +447,20 @@ void drainedNodeDoesNotShutDownIfNotEmpty() throws InterruptedException { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); Either session = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(session).isRight(); - distributor.drain(node.getId()); + local.drain(node.getId()); latch.await(5, TimeUnit.SECONDS); assertThat(latch.getCount()).isEqualTo(1); - assertThat(distributor.getStatus().getNodes().size()).isEqualTo(1); + assertThat(local.getStatus().getNodes().size()).isEqualTo(1); } @Test @@ -482,7 +485,7 @@ void drainedNodeShutsDownAfterSessionsFinish() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); bus.addListener(NodeDrainComplete.listener(ignored -> latch.countDown())); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -494,28 +497,28 @@ void drainedNodeShutsDownAfterSessionsFinish() throws InterruptedException { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); Either firstResponse = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); Either secondResponse = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); - distributor.drain(node.getId()); + local.drain(node.getId()); - assertThat(distributor.getStatus().getNodes().size()).isEqualTo(1); + assertThat(local.getStatus().getNodes().size()).isEqualTo(1); node.stop(firstResponse.right().getSession().getId()); node.stop(secondResponse.right().getSession().getId()); latch.await(5, TimeUnit.SECONDS); - waitTillNodesAreRemoved(distributor); + waitTillNodesAreRemoved(local); assertThat(latch.getCount()).isZero(); - assertThat(distributor.getStatus().getNodes()).isEmpty(); + assertThat(local.getStatus().getNodes()).isEmpty(); } @Test @@ -572,7 +575,7 @@ void theMostLightlyLoadedNodeIsSelectedFirst() { handler.addHandler(medium); handler.addHandler(heavy); handler.addHandler(massive); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -589,13 +592,13 @@ void theMostLightlyLoadedNodeIsSelectedFirst() { .add(lightest) .add(massive); - wait.until(obj -> distributor.getStatus().getNodes().size() == 4); - wait.until(ignored -> distributor.getStatus().getNodes().stream() + wait.until(obj -> local.getStatus().getNodes().size() == 4); + wait.until(ignored -> local.getStatus().getNodes().stream() .allMatch(node -> node.getAvailability() == UP && node.hasCapacity())); - wait.until(obj -> distributor.getStatus().hasCapacity()); + wait.until(obj -> local.getStatus().hasCapacity()); Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isRight(); Session session = result.right().getSession(); assertThat(session.getUri()).isEqualTo(lightest.getStatus().getExternalUri()); @@ -617,7 +620,7 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() { handler.addHandler(sessions); handler.addHandler(leastRecent); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -630,17 +633,17 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() { Duration.ofSeconds(5), newSessionThreadPoolSize) .add(leastRecent); - waitToHaveCapacity(distributor); + waitToHaveCapacity(local); - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); Node middle = createNode(caps, 5, 0); handler.addHandler(middle); - distributor.add(middle); - waitForAllNodesToHaveCapacity(distributor, 2); + local.add(middle); + waitForAllNodesToHaveCapacity(local, 2); Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isRight(); Session session = result.right().getSession(); // Least lightly loaded is middle @@ -648,10 +651,10 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() { Node mostRecent = createNode(caps, 5, 0); handler.addHandler(mostRecent); - distributor.add(mostRecent); - waitForAllNodesToHaveCapacity(distributor, 3); + local.add(mostRecent); + waitForAllNodesToHaveCapacity(local, 3); - result = distributor.newSession(createRequest(caps)); + result = local.newSession(createRequest(caps)); assertThatEither(result).isRight(); session = result.right().getSession(); // Least lightly loaded is most recent @@ -663,7 +666,7 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() { assertThat(getFreeStereotypeCounts(middle.getStatus())).isEqualTo(expected); // All nodes are now equally loaded. We should be going in time order now - result = distributor.newSession(createRequest(caps)); + result = local.newSession(createRequest(caps)); assertThatEither(result).isRight(); session = result.right().getSession(); assertThat(session.getUri()).isEqualTo(leastRecent.getStatus().getExternalUri()); @@ -703,7 +706,7 @@ void shouldIncludeHostsThatAreUpInHostList() { .build(); handler.addHandler(alwaysDown); - LocalDistributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -715,13 +718,13 @@ void shouldIncludeHostsThatAreUpInHostList() { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - handler.addHandler(distributor); - distributor.add(alwaysDown); - waitForAllNodesToMeetCondition(distributor, 1, DOWN); + handler.addHandler(local); + local.add(alwaysDown); + waitForAllNodesToMeetCondition(local, 1, DOWN); // Should be unable to create a session because the node is down. Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isLeft(); Node alwaysUp = LocalNode.builder(tracer, bus, uri, uri, registrationSecret) @@ -733,10 +736,10 @@ void shouldIncludeHostsThatAreUpInHostList() { .build(); handler.addHandler(alwaysUp); - distributor.add(alwaysUp); - waitToHaveCapacity(distributor); + local.add(alwaysUp); + waitToHaveCapacity(local); - result = distributor.newSession(createRequest(caps)); + result = local.newSession(createRequest(caps)); assertThatEither(result).isRight(); } @@ -755,7 +758,7 @@ void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() { .add(caps, new TestSessionFactory((id, c) -> new Session( id, nodeUri, stereotype, c, Instant.now()))) .build(); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -768,16 +771,16 @@ void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() { Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); // Use up the one slot available Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isRight(); // Now try and create a session. - result = distributor.newSession(createRequest(caps)); + result = local.newSession(createRequest(caps)); assertThatEither(result).isLeft(); } @@ -797,7 +800,7 @@ void shouldReleaseSlotOnceSessionEnds() { id, nodeUri, stereotype, c, Instant.now()))) .build(); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -809,12 +812,12 @@ void shouldReleaseSlotOnceSessionEnds() { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); // Use up the one slot available Session session; - Either result = distributor.newSession(createRequest(caps)); + Either result = local.newSession(createRequest(caps)); assertThatEither(result).isRight(); session = result.right().getSession(); // Make sure the session map has the session @@ -833,10 +836,10 @@ void shouldReleaseSlotOnceSessionEnds() { } }); - waitToHaveCapacity(distributor); + waitToHaveCapacity(local); // And we should now be able to create another session. - result = distributor.newSession(createRequest(caps)); + result = local.newSession(createRequest(caps)); assertThatEither(result).isRight(); } @@ -896,7 +899,7 @@ void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() { })) .build(); - Distributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(node), @@ -908,14 +911,14 @@ void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() { false, Duration.ofSeconds(5), newSessionThreadPoolSize); - distributor.add(node); - waitToHaveCapacity(distributor); + local.add(node); + waitToHaveCapacity(local); Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isLeft(); - assertThat(distributor.getStatus().hasCapacity()).isTrue(); + assertThat(local.getStatus().hasCapacity()).isTrue(); } @Test @@ -943,7 +946,7 @@ void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthCheckPasse .build(); handler.addHandler(node); - LocalDistributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -955,22 +958,22 @@ void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthCheckPasse false, Duration.ofSeconds(5), newSessionThreadPoolSize); - handler.addHandler(distributor); - distributor.add(node); - waitForAllNodesToMeetCondition(distributor, 1, DOWN); + handler.addHandler(local); + local.add(node); + waitForAllNodesToMeetCondition(local, 1, DOWN); // Should be unable to create a session because the node is down. Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isLeft(); // Mark the node as being up isUp.set(UP); // Kick the machinery to ensure that everything is fine. - distributor.refresh(); + local.refresh(); // Because the node is now up and running, we should now be able to create a session - result = distributor.newSession(createRequest(caps)); + result = local.newSession(createRequest(caps)); assertThatEither(result).isRight(); } @@ -1009,7 +1012,7 @@ void shouldNotRemoveNodeWhoseHealthCheckPassesBeforeThreshold() .build(); handler.addHandler(node); - LocalDistributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -1021,15 +1024,15 @@ void shouldNotRemoveNodeWhoseHealthCheckPassesBeforeThreshold() false, Duration.ofSeconds(5), newSessionThreadPoolSize); - handler.addHandler(distributor); - distributor.add(node); + handler.addHandler(local); + local.add(node); latch.await(60, TimeUnit.SECONDS); - waitToHaveCapacity(distributor); + waitToHaveCapacity(local); Either result = - distributor.newSession(createRequest(caps)); + local.newSession(createRequest(caps)); assertThatEither(result).isRight(); } @@ -1070,7 +1073,7 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() { registrationSecret, 5); - LocalDistributor distributor = new LocalDistributor( + local = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -1089,23 +1092,23 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() { Capabilities chrome = new ImmutableCapabilities("browserName", "chrome"); //Store our "expected results" sets for the various browser-specific nodes - Set edgeNodes = createNodeSet(handler, distributor, 3, edge, chrome, firefox); + Set edgeNodes = createNodeSet(handler, local, 3, edge, chrome, firefox); //chromeNodes is all these new nodes PLUS all the Edge nodes from before - Set chromeNodes = createNodeSet(handler, distributor, 5, chrome, firefox); + Set chromeNodes = createNodeSet(handler, local, 5, chrome, firefox); chromeNodes.addAll(edgeNodes); //all nodes support firefox, so add them to the firefoxNodes set - Set firefoxNodes = createNodeSet(handler, distributor, 3, firefox); + Set firefoxNodes = createNodeSet(handler, local, 3, firefox); firefoxNodes.addAll(edgeNodes); firefoxNodes.addAll(chromeNodes); - waitForAllNodesToHaveCapacity(distributor, 11); + waitForAllNodesToHaveCapacity(local, 11); //Assign 5 Chrome and 5 Firefox sessions to the distributor, make sure they don't go to the Edge node for (int i=0; i<5; i++) { Either chromeResult = - distributor.newSession(createRequest(chrome)); + local.newSession(createRequest(chrome)); assertThatEither(chromeResult).isRight(); Session chromeSession = chromeResult.right().getSession(); @@ -1118,7 +1121,7 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() { ); Either firefoxResult = - distributor.newSession(createRequest(firefox)); + local.newSession(createRequest(firefox)); assertThatEither(firefoxResult).isRight(); Session firefoxSession = firefoxResult.right().getSession(); LOG.info(String.format("Firefox Session %d assigned to %s", i, chromeSession.getUri())); @@ -1131,7 +1134,7 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() { //The Chrome Nodes should be full at this point, but Firefox isn't... so send an Edge session and make sure it routes to an Edge node Either edgeResult = - distributor.newSession(createRequest(edge)); + local.newSession(createRequest(edge)); assertThatEither(edgeResult).isRight(); Session edgeSession = edgeResult.right().getSession(); assertTrue(edgeNodes.stream().anyMatch(node -> node.getUri().equals(edgeSession.getUri()))); diff --git a/java/test/org/openqa/selenium/grid/router/JmxTest.java b/java/test/org/openqa/selenium/grid/router/JmxTest.java index cac391db2834b..f6b2290382e61 100644 --- a/java/test/org/openqa/selenium/grid/router/JmxTest.java +++ b/java/test/org/openqa/selenium/grid/router/JmxTest.java @@ -27,7 +27,6 @@ import org.openqa.selenium.grid.config.MapConfig; import org.openqa.selenium.grid.data.DefaultSlotMatcher; import org.openqa.selenium.grid.data.Session; -import org.openqa.selenium.grid.distributor.Distributor; import org.openqa.selenium.grid.distributor.local.LocalDistributor; import org.openqa.selenium.grid.distributor.selector.DefaultSlotSelector; import org.openqa.selenium.grid.jmx.JMXHelper; @@ -259,7 +258,7 @@ void shouldBeAbleToMonitorHub() throws Exception { secret, 5); - Distributor distributor = new LocalDistributor( + try (LocalDistributor distributor = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(localNode), @@ -270,28 +269,29 @@ void shouldBeAbleToMonitorHub() throws Exception { Duration.ofMinutes(5), false, Duration.ofSeconds(5), - Runtime.getRuntime().availableProcessors()); + Runtime.getRuntime().availableProcessors())) { - distributor.add(localNode); + distributor.add(localNode); - MBeanInfo info = beanServer.getMBeanInfo(name); - assertThat(info).isNotNull(); + MBeanInfo info = beanServer.getMBeanInfo(name); + assertThat(info).isNotNull(); - String nodeUpCount = (String) beanServer.getAttribute(name, "NodeUpCount"); - LOG.info("Node up count=" + nodeUpCount); - assertThat(Integer.parseInt(nodeUpCount)).isEqualTo(1); + String nodeUpCount = (String) beanServer.getAttribute(name, "NodeUpCount"); + LOG.info("Node up count=" + nodeUpCount); + assertThat(Integer.parseInt(nodeUpCount)).isEqualTo(1); - String nodeDownCount = (String) beanServer.getAttribute(name, "NodeDownCount"); - LOG.info("Node down count=" + nodeDownCount); - assertThat(Integer.parseInt(nodeDownCount)).isZero(); + String nodeDownCount = (String) beanServer.getAttribute(name, "NodeDownCount"); + LOG.info("Node down count=" + nodeDownCount); + assertThat(Integer.parseInt(nodeDownCount)).isZero(); - String activeSlots = (String) beanServer.getAttribute(name, "ActiveSlots"); - LOG.info("Active slots count=" + activeSlots); - assertThat(Integer.parseInt(activeSlots)).isZero(); + String activeSlots = (String) beanServer.getAttribute(name, "ActiveSlots"); + LOG.info("Active slots count=" + activeSlots); + assertThat(Integer.parseInt(activeSlots)).isZero(); - String idleSlots = (String) beanServer.getAttribute(name, "IdleSlots"); - LOG.info("Idle slots count=" + idleSlots); - assertThat(Integer.parseInt(idleSlots)).isEqualTo(1); + String idleSlots = (String) beanServer.getAttribute(name, "IdleSlots"); + LOG.info("Idle slots count=" + idleSlots); + assertThat(Integer.parseInt(idleSlots)).isEqualTo(1); + } } } diff --git a/java/test/org/openqa/selenium/grid/router/SessionCleanUpTest.java b/java/test/org/openqa/selenium/grid/router/SessionCleanUpTest.java index b0f2e9245ef41..5b7f022f530f1 100644 --- a/java/test/org/openqa/selenium/grid/router/SessionCleanUpTest.java +++ b/java/test/org/openqa/selenium/grid/router/SessionCleanUpTest.java @@ -148,8 +148,7 @@ void shouldRemoveSessionAfterNodeIsShutDownGracefully() { registrationSecret, 5); handler.addHandler(queue); - - LocalDistributor distributor = new LocalDistributor( + try(LocalDistributor distributor = new LocalDistributor( tracer, bus, clientFactory, @@ -160,85 +159,87 @@ void shouldRemoveSessionAfterNodeIsShutDownGracefully() { Duration.ofSeconds(1), false, Duration.ofSeconds(5), - Runtime.getRuntime().availableProcessors()); - handler.addHandler(distributor); + Runtime.getRuntime().availableProcessors())) { + handler.addHandler(distributor); - Router router = new Router(tracer, clientFactory, sessions, queue, distributor); - handler.addHandler(router); + Router router = new Router(tracer, clientFactory, sessions, queue, distributor); + handler.addHandler(router); - server = new NettyServer( - new BaseServerOptions( - new MapConfig(ImmutableMap.of())), - handler); + server = new NettyServer( + new BaseServerOptions( + new MapConfig(ImmutableMap.of())), + handler); - server.start(); + server.start(); - StringBuilder rawCaps = new StringBuilder(); - try (JsonOutput out = new Json().newOutput(rawCaps)) { - out.setPrettyPrint(false).write(capabilities); - } + StringBuilder rawCaps = new StringBuilder(); + try (JsonOutput out = new Json().newOutput(rawCaps)) { + out.setPrettyPrint(false).write(capabilities); + } + + Config additionalConfig = + new TomlConfig( + new StringReader( + "[node]\n" + + "detect-drivers = false\n" + + "driver-factories = [\n" + + String.format("\"%s\",", LocalTestSessionFactory.class.getName()) + "\n" + + String.format("\"%s\"", rawCaps.toString().replace("\"", "\\\"")) + "\n" + + "]")); + + String[] rawConfig = new String[]{ + "[events]", + "publish = \"tcp://localhost:" + publish + "\"", + "subscribe = \"tcp://localhost:" + subscribe + "\"", + "", + "[network]", + "relax-checks = true", + "", + "[server]", + "registration-secret = \"hereford hop\""}; + + Config nodeConfig = new MemoizedConfig( + new CompoundConfig( + additionalConfig, + new TomlConfig(new StringReader(String.join("\n", rawConfig))), + new MapConfig( + ImmutableMap.of("server", ImmutableMap.of("port", PortProber.findFreePort()))))); + + Server nodeServer = new NodeServer().asServer(nodeConfig).start(); + + waitToHaveCapacity(distributor); + + HttpRequest request = new HttpRequest(POST, "/session"); + request.setContent(asJson( + ImmutableMap.of( + "capabilities", ImmutableMap.of( + "alwaysMatch", capabilities)))); + + HttpClient client = clientFactory.createClient(server.getUrl()); + HttpResponse httpResponse = client.execute(request); + assertThat(httpResponse.getStatus()).isEqualTo(HTTP_OK); + + Optional> maybeResponse = + Optional.ofNullable(Values.get(httpResponse, Map.class)); + + assertThat(maybeResponse).isPresent(); + String rawResponse = JSON.toJson(maybeResponse.get().get("sessionId")); + SessionId id = JSON.toType(rawResponse, SessionId.class); - Config additionalConfig = - new TomlConfig( - new StringReader( - "[node]\n" + - "detect-drivers = false\n" + - "driver-factories = [\n" + - String.format("\"%s\",", LocalTestSessionFactory.class.getName()) + "\n" + - String.format("\"%s\"", rawCaps.toString().replace("\"", "\\\"")) + "\n" + - "]")); - - String[] rawConfig = new String[]{ - "[events]", - "publish = \"tcp://localhost:" + publish + "\"", - "subscribe = \"tcp://localhost:" + subscribe + "\"", - "", - "[network]", - "relax-checks = true", - "", - "[server]", - "registration-secret = \"hereford hop\""}; - - Config nodeConfig = new MemoizedConfig( - new CompoundConfig( - additionalConfig, - new TomlConfig(new StringReader(String.join("\n", rawConfig))), - new MapConfig( - ImmutableMap.of("server", ImmutableMap.of("port", PortProber.findFreePort()))))); - - Server nodeServer = new NodeServer().asServer(nodeConfig).start(); - - waitToHaveCapacity(distributor); - - HttpRequest request = new HttpRequest(POST, "/session"); - request.setContent(asJson( - ImmutableMap.of( - "capabilities", ImmutableMap.of( - "alwaysMatch", capabilities)))); - - HttpClient client = clientFactory.createClient(server.getUrl()); - HttpResponse httpResponse = client.execute(request); - assertThat(httpResponse.getStatus()).isEqualTo(HTTP_OK); - - Optional> maybeResponse = - Optional.ofNullable(Values.get(httpResponse, Map.class)); - - assertThat(maybeResponse).isPresent(); - String rawResponse = JSON.toJson(maybeResponse.get().get("sessionId")); - SessionId id = JSON.toType(rawResponse, SessionId.class); - - Session session = sessions.get(id); - - assertThat(session.getCapabilities().getBrowserName()).isEqualTo(capabilities.getBrowserName()); - - nodeServer.stop(); - - waitTillNodesAreRemoved(distributor); - - try { - waitTillSessionIsRemoved(sessions, id); - } catch (Exception e) { - fail("Session not removed"); + Session session = sessions.get(id); + + assertThat(session.getCapabilities().getBrowserName()).isEqualTo( + capabilities.getBrowserName()); + + nodeServer.stop(); + + waitTillNodesAreRemoved(distributor); + + try { + waitTillSessionIsRemoved(sessions, id); + } catch (Exception e) { + fail("Session not removed"); + } } } @@ -271,7 +272,7 @@ void shouldRemoveSessionAfterNodeIsDown() throws URISyntaxException { .build(); handler.addHandler(node); - LocalDistributor distributor = new LocalDistributor( + try(LocalDistributor distributor = new LocalDistributor( tracer, bus, new PassthroughHttpClient.Factory(handler), @@ -282,48 +283,49 @@ void shouldRemoveSessionAfterNodeIsDown() throws URISyntaxException { Duration.ofSeconds(1), false, Duration.ofSeconds(5), - Runtime.getRuntime().availableProcessors()); - handler.addHandler(distributor); - distributor.add(node); + Runtime.getRuntime().availableProcessors())) { + handler.addHandler(distributor); + distributor.add(node); - waitToHaveCapacity(distributor); + waitToHaveCapacity(distributor); - Either result = - distributor.newSession(new SessionRequest( - new RequestId(UUID.randomUUID()), - Instant.now(), - ImmutableSet.of(W3C), - ImmutableSet.of(capabilities), - ImmutableMap.of(), - ImmutableMap.of())); - assertThat(result.isRight()).isTrue(); + Either result = + distributor.newSession(new SessionRequest( + new RequestId(UUID.randomUUID()), + Instant.now(), + ImmutableSet.of(W3C), + ImmutableSet.of(capabilities), + ImmutableMap.of(), + ImmutableMap.of())); + assertThat(result.isRight()).isTrue(); - SessionId id = result.right().getSession().getId(); - Session session = sessions.get(id); + SessionId id = result.right().getSession().getId(); + Session session = sessions.get(id); - assertThat(session.getCapabilities().getBrowserName()).isEqualTo(capabilities.getBrowserName()); + assertThat(session.getCapabilities().getBrowserName()).isEqualTo( + capabilities.getBrowserName()); - availability.set(DOWN); + availability.set(DOWN); - waitTillNodesAreRemoved(distributor); + waitTillNodesAreRemoved(distributor); - try { - waitTillSessionIsRemoved(sessions, id); - } catch (Exception e) { - fail("Session not removed"); - } - - Either sessionResponse = - distributor.newSession(new SessionRequest( - new RequestId(UUID.randomUUID()), - Instant.now(), - ImmutableSet.of(W3C), - ImmutableSet.of(capabilities), - ImmutableMap.of(), - ImmutableMap.of())); - assertThat(sessionResponse.isLeft()).isTrue(); - assertThat(distributor.getStatus().getNodes().isEmpty()).isTrue(); + try { + waitTillSessionIsRemoved(sessions, id); + } catch (Exception e) { + fail("Session not removed"); + } + Either sessionResponse = + distributor.newSession(new SessionRequest( + new RequestId(UUID.randomUUID()), + Instant.now(), + ImmutableSet.of(W3C), + ImmutableSet.of(capabilities), + ImmutableMap.of(), + ImmutableMap.of())); + assertThat(sessionResponse.isLeft()).isTrue(); + assertThat(distributor.getStatus().getNodes().isEmpty()).isTrue(); + } } private void waitToHaveCapacity(Distributor distributor) { diff --git a/java/test/org/openqa/selenium/grid/router/SessionQueueGridTest.java b/java/test/org/openqa/selenium/grid/router/SessionQueueGridTest.java index 8c84f60327b1c..2ce73faae3a29 100644 --- a/java/test/org/openqa/selenium/grid/router/SessionQueueGridTest.java +++ b/java/test/org/openqa/selenium/grid/router/SessionQueueGridTest.java @@ -83,6 +83,7 @@ class SessionQueueGridTest { private HttpClient.Factory clientFactory; private Secret registrationSecret; private Server server; + private EventBus bus; private static Server createServer(HttpHandler handler) { return new NettyServer( @@ -95,7 +96,7 @@ private static Server createServer(HttpHandler handler) { @BeforeEach public void setup() throws URISyntaxException, MalformedURLException { Tracer tracer = DefaultTestTracer.createTracer(); - EventBus bus = new GuavaEventBus(); + bus = new GuavaEventBus(); int nodePort = PortProber.findFreePort(); URI nodeUri = new URI("http://localhost:" + nodePort); CombinedHandler handler = new CombinedHandler(); @@ -237,6 +238,7 @@ void shouldBeAbleToClearQueue() { @AfterEach public void stopServer() { + bus.close(); server.stop(); } diff --git a/java/test/org/openqa/selenium/grid/sessionmap/jdbc/JdbcBackedSessionMapTest.java b/java/test/org/openqa/selenium/grid/sessionmap/jdbc/JdbcBackedSessionMapTest.java index d5b5133529d2d..947a861568f60 100644 --- a/java/test/org/openqa/selenium/grid/sessionmap/jdbc/JdbcBackedSessionMapTest.java +++ b/java/test/org/openqa/selenium/grid/sessionmap/jdbc/JdbcBackedSessionMapTest.java @@ -60,6 +60,7 @@ public static void createDB() throws SQLException { @AfterAll public static void killDBConnection() throws SQLException { connection.close(); + bus.close(); } @Test diff --git a/java/test/org/openqa/selenium/grid/sessionmap/redis/RedisBackedSessionMapTest.java b/java/test/org/openqa/selenium/grid/sessionmap/redis/RedisBackedSessionMapTest.java index 1b95227655835..2abe6d2829e84 100644 --- a/java/test/org/openqa/selenium/grid/sessionmap/redis/RedisBackedSessionMapTest.java +++ b/java/test/org/openqa/selenium/grid/sessionmap/redis/RedisBackedSessionMapTest.java @@ -64,6 +64,7 @@ public void setUp() throws URISyntaxException { public void tearDownRedisServer() { sessions.getRedisClient().close(); safelyCall(() -> server.stop()); + bus.close(); } @Test