Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {

allprojects {
group = 'com.strategyobject.substrateclient'
version = '0.1.2-SNAPSHOT'
version = '0.1.3-SNAPSHOT'

repositories {
mavenLocal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.strategyobject.substrateclient.scale.ScaleWriter;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import org.junit.jupiter.api.Test;
Expand All @@ -29,7 +30,7 @@ class StorageDoubleMapImplTests {
void societyVotes() throws Exception {
try (val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build()) {
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.strategyobject.substrateclient.scale.ScaleWriter;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import org.junit.jupiter.api.Test;
Expand All @@ -29,7 +30,7 @@ class StorageMapImplTests {
void systemBlockHash() throws Exception {
try (val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build()) {
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ProviderInterface;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.NonNull;
import lombok.val;
Expand Down Expand Up @@ -56,7 +57,7 @@ private static StorageNMapImpl<BlockHash> newSystemBlockHashStorage(State state)
private WsProvider getConnectedProvider() throws InterruptedException, ExecutionException, TimeoutException {
val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build();
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
return wsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.strategyobject.substrateclient.rpc.api.section.State;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import org.junit.jupiter.api.Test;
Expand All @@ -33,7 +34,7 @@ void sudoKey() throws Exception {

try (val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build()) {
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);

Expand All @@ -58,7 +59,7 @@ void sudoKeyAtGenesis() throws Exception {

try (val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build()) {
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
val state = TestsHelper.createSectionFactory(wsProvider).create(State.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.strategyobject.substrateclient.scale.ScaleWriter;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import lombok.var;
Expand Down Expand Up @@ -121,7 +122,7 @@ void submitAndWatchExtrinsic() throws Exception {
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build();

wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.strategyobject.substrateclient.rpc.api.BlockNumber;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -134,7 +135,7 @@ void getCurrentBlock() throws ExecutionException, InterruptedException, TimeoutE
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build();

wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.strategyobject.substrateclient.rpc.api.StorageKey;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -176,7 +177,7 @@ void queryStorageAt() throws Exception {
private WsProvider connect() throws Exception {
val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build();

wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.strategyobject.substrateclient.rpc.api.Index;
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
import com.strategyobject.substrateclient.transport.ws.WsProvider;
import lombok.val;
import org.junit.jupiter.api.Test;
Expand All @@ -28,7 +29,7 @@ class SystemTests {
void accountNextIndex() throws Exception {
try (val wsProvider = WsProvider.builder()
.setEndpoint(substrate.getWsAddress())
.disableAutoConnect()
.withPolicy(ReconnectionPolicy.MANUAL)
.build()) {
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.strategyobject.substrateclient.transport.ws;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.concurrent.TimeUnit;

/**
* Represents a delay
*/
@RequiredArgsConstructor(staticName = "of")
@Getter
public class Delay {
/**
* The time to delay execution unit
*/
private final long value;

/**
* The time unit of the delay parameter
*/
private final TimeUnit unit;

/**
* A delay that should never be scheduled
*/
public static final Delay NEVER = Delay.of(-1, TimeUnit.SECONDS);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.strategyobject.substrateclient.transport.ws;

import com.google.common.base.Preconditions;
import lombok.*;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/**
* Represents an exponential backoff retry policy
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Slf4j
public class ExponentialBackoffReconnectionPolicy implements ReconnectionPolicy<LongAdder> {
/**
* Max number of attempts
*/
private final long maxAttempts;
/**
* Initial delay, the time to delay execution unit
*/
private final long delay;
/**
* The time unit of the delay parameter
*/
private final TimeUnit unit;
/**
* Max delay
*/
private final long maxDelay;
/**
* A multiplier that's applied to delay after every attempt
*/
private final double factor;

/**
* @param context contains a reason of disconnection and counter of attempts
* @return a unit of time to delay the next reconnection
*/
@Override
public @NonNull Delay getNextDelay(@NonNull ReconnectionContext<LongAdder> context) {
try {
if (context.getPolicyContext().longValue() >= maxAttempts) {
log.info("Provider won't reconnect more.");

return Delay.NEVER;
}

var nextDelay = delay * Math.pow(factor, context.getPolicyContext().longValue());
nextDelay = Math.min(nextDelay, maxDelay);

log.info("Provider will try to reconnect after: {} {}", nextDelay, unit);
return Delay.of((long) nextDelay, unit);
} finally {
context.getPolicyContext().increment();
}
}

/**
* Returns the counter of attempts
*/
@Override
public LongAdder initContext() {
return new LongAdder();
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private long delay = 15;
private TimeUnit unit = TimeUnit.SECONDS;
private long maxDelay = 150;
private long maxAttempts = 10;
private double factor = 2;

Builder() {
}

public Builder retryAfter(long delay, TimeUnit unit) {
Preconditions.checkArgument(delay >= 0);

this.delay = delay;
this.unit = unit;

return this;
}

public Builder withFactor(double factor) {
Preconditions.checkArgument(factor > 0);

this.factor = factor;
return this;
}

public Builder withMaxDelay(long maxDelay) {
Preconditions.checkArgument(maxDelay >= 0);

this.maxDelay = maxDelay;
return this;
}

public Builder notMoreThan(long maxAttempts) {
Preconditions.checkArgument(maxAttempts >= 0);

this.maxAttempts = maxAttempts;
return this;
}

public ExponentialBackoffReconnectionPolicy build() {
return new ExponentialBackoffReconnectionPolicy(
this.maxAttempts,
this.delay,
this.unit,
this.maxDelay,
this.factor
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.strategyobject.substrateclient.transport.ws;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Represents a context why connection was closed
*
* @param <T> a type of policy's context required for
* computing the next delay or other policy's purposes
*/
@RequiredArgsConstructor(staticName = "of")
@Getter
public class ReconnectionContext<T> {
/**
* The code of the reason of disconnection
*/
private final int code;

/**
* The text of the reason
*/
private final String reason;

/**
* The policy's context
*/
private final T policyContext;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.strategyobject.substrateclient.transport.ws;

import lombok.NonNull;

/**
* @param <T> a type of policy's context required for
* computing the next delay or other policy's purposes
* Represents a strategy of reconnection
*/
public interface ReconnectionPolicy<T> {

/**
* The method is called when connection was closed and probably should be reconnected.
* @param context contains a reason of disconnection and policy's context.
* @return a unit of time from now to delay reconnection.
*/
@NonNull Delay getNextDelay(@NonNull ReconnectionContext<T> context);

/**
* The method is called before the first connection or when the one successfully reestablished.
* @return a context required for the policy.
*/
T initContext();

/**
* @return the builder of ExponentialBackoffReconnectionPolicy
*/
static ExponentialBackoffReconnectionPolicy.Builder exponentialBackoff() {
return ExponentialBackoffReconnectionPolicy.builder();
}

/**
* @param <T> the type of context
* @return the policy that's supposed to not reconnect automatically
*/
@SuppressWarnings("unchecked")
static <T> ReconnectionPolicy<T> manual() {
return (ReconnectionPolicy<T>) MANUAL;
}

/**
* The policy that's supposed to not reconnect automatically
*/
ReconnectionPolicy<?> MANUAL = new ReconnectionPolicy<Void>() {
@Override
public @NonNull Delay getNextDelay(@NonNull ReconnectionContext<Void> context) {
return Delay.NEVER;
}

@Override
public Void initContext() {
return null;
}
};
}
Loading