Skip to content

Commit 2b4d1e8

Browse files
authored
Merge pull request #313 from yschimke/ports
use ephemeral ports
2 parents f03f832 + d908730 commit 2b4d1e8

File tree

13 files changed

+62
-78
lines changed

13 files changed

+62
-78
lines changed

rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.junit.After;
4040
import org.junit.Before;
4141
import org.junit.Test;
42-
import org.junit.Ignore;
4342
import reactor.core.publisher.Flux;
4443
import reactor.core.publisher.Mono;
4544

@@ -137,7 +136,6 @@ public void teardown() {
137136
server.close().block();
138137
}
139138

140-
@Ignore
141139
@Test(timeout = 5_000L)
142140
public void testRequest() {
143141
client.requestResponse(new PayloadImpl("REQUEST", "META")).block();
@@ -147,7 +145,6 @@ public void testRequest() {
147145
assertTrue(calledFrame);
148146
}
149147

150-
@Ignore
151148
@Test
152149
public void testStream() throws Exception {
153150
TestSubscriber subscriber = TestSubscriber.create();
@@ -158,7 +155,6 @@ public void testStream() throws Exception {
158155
subscriber.assertNotComplete();
159156
}
160157

161-
@Ignore
162158
@Test(timeout = 5_000L)
163159
public void testClose() throws ExecutionException, InterruptedException, TimeoutException {
164160
client.close().block();

rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.junit.After;
2020
import org.junit.Before;
2121
import org.junit.Test;
22-
import org.junit.Ignore;
2322
import reactor.core.publisher.Flux;
2423
import reactor.core.publisher.Mono;
2524
import reactor.core.publisher.UnicastProcessor;
@@ -57,7 +56,6 @@ public void cleanup() {
5756
server.close().block();
5857
}
5958

60-
@Ignore
6159
@Test(timeout = 5_000L)
6260
public void testCompleteWithoutNext() throws InterruptedException {
6361
handler =
@@ -74,7 +72,6 @@ public Flux<Payload> requestStream(Payload payload) {
7472
assertFalse(hasElements);
7573
}
7674

77-
@Ignore
7875
@Test(timeout = 5_000L)
7976
public void testSingleStream() throws InterruptedException {
8077
handler =
@@ -92,7 +89,6 @@ public Flux<Payload> requestStream(Payload payload) {
9289
assertEquals("RESPONSE", StandardCharsets.UTF_8.decode(result.getData()).toString());
9390
}
9491

95-
@Ignore
9692
@Test(timeout = 5_000L)
9793
public void testZeroPayload() throws InterruptedException {
9894
handler =
@@ -110,7 +106,6 @@ public Flux<Payload> requestStream(Payload payload) {
110106
assertEquals("", StandardCharsets.UTF_8.decode(result.getData()).toString());
111107
}
112108

113-
@Ignore
114109
@Test(timeout = 5_000L)
115110
public void testRequestResponseErrors() throws InterruptedException {
116111
handler =
@@ -145,7 +140,6 @@ public Mono<Payload> requestResponse(Payload payload) {
145140
assertEquals("SUCCESS", StandardCharsets.UTF_8.decode(response2.getData()).toString());
146141
}
147142

148-
@Ignore
149143
@Test(timeout = 5_000L)
150144
public void testTwoConcurrentStreams() throws InterruptedException {
151145
ConcurrentHashMap<String, UnicastProcessor<Payload>> map = new ConcurrentHashMap<>();

rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.rsocket.transport.ServerTransport;
2828
import io.rsocket.util.PayloadImpl;
2929
import java.nio.charset.StandardCharsets;
30+
import java.util.function.BiFunction;
3031
import java.util.function.Function;
3132
import java.util.function.Supplier;
3233
import org.junit.rules.ExternalResource;
@@ -35,34 +36,35 @@
3536
import reactor.core.publisher.Flux;
3637
import reactor.core.publisher.Mono;
3738

38-
public class ClientSetupRule<T> extends ExternalResource {
39+
public class ClientSetupRule<T, S extends Closeable> extends ExternalResource {
3940

4041
private Supplier<T> addressSupplier;
41-
private Function<T, RSocket> clientConnector;
42-
private Function<T, Closeable> serverInit;
42+
private BiFunction<T, S, RSocket> clientConnector;
43+
private Function<T, S> serverInit;
4344

4445
private RSocket client;
4546

4647
public ClientSetupRule(
4748
Supplier<T> addressSupplier,
48-
Function<T, ClientTransport> clientTransportSupplier,
49-
Function<T, ServerTransport<? extends Closeable>> serverTransportSupplier) {
49+
BiFunction<T, S, ClientTransport> clientTransportSupplier,
50+
Function<T, ServerTransport<S>> serverTransportSupplier) {
5051
this.addressSupplier = addressSupplier;
5152

52-
this.clientConnector =
53-
address ->
54-
RSocketFactory.connect()
55-
.transport(clientTransportSupplier.apply(address))
56-
.start()
57-
.doOnError(t -> t.printStackTrace())
58-
.block();
59-
6053
this.serverInit =
6154
address ->
6255
RSocketFactory.receive()
6356
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
6457
.transport(serverTransportSupplier.apply(address))
6558
.start()
59+
.map(s -> (S) s) // TODO fix casting
60+
.block();
61+
62+
this.clientConnector =
63+
(address, server) ->
64+
RSocketFactory.connect()
65+
.transport(clientTransportSupplier.apply(address, server))
66+
.start()
67+
.doOnError(t -> t.printStackTrace())
6668
.block();
6769
}
6870

@@ -72,8 +74,8 @@ public Statement apply(Statement base, Description description) {
7274
@Override
7375
public void evaluate() throws Throwable {
7476
T address = addressSupplier.get();
75-
Closeable server = serverInit.apply(address);
76-
client = clientConnector.apply(address);
77+
S server = serverInit.apply(address);
78+
client = clientConnector.apply(address, server);
7779
base.evaluate();
7880
server.close().block();
7981
}

rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/ReactiveStreamsRemote.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package io.rsocket.aeron.internal.reactivestreams;
1717

18+
import io.rsocket.Closeable;
1819
import java.net.SocketAddress;
1920
import java.util.concurrent.TimeUnit;
2021
import java.util.function.Consumer;
2122
import java.util.function.Function;
22-
23-
import io.rsocket.Closeable;
2423
import org.reactivestreams.Publisher;
2524
import reactor.core.publisher.Flux;
2625
import reactor.core.publisher.Mono;

rsocket-transport-aeron/src/main/java/io/rsocket/aeron/server/AeronServerTransport.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@
2626
import io.rsocket.transport.ServerTransport;
2727
import reactor.core.publisher.Mono;
2828

29-
import java.net.SocketAddress;
30-
import java.util.concurrent.TimeUnit;
31-
3229
/** */
33-
public class AeronServerTransport implements ServerTransport {
30+
public class AeronServerTransport implements ServerTransport<Closeable> {
3431
private final AeronWrapper aeronWrapper;
3532
private final AeronSocketAddress managementSubscriptionSocket;
3633
private final EventLoop eventLoop;

rsocket-transport-aeron/src/test/java/io/rsocket/aeron/AeronClientSetupRule.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616

1717
package io.rsocket.aeron;
1818

19+
import io.rsocket.Closeable;
1920
import io.rsocket.aeron.client.AeronClientTransport;
2021
import io.rsocket.aeron.internal.*;
2122
import io.rsocket.aeron.internal.reactivestreams.AeronClientChannelConnector;
2223
import io.rsocket.aeron.internal.reactivestreams.AeronSocketAddress;
2324
import io.rsocket.aeron.server.AeronServerTransport;
2425
import io.rsocket.test.ClientSetupRule;
2526

26-
class AeronClientSetupRule extends ClientSetupRule<AeronSocketAddress> {
27+
class AeronClientSetupRule extends ClientSetupRule<AeronSocketAddress, Closeable> {
2728

2829
public static final AeronSocketAddress ADDRESS =
2930
AeronSocketAddress.create("aeron:udp", "127.0.0.1", 39790);
@@ -56,6 +57,6 @@ class AeronClientSetupRule extends ClientSetupRule<AeronSocketAddress> {
5657
private static final AeronClientTransport client;
5758

5859
AeronClientSetupRule() {
59-
super(() -> ADDRESS, address -> client, address -> server);
60+
super(() -> ADDRESS, (address, server) -> client, address -> server);
6061
}
6162
}

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalUriHandler.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,25 @@
33
import io.rsocket.transport.ClientTransport;
44
import io.rsocket.transport.ServerTransport;
55
import io.rsocket.uri.UriHandler;
6-
76
import java.net.URI;
87
import java.util.Optional;
98

109
public class LocalUriHandler implements UriHandler {
11-
@Override
12-
public Optional<ClientTransport> buildClient(URI uri) {
13-
if (uri.getScheme().equals("local")) {
14-
return Optional.of(LocalClientTransport.create(uri.getSchemeSpecificPart()));
15-
}
16-
17-
return UriHandler.super.buildClient(uri);
10+
@Override
11+
public Optional<ClientTransport> buildClient(URI uri) {
12+
if (uri.getScheme().equals("local")) {
13+
return Optional.of(LocalClientTransport.create(uri.getSchemeSpecificPart()));
1814
}
1915

20-
@Override
21-
public Optional<ServerTransport> buildServer(URI uri) {
22-
if (uri.getScheme().equals("local")) {
23-
return Optional.of(LocalServerTransport.create(uri.getSchemeSpecificPart()));
24-
}
16+
return UriHandler.super.buildClient(uri);
17+
}
2518

26-
return UriHandler.super.buildServer(uri);
19+
@Override
20+
public Optional<ServerTransport> buildServer(URI uri) {
21+
if (uri.getScheme().equals("local")) {
22+
return Optional.of(LocalServerTransport.create(uri.getSchemeSpecificPart()));
2723
}
24+
25+
return UriHandler.super.buildServer(uri);
26+
}
2827
}

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientSetupRule.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,17 @@
1616

1717
package io.rsocket.transport.local;
1818

19-
import io.rsocket.RSocketFactory;
19+
import io.rsocket.Closeable;
2020
import io.rsocket.test.ClientSetupRule;
21-
import io.rsocket.test.TestRSocket;
2221
import java.util.concurrent.atomic.AtomicInteger;
23-
import java.util.function.Supplier;
24-
import reactor.core.publisher.Mono;
2522

26-
public class LocalClientSetupRule extends ClientSetupRule<String> {
23+
public class LocalClientSetupRule extends ClientSetupRule<String, Closeable> {
2724
private static final AtomicInteger uniqueNameGenerator = new AtomicInteger();
2825

2926
public LocalClientSetupRule() {
3027
super(
31-
// This needs to be called twice before it increments
32-
// - once for the client and once for the server
33-
new Supplier<String>() {
34-
boolean increment = true;
35-
36-
@Override
37-
public String get() {
38-
if (increment) {
39-
increment = false;
40-
return "test" + uniqueNameGenerator.incrementAndGet();
41-
} else {
42-
increment = true;
43-
return "test" + uniqueNameGenerator.get();
44-
}
45-
}
46-
},
47-
address -> LocalClientTransport.create(address),
28+
() -> "test" + uniqueNameGenerator.incrementAndGet(),
29+
(address, server) -> LocalClientTransport.create(address),
4830
address -> LocalServerTransport.create(address));
4931
}
5032
}

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalUriTransportRegistryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package io.rsocket.transport.local;
22

3+
import static org.junit.Assert.assertTrue;
4+
35
import io.rsocket.transport.ClientTransport;
46
import io.rsocket.transport.ServerTransport;
57
import io.rsocket.uri.UriTransportRegistry;
68
import org.junit.Test;
79

8-
import static org.junit.Assert.assertTrue;
9-
1010
public class LocalUriTransportRegistryTest {
1111
@Test
1212
public void testLocalClient() {

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.rsocket.DuplexConnection;
2020
import io.rsocket.transport.ClientTransport;
2121
import io.rsocket.transport.netty.WebsocketDuplexConnection;
22+
import java.net.InetSocketAddress;
2223
import java.net.URI;
2324
import reactor.core.publisher.Mono;
2425
import reactor.ipc.netty.http.client.HttpClient;
@@ -42,6 +43,10 @@ public static WebsocketClientTransport create(String bindAddress, int port) {
4243
return create(httpClient, "/");
4344
}
4445

46+
public static WebsocketClientTransport create(InetSocketAddress address) {
47+
return create(address.getHostName(), address.getPort());
48+
}
49+
4550
public static WebsocketClientTransport create(URI uri) {
4651
HttpClient httpClient = createClient(uri);
4752
return create(httpClient, uri.getPath());

0 commit comments

Comments
 (0)