Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@

@ManagedService(objectName = "org.seleniumhq.grid:type=Distributor,name=LocalDistributor",
description = "Grid 4 node distributor")
public class LocalDistributor extends Distributor implements Closeable {
public class LocalDistributor extends Distributor implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(LocalDistributor.class.getName());

Expand Down
40 changes: 23 additions & 17 deletions java/test/org/openqa/selenium/events/ZeroMqEventBusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,37 @@
class ZeroMqEventBusTest {

@Test
void shouldEnsureMessagesRequireSecret() throws InterruptedException, ExecutionException, TimeoutException {
void shouldEnsureMessagesRequireSecret()
throws InterruptedException, ExecutionException, TimeoutException {
String publish = "inproc://zmqebt-publish";
String subscribe = "inproc://zmqebt-subscribe";

ZContext context = new ZContext();
EventBus good = ZeroMqEventBus.create(context, publish, subscribe, true, new Secret("cheese"));
EventBus alsoGood = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("cheese"));
EventBus bad = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("peas"));
try (
EventBus good = ZeroMqEventBus.create(context, publish, subscribe, true, new Secret("cheese"));
EventBus alsoGood = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("cheese"));
EventBus bad = ZeroMqEventBus.create(context, publish, subscribe, false, new Secret("peas"))) {

RuntimeException errorException = new RuntimeException("oh noes!");
EventName eventName = new EventName("evt");
CompletableFuture<String> future = new CompletableFuture<>();
good.addListener(new EventListener<>(eventName, String.class, future::complete));
good.addListener(ZeroMqEventBus.onRejectedEvent(evt -> future.completeExceptionally(errorException)));
RuntimeException errorException = new RuntimeException("oh noes!");
EventName eventName = new EventName("evt");
CompletableFuture<String> future = new CompletableFuture<>();
good.addListener(new EventListener<>(eventName, String.class, future::complete));
good.addListener(
ZeroMqEventBus.onRejectedEvent(evt -> future.completeExceptionally(errorException)));

alsoGood.fire(new Event(eventName, "tasty"));
alsoGood.fire(new Event(eventName, "tasty"));

String value = future.get(5, SECONDS);
assertThat(value).isEqualTo("tasty");
String value = future.get(5, SECONDS);
assertThat(value).isEqualTo("tasty");

CompletableFuture<String> badFuture = new CompletableFuture<>();
good.addListener(new EventListener<>(eventName, String.class, badFuture::complete));
good.addListener(ZeroMqEventBus.onRejectedEvent(evt -> badFuture.completeExceptionally(errorException)));
bad.fire(new Event(eventName, "not tasty"));
CompletableFuture<String> badFuture = new CompletableFuture<>();
good.addListener(new EventListener<>(eventName, String.class, badFuture::complete));
good.addListener(
ZeroMqEventBus.onRejectedEvent(evt -> badFuture.completeExceptionally(errorException)));
bad.fire(new Event(eventName, "not tasty"));

Assertions.assertThatThrownBy(() -> badFuture.get(5, SECONDS)).getCause().isSameAs(errorException);
Assertions.assertThatThrownBy(() -> badFuture.get(5, SECONDS)).getCause()
.isSameAs(errorException);
}
}
}
118 changes: 65 additions & 53 deletions java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException {
c -> new Session(
new SessionId(UUID.randomUUID()), sessionUri, stereotype, c, Instant.now()));

Distributor local = new LocalDistributor(
try (LocalDistributor local = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(node),
Expand All @@ -174,16 +174,19 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException {
Duration.ofMinutes(5),
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize);
newSessionThreadPoolSize)) {

distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret);
distributor =
new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl,
registrationSecret);

distributor.add(node);
distributor.add(node);

wait.until(obj -> distributor.getStatus().hasCapacity());
wait.until(obj -> distributor.getStatus().hasCapacity());

NodeStatus status = getOnlyElement(distributor.getStatus().getNodes());
assertEquals(1, getStereotypes(status).get(CAPS).intValue());
NodeStatus status = getOnlyElement(distributor.getStatus().getNodes());
assertEquals(1, getStereotypes(status).get(CAPS).intValue());
}
}

@Test
Expand All @@ -197,7 +200,7 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException
(id, caps) -> new Session(id, sessionUri, stereotype, caps, Instant.now())))
.build();

Distributor local = new LocalDistributor(
try (LocalDistributor local = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(node),
Expand All @@ -208,16 +211,19 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException
Duration.ofMinutes(5),
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize);
newSessionThreadPoolSize)) {

distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret);
distributor =
new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl,
registrationSecret);

