diff --git a/build.gradle b/build.gradle index d6da2b1c..9250a5ae 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { allprojects { group = 'com.strategyobject.substrateclient' - version = '0.1.2-SNAPSHOT' + version = '0.1.3-SNAPSHOT' repositories { mavenLocal() diff --git a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java index 722a76f2..d9557382 100644 --- a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java +++ b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java @@ -8,6 +8,7 @@ import com.strategyobject.substrateclient.scale.ScaleWriter; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import org.junit.jupiter.api.Test; @@ -29,7 +30,7 @@ class StorageDoubleMapImplTests { void societyVotes() throws Exception { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); diff --git a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java index 37fda4f8..161e6083 100644 --- a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java +++ b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java @@ -7,6 +7,7 @@ import com.strategyobject.substrateclient.scale.ScaleWriter; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import org.junit.jupiter.api.Test; @@ -29,7 +30,7 @@ class StorageMapImplTests { void systemBlockHash() throws Exception { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); diff --git a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java index d16833df..3b429095 100644 --- a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java +++ b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java @@ -13,6 +13,7 @@ import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; import com.strategyobject.substrateclient.transport.ProviderInterface; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.NonNull; import lombok.val; @@ -56,7 +57,7 @@ private static StorageNMapImpl newSystemBlockHashStorage(State state) private WsProvider getConnectedProvider() throws InterruptedException, ExecutionException, TimeoutException { val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build(); wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); return wsProvider; diff --git a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java index 74c7a8df..da4e3287 100644 --- a/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java +++ b/pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java @@ -8,6 +8,7 @@ import com.strategyobject.substrateclient.rpc.api.section.State; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import org.junit.jupiter.api.Test; @@ -33,7 +34,7 @@ void sudoKey() throws Exception { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); @@ -58,7 +59,7 @@ void sudoKeyAtGenesis() throws Exception { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); val state = TestsHelper.createSectionFactory(wsProvider).create(State.class); diff --git a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java index 93e51191..c8097ecd 100644 --- a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java +++ b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java @@ -11,6 +11,7 @@ import com.strategyobject.substrateclient.scale.ScaleWriter; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import lombok.var; @@ -121,7 +122,7 @@ void submitAndWatchExtrinsic() throws Exception { private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException { val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build(); wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); diff --git a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java index 876d4435..949f3f48 100644 --- a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java +++ b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java @@ -4,6 +4,7 @@ import com.strategyobject.substrateclient.rpc.api.BlockNumber; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import org.junit.jupiter.api.Assertions; @@ -134,7 +135,7 @@ void getCurrentBlock() throws ExecutionException, InterruptedException, TimeoutE private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException { val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build(); wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); diff --git a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java index 330ca7af..c5ddc094 100644 --- a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java +++ b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java @@ -5,6 +5,7 @@ import com.strategyobject.substrateclient.rpc.api.StorageKey; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import org.junit.jupiter.api.Assertions; @@ -176,7 +177,7 @@ void queryStorageAt() throws Exception { private WsProvider connect() throws Exception { val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build(); wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); diff --git a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java index 1a5d1c9e..dcd3cb10 100644 --- a/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java +++ b/rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java @@ -5,6 +5,7 @@ import com.strategyobject.substrateclient.rpc.api.Index; import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy; import com.strategyobject.substrateclient.transport.ws.WsProvider; import lombok.val; import org.junit.jupiter.api.Test; @@ -28,7 +29,7 @@ class SystemTests { void accountNextIndex() throws Exception { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/Delay.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/Delay.java new file mode 100644 index 00000000..f3eb72ca --- /dev/null +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/Delay.java @@ -0,0 +1,28 @@ +package com.strategyobject.substrateclient.transport.ws; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.concurrent.TimeUnit; + +/** + * Represents a delay + */ +@RequiredArgsConstructor(staticName = "of") +@Getter +public class Delay { + /** + * The time to delay execution unit + */ + private final long value; + + /** + * The time unit of the delay parameter + */ + private final TimeUnit unit; + + /** + * A delay that should never be scheduled + */ + public static final Delay NEVER = Delay.of(-1, TimeUnit.SECONDS); +} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ExponentialBackoffReconnectionPolicy.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ExponentialBackoffReconnectionPolicy.java new file mode 100644 index 00000000..c1758d41 --- /dev/null +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ExponentialBackoffReconnectionPolicy.java @@ -0,0 +1,123 @@ +package com.strategyobject.substrateclient.transport.ws; + +import com.google.common.base.Preconditions; +import lombok.*; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +/** + * Represents an exponential backoff retry policy + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Getter +@Slf4j +public class ExponentialBackoffReconnectionPolicy implements ReconnectionPolicy { + /** + * Max number of attempts + */ + private final long maxAttempts; + /** + * Initial delay, the time to delay execution unit + */ + private final long delay; + /** + * The time unit of the delay parameter + */ + private final TimeUnit unit; + /** + * Max delay + */ + private final long maxDelay; + /** + * A multiplier that's applied to delay after every attempt + */ + private final double factor; + + /** + * @param context contains a reason of disconnection and counter of attempts + * @return a unit of time to delay the next reconnection + */ + @Override + public @NonNull Delay getNextDelay(@NonNull ReconnectionContext context) { + try { + if (context.getPolicyContext().longValue() >= maxAttempts) { + log.info("Provider won't reconnect more."); + + return Delay.NEVER; + } + + var nextDelay = delay * Math.pow(factor, context.getPolicyContext().longValue()); + nextDelay = Math.min(nextDelay, maxDelay); + + log.info("Provider will try to reconnect after: {} {}", nextDelay, unit); + return Delay.of((long) nextDelay, unit); + } finally { + context.getPolicyContext().increment(); + } + } + + /** + * Returns the counter of attempts + */ + @Override + public LongAdder initContext() { + return new LongAdder(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private long delay = 15; + private TimeUnit unit = TimeUnit.SECONDS; + private long maxDelay = 150; + private long maxAttempts = 10; + private double factor = 2; + + Builder() { + } + + public Builder retryAfter(long delay, TimeUnit unit) { + Preconditions.checkArgument(delay >= 0); + + this.delay = delay; + this.unit = unit; + + return this; + } + + public Builder withFactor(double factor) { + Preconditions.checkArgument(factor > 0); + + this.factor = factor; + return this; + } + + public Builder withMaxDelay(long maxDelay) { + Preconditions.checkArgument(maxDelay >= 0); + + this.maxDelay = maxDelay; + return this; + } + + public Builder notMoreThan(long maxAttempts) { + Preconditions.checkArgument(maxAttempts >= 0); + + this.maxAttempts = maxAttempts; + return this; + } + + public ExponentialBackoffReconnectionPolicy build() { + return new ExponentialBackoffReconnectionPolicy( + this.maxAttempts, + this.delay, + this.unit, + this.maxDelay, + this.factor + ); + } + } +} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ReconnectionContext.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ReconnectionContext.java new file mode 100644 index 00000000..a7e07ef0 --- /dev/null +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ReconnectionContext.java @@ -0,0 +1,29 @@ +package com.strategyobject.substrateclient.transport.ws; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Represents a context why connection was closed + * + * @param a type of policy's context required for + * computing the next delay or other policy's purposes + */ +@RequiredArgsConstructor(staticName = "of") +@Getter +public class ReconnectionContext { + /** + * The code of the reason of disconnection + */ + private final int code; + + /** + * The text of the reason + */ + private final String reason; + + /** + * The policy's context + */ + private final T policyContext; +} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ReconnectionPolicy.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ReconnectionPolicy.java new file mode 100644 index 00000000..6cea8e7f --- /dev/null +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/ReconnectionPolicy.java @@ -0,0 +1,55 @@ +package com.strategyobject.substrateclient.transport.ws; + +import lombok.NonNull; + +/** + * @param a type of policy's context required for + * computing the next delay or other policy's purposes + * Represents a strategy of reconnection + */ +public interface ReconnectionPolicy { + + /** + * The method is called when connection was closed and probably should be reconnected. + * @param context contains a reason of disconnection and policy's context. + * @return a unit of time from now to delay reconnection. + */ + @NonNull Delay getNextDelay(@NonNull ReconnectionContext context); + + /** + * The method is called before the first connection or when the one successfully reestablished. + * @return a context required for the policy. + */ + T initContext(); + + /** + * @return the builder of ExponentialBackoffReconnectionPolicy + */ + static ExponentialBackoffReconnectionPolicy.Builder exponentialBackoff() { + return ExponentialBackoffReconnectionPolicy.builder(); + } + + /** + * @param the type of context + * @return the policy that's supposed to not reconnect automatically + */ + @SuppressWarnings("unchecked") + static ReconnectionPolicy manual() { + return (ReconnectionPolicy) MANUAL; + } + + /** + * The policy that's supposed to not reconnect automatically + */ + ReconnectionPolicy MANUAL = new ReconnectionPolicy() { + @Override + public @NonNull Delay getNextDelay(@NonNull ReconnectionContext context) { + return Delay.NEVER; + } + + @Override + public Void initContext() { + return null; + } + }; +} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java index f67a1b8b..f6f7c67e 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java @@ -18,6 +18,7 @@ import java.net.URISyntaxException; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -48,7 +49,7 @@ class WsStateAwaiting { } @Slf4j -public class WsProvider implements ProviderInterface, AutoCloseable { +public class WsProvider implements ProviderInterface, AutoCloseable { private static final int RESUBSCRIBE_TIMEOUT = 20; private static final Map ALIASES = new HashMap<>(); private static final ScheduledExecutorService timedOutHandlerCleaner; @@ -69,33 +70,33 @@ public class WsProvider implements ProviderInterface, AutoCloseable { private final Map waitingForId = new ConcurrentHashMap<>(); private final int heartbeatInterval; private final long responseTimeoutInMs; - private volatile int autoConnectMs; + private final AtomicReference reconnectionPolicyContext = new AtomicReference<>(); + private volatile ReconnectionPolicy reconnectionPolicy; private volatile WebSocketClient webSocket = null; private volatile CompletableFuture whenConnected = null; private volatile CompletableFuture whenDisconnected = null; private volatile ProviderStatus status = ProviderStatus.DISCONNECTED; + private final ScheduledExecutorService reconnector = Executors.newSingleThreadScheduledExecutor(); WsProvider(@NonNull URI endpoint, - int autoConnectMs, Map headers, int heartbeatInterval, - long responseTimeoutInMs) { + long responseTimeoutInMs, + @NonNull ReconnectionPolicy reconnectionPolicy) { Preconditions.checkArgument( endpoint.getScheme().matches("(?i)ws|wss"), "Endpoint should start with 'ws://', received " + endpoint); - Preconditions.checkArgument( - autoConnectMs >= 0, - "AutoConnect delay cannot be less than 0"); this.endpoint = endpoint; - this.autoConnectMs = autoConnectMs; this.headers = headers; this.heartbeatInterval = heartbeatInterval; this.responseTimeoutInMs = responseTimeoutInMs; + this.reconnectionPolicy = reconnectionPolicy; + this.reconnectionPolicyContext.set(reconnectionPolicy.initContext()); } - public static Builder builder() { - return new Builder(); + public static Builder builder() { + return new Builder<>(); } /** @@ -193,7 +194,7 @@ public synchronized CompletableFuture disconnect() { this.whenDisconnected = whenDisconnectedFuture; // switch off autoConnect, we are in manual mode now - this.autoConnectMs = 0; + this.reconnectionPolicy = ReconnectionPolicy.manual(); this.status = ProviderStatus.DISCONNECTING; val ws = this.webSocket; @@ -358,7 +359,7 @@ private synchronized void onSocketClose(int code, String reason) { code, reason); - if (this.autoConnectMs > 0) { + if (this.reconnectionPolicy != ReconnectionPolicy.MANUAL) { log.error(errorMessage); } @@ -377,10 +378,33 @@ private synchronized void onSocketClose(int code, String reason) { this.whenDisconnected = null; } - if (this.autoConnectMs > 0) { - log.info("Trying to reconnect to {}", this.endpoint); - this.connect(); + val whenConnectedFuture = this.whenConnected; + if (whenConnectedFuture != null) { + whenConnectedFuture.completeExceptionally(wsClosedException); + this.whenConnected = null; + } + + if (this.reconnectionPolicy != ReconnectionPolicy.MANUAL) { + scheduleReconnect(code, reason); + } + } + + private void scheduleReconnect(int code, String reason) { + val delay = reconnectionPolicy + .getNextDelay(ReconnectionContext.of(code, + reason, + reconnectionPolicyContext.get())); + if (delay.getValue() < 0) { + return; } + + reconnector.schedule( + () -> { + log.info("Trying to reconnect to {}", this.endpoint); + this.connect(); + }, + delay.getValue(), + delay.getUnit()); } private void onSocketError(Exception ex) { @@ -466,6 +490,7 @@ public synchronized void onSocketOpen() { log.info("Connected to: {}", this.webSocket.getURI()); this.status = ProviderStatus.CONNECTED; + reconnectionPolicyContext.set(reconnectionPolicy.initContext()); this.emit(ProviderInterfaceEmitted.CONNECTED); this.resubscribe(); } @@ -473,6 +498,8 @@ public synchronized void onSocketOpen() { @Override public void close() { try { + reconnector.shutdownNow(); + val currentStatus = this.status; if (currentStatus == ProviderStatus.CONNECTED || currentStatus == ProviderStatus.DISCONNECTING) { this.disconnect(); @@ -515,14 +542,14 @@ private void resubscribe() { } } - public static class Builder implements Supplier { + public static class Builder implements Supplier { private static final String DEFAULT_URI = "ws://127.0.0.1:9944"; private URI endpoint; - private int autoConnectMs = 2500; private Map headers = null; private int heartbeatInterval = 30; private long responseTimeoutInMs = 20000; + private ReconnectionPolicy reconnectionPolicy; Builder() { try { @@ -532,12 +559,12 @@ public static class Builder implements Supplier { } } - public Builder setEndpoint(@NonNull URI endpoint) { + public Builder setEndpoint(@NonNull URI endpoint) { this.endpoint = endpoint; return this; } - public Builder setEndpoint(@NonNull String endpoint) { + public Builder setEndpoint(@NonNull String endpoint) { try { return setEndpoint(new URI(endpoint)); } catch (URISyntaxException ex) { @@ -545,47 +572,46 @@ public Builder setEndpoint(@NonNull String endpoint) { } } - public Builder setAutoConnectDelay(int autoConnectMs) { - this.autoConnectMs = autoConnectMs; - return this; - } - - public Builder disableAutoConnect() { - this.autoConnectMs = 0; - return this; - } - - public Builder setHeaders(Map headers) { + public Builder setHeaders(Map headers) { this.headers = headers; return this; } - public Builder setHeartbeatsInterval(int heartbeatInterval) { + public Builder setHeartbeatsInterval(int heartbeatInterval) { this.heartbeatInterval = heartbeatInterval; return this; } - public Builder disableHeartbeats() { + public Builder disableHeartbeats() { this.heartbeatInterval = 0; return this; } - public Builder setResponseTimeout(long timeout, TimeUnit timeUnit) { + public Builder setResponseTimeout(long timeout, TimeUnit timeUnit) { this.responseTimeoutInMs = timeUnit.toMillis(timeout); return this; } - public WsProvider build() { + @SuppressWarnings({"unchecked", "rawtypes"}) + public Builder withPolicy(ReconnectionPolicy policy) { + this.reconnectionPolicy = (ReconnectionPolicy) policy; + return (Builder) this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public WsProvider build() { return new WsProvider(this.endpoint, - this.autoConnectMs, this.headers, this.heartbeatInterval, - this.responseTimeoutInMs); + this.responseTimeoutInMs, + this.reconnectionPolicy == null ? + ExponentialBackoffReconnectionPolicy.builder().build() : + this.reconnectionPolicy); } @Override - public WsProvider get() { + public WsProvider get() { return build(); } } -} +} \ No newline at end of file diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/ExponentialBackoffReconnectionPolicyTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/ExponentialBackoffReconnectionPolicyTest.java new file mode 100644 index 00000000..ef6ccf2f --- /dev/null +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/ExponentialBackoffReconnectionPolicyTest.java @@ -0,0 +1,45 @@ +package com.strategyobject.substrateclient.transport.ws; + +import lombok.val; +import lombok.var; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ExponentialBackoffReconnectionPolicyTest { + @ParameterizedTest + @CsvSource({ + "-1,10,20,SECONDS,5,1.5,7", + "10,5,10,SECONDS,10,2,10", + "20,5,100,SECONDS,10,2,3", + "10,10,10,MINUTES,10,2,10", + "5,100,5,MILLISECONDS,10,2,10", + "25,100,100,MILLISECONDS,3,0.5,3"}) + void getNextDelay(long expected, + long initialDelay, + long maxDelay, + TimeUnit unit, + int maxAttempts, + double factor, + int iterations) { + val policy = ExponentialBackoffReconnectionPolicy.builder() + .retryAfter(initialDelay, unit) + .withFactor(factor) + .withMaxDelay(maxDelay) + .notMoreThan(maxAttempts) + .build(); + val context = policy.initContext(); + + for (var i = 0; i < iterations - 1; i++) { + policy.getNextDelay(ReconnectionContext.of(-1, "some", context)); + } + + val delay = policy.getNextDelay(ReconnectionContext.of(-1, "some", context)); + + assertEquals(iterations, context.intValue()); + assertEquals(expected, delay.getValue()); + } +} \ No newline at end of file diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java index 6c3ee395..c79f0a1b 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java @@ -2,6 +2,7 @@ import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; +import com.strategyobject.substrateclient.transport.ProviderInterfaceEmitted; import eu.rekawek.toxiproxy.model.ToxicDirection; import lombok.SneakyThrows; import lombok.val; @@ -13,10 +14,13 @@ import java.util.Map; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.jupiter.api.Assertions.*; @Testcontainers @@ -32,6 +36,7 @@ class WsProviderProxyTest { .withNetwork(network) .withNetworkAliases("toxiproxy"); private static final int HEARTBEAT_INTERVAL = 5; + private static final int RECONNECT_INTERVAL = 5; private static final int WAIT_TIMEOUT = HEARTBEAT_INTERVAL * 3; final ToxiproxyContainer.ContainerProxy proxy = toxiproxy.getProxy(substrate, 9944); @@ -41,6 +46,10 @@ void canReconnect() { try (val wsProvider = WsProvider.builder() .setEndpoint(getWsAddress()) .setHeartbeatsInterval(HEARTBEAT_INTERVAL) + .withPolicy(ReconnectionPolicy.exponentialBackoff() + .retryAfter(RECONNECT_INTERVAL, TimeUnit.SECONDS) + .withMaxDelay(RECONNECT_INTERVAL) + .build()) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -53,7 +62,41 @@ void canReconnect() { proxy.setConnectionCut(false); await() - .atMost(WAIT_TIMEOUT, TimeUnit.SECONDS) + .atMost(RECONNECT_INTERVAL * 2, TimeUnit.SECONDS) + .until(wsProvider::isConnected); + } + } + + @Test + @SneakyThrows + void canReconnectWhenConnectionWasClosedForALongPeriod() { + try (val wsProvider = WsProvider.builder() + .setEndpoint(getWsAddress()) + .setHeartbeatsInterval(HEARTBEAT_INTERVAL) + .withPolicy(ReconnectionPolicy.exponentialBackoff() + .retryAfter(RECONNECT_INTERVAL, TimeUnit.SECONDS) + .withMaxDelay(RECONNECT_INTERVAL) + .build()) + .build()) { + + val disconnectionCounter = new AtomicInteger(0); + wsProvider.on(ProviderInterfaceEmitted.DISCONNECTED, i -> disconnectionCounter.incrementAndGet()); + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + val CLOSE_AFTER = 1000; + val toxic = proxy + .toxics() + .timeout("timeout", ToxicDirection.DOWNSTREAM, CLOSE_AFTER); + await() + .atLeast(CLOSE_AFTER, TimeUnit.MILLISECONDS) + .atMost(HEARTBEAT_INTERVAL + RECONNECT_INTERVAL * 4, TimeUnit.SECONDS) + .untilAtomic(disconnectionCounter, greaterThan(2)); + assertFalse(wsProvider.isConnected()); + + toxic.remove(); + await() + .atMost(RECONNECT_INTERVAL * 2, TimeUnit.SECONDS) .until(wsProvider::isConnected); } } @@ -61,21 +104,28 @@ void canReconnect() { @Test @SneakyThrows void canAutoConnectWhenServerAvailable() { - proxy.setConnectionCut(true); + val closed = proxy + .toxics() + .limitData("closed", ToxicDirection.DOWNSTREAM, 0); try (val wsProvider = WsProvider.builder() .setEndpoint(getWsAddress()) .disableHeartbeats() + .withPolicy(ReconnectionPolicy.exponentialBackoff() + .retryAfter(RECONNECT_INTERVAL, TimeUnit.SECONDS) + .withMaxDelay(RECONNECT_INTERVAL) + .build()) .build()) { - assertThrows( - TimeoutException.class, + val exception = assertThrows( + ExecutionException.class, () -> wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS)); + assertTrue(exception.getCause() instanceof WsClosedException); assertFalse(wsProvider.isConnected()); - proxy.setConnectionCut(false); + closed.remove(); await() - .atMost(WAIT_TIMEOUT, TimeUnit.SECONDS) + .atMost(RECONNECT_INTERVAL * 2, TimeUnit.SECONDS) .until(wsProvider::isConnected); } } @@ -85,7 +135,7 @@ void canAutoConnectWhenServerAvailable() { void throwsExceptionWhenCanNotSendRequestAndCleanHandler() { try (val wsProvider = WsProvider.builder() .setEndpoint(getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -113,7 +163,7 @@ void throwsExceptionWhenResponseTimeoutAndCleanHandler() { try (val wsProvider = WsProvider.builder() .setEndpoint(getWsAddress()) .setResponseTimeout(responseTimeout, TimeUnit.MILLISECONDS) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -135,7 +185,7 @@ void throwsExceptionWhenResponseTimeoutAndCleanHandler() { } } - private Map getHandlersOf(WsProvider wsProvider) throws NoSuchFieldException, IllegalAccessException { + private Map getHandlersOf(WsProvider wsProvider) throws NoSuchFieldException, IllegalAccessException { val handlersFields = wsProvider.getClass().getDeclaredField("handlers"); handlersFields.setAccessible(true); diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java index 2591a0ac..d75bcd49 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java @@ -30,7 +30,7 @@ class WsProviderTest { void canConnect() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { assertDoesNotThrow(() -> wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS)); @@ -55,7 +55,7 @@ void connectReturnsSameFutureWhenCalledMultiple() { void notifiesWhenConnected() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { val notified = new AtomicBoolean(); @@ -74,7 +74,7 @@ void notifiesWhenConnected() { void canCancelNotification() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { val notified = new AtomicBoolean(); @@ -91,7 +91,7 @@ void canCancelNotification() { void canDisconnect() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -122,7 +122,7 @@ void disconnectReturnsSameFutureWhenCalledMultiple() { void notifiesWhenDisconnected() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -143,7 +143,7 @@ void notifiesWhenDisconnected() { void canSend() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -159,7 +159,7 @@ void canSend() { void canSubscribe() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -183,7 +183,7 @@ void canSubscribe() { void canUnsubscribe() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -210,7 +210,7 @@ void canUnsubscribe() { void sendThrowsRpcExceptions() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); @@ -225,7 +225,7 @@ void sendThrowsRpcExceptions() { void supportsSubscriptions() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { assertTrue(wsProvider.hasSubscriptions()); @@ -237,7 +237,7 @@ void supportsSubscriptions() { void canReconnectManually() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) - .disableAutoConnect() + .withPolicy(ReconnectionPolicy.MANUAL) .build()) { wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);