Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
Expand All @@ -26,10 +28,18 @@

public class LoadbalanceTest {

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -76,21 +86,28 @@ public Mono<Void> fireAndForget(Payload payload) {
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
Hooks.onErrorDropped((__) -> {});

public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
.then(im -> Mono.just(new TestRSocket(new WeightedRSocket(counter))));
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);

final LoadbalanceTarget target1 = LoadbalanceTarget.from("1", mockTransport1);
final LoadbalanceTarget target2 = LoadbalanceTarget.from("2", mockTransport2);

final WeightedRSocket weightedRSocket1 = new WeightedRSocket(counter);
final WeightedRSocket weightedRSocket2 = new WeightedRSocket(counter);

final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Mockito.when(rSocketConnectorMock.connect(mockTransport1))
.then(im -> Mono.just(new TestRSocket(weightedRSocket1)));
Mockito.when(rSocketConnectorMock.connect(mockTransport2))
.then(im -> Mono.just(new TestRSocket(weightedRSocket2)));

for (int i = 0; i < 1000; i++) {
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
Expand All @@ -99,42 +116,39 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
rSocketConnectorMock,
source,
WeightedLoadbalanceStrategy.builder()
.weightedStatsResolver(r -> (WeightedStats) r)
.weightedStatsResolver(
rsocket ->
((PooledRSocket) rsocket).target() == target1
? weightedRSocket1
: weightedRSocket2)
.build());

RaceTestUtils.race(
() -> {
for (int j = 0; j < 1000; j++) {
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
.retry()
.subscribe();
.subscribe(aVoid -> {}, Throwable::printStackTrace);
}
},
() -> {
for (int j = 0; j < 100; j++) {
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(
Arrays.asList(
LoadbalanceTarget.from("1", mockTransport),
LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target1));
source.next(Arrays.asList(target1, target2)).next(Collections.singletonList(target1));
source.next(Collections.singletonList(target2));
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target2));
}
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -179,8 +193,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Hooks;
Expand All @@ -19,10 +21,18 @@

public class RoundRobinLoadbalanceStrategyTest {

@Test
public void shouldDeliverValuesProportionally() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverValuesProportionally() {
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -71,8 +81,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void shouldDeliverValuesToNewlyConnectedSockets() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -104,7 +112,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));

Expand All @@ -114,7 +122,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));

Expand All @@ -130,7 +138,7 @@ public Mono<Void> fireAndForget(Payload payload) {
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));

source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));

for (int j = 0; j < 1000; j++) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
Expand Down