bus.fire(new NodeStatusEvent(node.getStatus()));
bus.fire(new NodeStatusEvent(node.getStatus()));

wait.until(obj -> distributor.getStatus().hasCapacity());
wait.until(obj -> distributor.getStatus().hasCapacity());

NodeStatus status = getOnlyElement(distributor.getStatus().getNodes());
assertEquals(1, getStereotypes(status).get(CAPS).intValue());
NodeStatus status = getOnlyElement(distributor.getStatus().getNodes());
assertEquals(1, getStereotypes(status).get(CAPS).intValue());
}
}

@Test
Expand All @@ -241,7 +247,7 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents
handler.addHandler(firstNode);
handler.addHandler(secondNode);

Distributor local = new LocalDistributor(
try (LocalDistributor local = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(handler),
Expand All @@ -252,18 +258,21 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents
Duration.ofMinutes(5),
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize);
newSessionThreadPoolSize)) {

distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret);
distributor =
new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl,
registrationSecret);

bus.fire(new NodeStatusEvent(firstNode.getStatus()));
bus.fire(new NodeStatusEvent(secondNode.getStatus()));
bus.fire(new NodeStatusEvent(firstNode.getStatus()));
bus.fire(new NodeStatusEvent(secondNode.getStatus()));

wait.until(obj -> distributor.getStatus());
wait.until(obj -> distributor.getStatus());

Set<NodeStatus> nodes = distributor.getStatus().getNodes();
Set<NodeStatus> nodes = distributor.getStatus().getNodes();

assertEquals(1, nodes.size());
assertEquals(1, nodes.size());
}
}

@Test
Expand All @@ -278,7 +287,7 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange()
(id, caps) -> new Session(id, sessionUri, stereotype, caps, Instant.now())))
.build();

Distributor local = new LocalDistributor(
try (LocalDistributor local = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(node),
Expand All @@ -289,40 +298,43 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange()
Duration.ofMinutes(5),
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize);
newSessionThreadPoolSize)) {

distributor = new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl, registrationSecret);
distributor =
new RemoteDistributor(tracer, new PassthroughHttpClient.Factory(local), externalUrl,
registrationSecret);

bus.fire(new NodeStatusEvent(node.getStatus()));
bus.fire(new NodeStatusEvent(node.getStatus()));

// Start empty
wait.until(obj -> distributor.getStatus().hasCapacity());
// Start empty
wait.until(obj -> distributor.getStatus().hasCapacity());

NodeStatus nodeStatus = getOnlyElement(distributor.getStatus().getNodes());
assertEquals(1, getStereotypes(nodeStatus).get(CAPS).intValue());

// Craft a status that makes it look like the node is busy, and post it on the bus.
NodeStatus status = node.getStatus();
NodeStatus crafted = new NodeStatus(
status.getNodeId(),
status.getExternalUri(),
status.getMaxSessionCount(),
ImmutableSet.of(
new Slot(
new SlotId(status.getNodeId(), UUID.randomUUID()),
CAPS,
Instant.now(),
new Session(
new SessionId(UUID.randomUUID()), sessionUri, CAPS, CAPS, Instant.now()))),
UP,
Duration.ofSeconds(10),
status.getVersion(),
status.getOsInfo());

bus.fire(new NodeStatusEvent(crafted));

// We claimed the only slot is filled. Life is good.
wait.until(obj -> !distributor.getStatus().hasCapacity());
NodeStatus nodeStatus = getOnlyElement(distributor.getStatus().getNodes());
assertEquals(1, getStereotypes(nodeStatus).get(CAPS).intValue());

// Craft a status that makes it look like the node is busy, and post it on the bus.
NodeStatus status = node.getStatus();
NodeStatus crafted = new NodeStatus(
status.getNodeId(),
status.getExternalUri(),
status.getMaxSessionCount(),
ImmutableSet.of(
new Slot(
new SlotId(status.getNodeId(), UUID.randomUUID()),
CAPS,
Instant.now(),
new Session(
new SessionId(UUID.randomUUID()), sessionUri, CAPS, CAPS, Instant.now()))),
UP,
Duration.ofSeconds(10),
status.getVersion(),
status.getOsInfo());

bus.fire(new NodeStatusEvent(crafted));

// We claimed the only slot is filled. Life is good.
wait.until(obj -> !distributor.getStatus().hasCapacity());
}
}

private Map<Capabilities, Integer> getStereotypes(NodeStatus status) {
Expand Down
Loading