From 196fa63161da32014389497dbe484312c281f8f3 Mon Sep 17 00:00:00 2001 From: Vadim Nabiev Date: Sun, 30 Jan 2022 03:34:25 +0300 Subject: [PATCH] fixed bug of hanging `CompletableFuture`s of requests (when calling `WsProvider::send`) - GC collected them because they didn't have strong references instead of using `ReferenceQueue` for cleaning handlers currently is used `ScheduledExecutorService` that schedules completing futures with `TimeoutException` and removing corresponding handlers --- build.gradle | 2 +- .../common/gc/WeakReferenceFinalizer.java | 17 ----- .../transport/ws/WsProvider.java | 73 +++++++++++-------- .../transport/ws/WsProviderProxyTest.java | 73 +++++++++++++++++-- 4 files changed, 110 insertions(+), 55 deletions(-) delete mode 100644 common/src/main/java/com/strategyobject/substrateclient/common/gc/WeakReferenceFinalizer.java diff --git a/build.gradle b/build.gradle index 27b5a06a..1c70b5c5 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { allprojects { group = 'com.strategyobject.substrateclient' - version = '0.0.1-SNAPSHOT' + version = '0.0.2-SNAPSHOT' repositories { mavenLocal() diff --git a/common/src/main/java/com/strategyobject/substrateclient/common/gc/WeakReferenceFinalizer.java b/common/src/main/java/com/strategyobject/substrateclient/common/gc/WeakReferenceFinalizer.java deleted file mode 100644 index 4ce6e3ca..00000000 --- a/common/src/main/java/com/strategyobject/substrateclient/common/gc/WeakReferenceFinalizer.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.strategyobject.substrateclient.common.gc; - -import java.lang.ref.ReferenceQueue; -import java.lang.ref.WeakReference; - -public class WeakReferenceFinalizer extends WeakReference { - private final Runnable finalizer; - - public WeakReferenceFinalizer(T referent, ReferenceQueue q, Runnable finalizer) { - super(referent, q); - this.finalizer = finalizer; - } - - public void finalizeResources() { - finalizer.run(); - } -} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java index 8ed25a74..a38b349e 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java @@ -4,7 +4,6 @@ import com.google.common.base.Strings; import com.strategyobject.substrateclient.common.eventemitter.EventEmitter; import com.strategyobject.substrateclient.common.eventemitter.EventListener; -import com.strategyobject.substrateclient.common.gc.WeakReferenceFinalizer; import com.strategyobject.substrateclient.transport.ProviderInterface; import com.strategyobject.substrateclient.transport.ProviderInterfaceEmitted; import com.strategyobject.substrateclient.transport.SubscriptionHandler; @@ -18,9 +17,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; -import java.lang.ref.WeakReference; import java.net.URI; import java.net.URISyntaxException; import java.util.*; @@ -48,7 +44,7 @@ public WsStateSubscription(BiConsumer callBack, @Getter @Setter class WsStateAwaiting { - private WeakReference> callBack; + private CompletableFuture callback; private String method; private List params; private SubscriptionHandler subscription; @@ -58,6 +54,8 @@ public class WsProvider implements ProviderInterface, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(WsProvider.class); private static final int RESUBSCRIBE_TIMEOUT = 20; private static final Map ALIASES = new HashMap<>(); + private static final ScheduledExecutorService timedOutHandlerCleaner = Executors + .newScheduledThreadPool(1); static { ALIASES.put("chain_finalisedHead", "chain_finalizedHead"); @@ -65,7 +63,6 @@ public class WsProvider implements ProviderInterface, AutoCloseable { ALIASES.put("chain_unsubscribeFinalisedHeads", "chain_unsubscribeFinalizedHeads"); } - private final ReferenceQueue> referenceQueue = new ReferenceQueue<>(); private final RpcCoder coder = new RpcCoder(); private final URI endpoint; private final Map headers; @@ -75,13 +72,15 @@ public class WsProvider implements ProviderInterface, AutoCloseable { private final Map waitingForId = new ConcurrentHashMap<>(); private final int heartbeatInterval; private final AtomicReference webSocket = new AtomicReference<>(null); + private final long responseTimeoutInMs; private int autoConnectMs; private volatile boolean isConnected = false; WsProvider(@NonNull URI endpoint, int autoConnectMs, Map headers, - int heartbeatInterval) { + int heartbeatInterval, + long responseTimeoutInMs) { Preconditions.checkArgument( endpoint.getScheme().matches("(?i)ws|wss"), "Endpoint should start with 'ws://', received " + endpoint); @@ -93,6 +92,7 @@ public class WsProvider implements ProviderInterface, AutoCloseable { this.autoConnectMs = autoConnectMs; this.headers = headers; this.heartbeatInterval = heartbeatInterval; + this.responseTimeoutInMs = responseTimeoutInMs; if (autoConnectMs > 0) { this.connect(); @@ -211,17 +211,14 @@ private CompletableFuture send(String method, logger.debug("Calling {} {}, {}, {}, {}", id, method, params, json, subscription); val whenResponseReceived = new CompletableFuture(); - val callback = new WeakReferenceFinalizer<>( - whenResponseReceived, - referenceQueue, - () -> this.handlers.remove(id)); - - this.handlers.put(id, new WsStateAwaiting<>(callback, method, params, subscription)); + this.handlers.put(id, new WsStateAwaiting<>(whenResponseReceived, method, params, subscription)); return CompletableFuture.runAsync(() -> this.webSocket.get().send(json)) .whenCompleteAsync((_res, ex) -> { if (ex != null) { this.handlers.remove(id); + } else { + scheduleCleanupIfNoResponseWithinTimeout(id); } }) .thenCombineAsync(whenResponseReceived, (_a, b) -> b); @@ -299,6 +296,23 @@ public CompletableFuture unsubscribe(String type, String method, String return whenUnsubscribed; } + private void scheduleCleanupIfNoResponseWithinTimeout(int id) { + timedOutHandlerCleaner.schedule(() -> { + val handler = this.handlers.remove(id); + if (handler == null) { + return; + } + + handler + .getCallback() + .completeExceptionally(new TimeoutException( + String.format("The node didn't respond within %s milliseconds.", + responseTimeoutInMs))); + }, + responseTimeoutInMs, + TimeUnit.MILLISECONDS); + } + private void emit(ProviderInterfaceEmitted type, Object... args) { this.eventEmitter.emit(type, args); } @@ -324,12 +338,7 @@ private void onSocketClose(int code, String reason) { // reject all hanging requests val wsClosedException = new WsClosedException(errorMessage); - this.handlers.values().forEach(x -> { - val callback = x.getCallBack().get(); - if (callback != null) { - callback.completeExceptionally(wsClosedException); - } - }); + this.handlers.values().forEach(x -> x.getCallback().completeExceptionally(wsClosedException)); this.handlers.clear(); this.waitingForId.clear(); @@ -346,7 +355,6 @@ private void onSocketError(Exception ex) { private void onSocketMessage(String message) { logger.debug("Received {}", message); - this.cleanCollectedHandlers(); JsonRpcResponse response = RpcCoder.decodeJson(message); if (Strings.isNullOrEmpty(response.getMethod())) { @@ -365,12 +373,11 @@ private void onSocketMessageResult(JsonRpcResponseSingle response) { return; } - val callback = Optional.ofNullable(handler.getCallBack().get()); try { val result = (T) response.getResult(); // first send the result - in case of subs, we may have an update // immediately if we have some queued results already - callback.ifPresent(x -> x.complete(result)); + handler.getCallback().complete(result); val subscription = handler.getSubscription(); if (subscription != null) { @@ -390,7 +397,7 @@ private void onSocketMessageResult(JsonRpcResponseSingle response) { } } } catch (Exception ex) { - callback.ifPresent(x -> x.completeExceptionally(ex)); + handler.getCallback().completeExceptionally(ex); } this.handlers.remove(id); @@ -467,19 +474,12 @@ private void resubscribe() { } } - private void cleanCollectedHandlers() { - Reference referenceFromQueue; - while ((referenceFromQueue = referenceQueue.poll()) != null) { - ((WeakReferenceFinalizer) referenceFromQueue).finalizeResources(); - referenceFromQueue.clear(); - } - } - public static class Builder { private URI endpoint; private int autoConnectMs = 2500; private Map headers = null; private int heartbeatInterval = 60; + private long responseTimeoutInMs = 20000; Builder() { try { @@ -527,8 +527,17 @@ public Builder disableHeartbeats() { return this; } + public Builder setResponseTimeout(long timeout, TimeUnit timeUnit) { + this.responseTimeoutInMs = timeUnit.toMillis(timeout); + return this; + } + public WsProvider build() { - return new WsProvider(this.endpoint, this.autoConnectMs, this.headers, this.heartbeatInterval); + return new WsProvider(this.endpoint, + this.autoConnectMs, + this.headers, + this.heartbeatInterval, + this.responseTimeoutInMs); } } } diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java index b121d17a..c95fe73c 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java @@ -2,6 +2,7 @@ import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import eu.rekawek.toxiproxy.model.ToxicDirection; import lombok.SneakyThrows; import lombok.val; import org.junit.jupiter.api.Test; @@ -10,10 +11,13 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import java.util.Map; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.*; @Testcontainers public class WsProviderProxyTest { @@ -27,12 +31,9 @@ public class WsProviderProxyTest { static final ToxiproxyContainer toxiproxy = new ToxiproxyContainer("shopify/toxiproxy") .withNetwork(network) .withNetworkAliases("toxiproxy"); - - final ToxiproxyContainer.ContainerProxy proxy = toxiproxy.getProxy(substrate, 9944); - private static final int HEARTBEAT_INTERVAL = 5; private static final int WAIT_TIMEOUT = HEARTBEAT_INTERVAL * 2; - + final ToxiproxyContainer.ContainerProxy proxy = toxiproxy.getProxy(substrate, 9944); @Test void canReconnect() { @@ -77,6 +78,68 @@ void canAutoConnectWhenServerAvailable() { } } + @Test + @SneakyThrows + void throwsExceptionWhenCanNotSendRequestAndCleanHandler() { + try (val wsProvider = WsProvider.builder() + .setEndpoint(getWsAddress()) + .disableAutoConnect() + .build()) { + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + val timeout = proxy + .toxics() + .timeout("timeout", ToxicDirection.UPSTREAM, 1000); + + val exception = assertThrows(CompletionException.class, + () -> wsProvider.send("system_version").join()); + assertTrue(exception.getCause() instanceof WsClosedException); + + val handlers = getHandlersOf(wsProvider); + assertEquals(0, handlers.size()); + + timeout.remove(); + } + } + + @Test + @SneakyThrows + void throwsExceptionWhenResponseTimeoutAndCleanHandler() { + val responseTimeout = 500; + try (val wsProvider = WsProvider.builder() + .setEndpoint(getWsAddress()) + .setResponseTimeout(responseTimeout, TimeUnit.MILLISECONDS) + .disableAutoConnect() + .build()) { + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + val latency = proxy + .toxics() + .latency("latency", ToxicDirection.DOWNSTREAM, responseTimeout * 3); + + val exception = assertThrows(CompletionException.class, + () -> wsProvider.send("system_version").join(), + String.format("The node didn't respond for %s milliseconds.", responseTimeout)); + assertTrue(exception.getCause() instanceof TimeoutException); + + val handlers = getHandlersOf(wsProvider); + assertEquals(0, handlers.size()); + + latency.remove(); + } + } + + private Map getHandlersOf(WsProvider wsProvider) throws NoSuchFieldException, IllegalAccessException { + val handlersFields = wsProvider.getClass().getDeclaredField("handlers"); + handlersFields.setAccessible(true); + + return (Map) handlersFields.get(wsProvider); + } + private String getWsAddress() { return String.format("ws://%s:%s", proxy.getContainerIpAddress(), proxy.getProxyPort()); }