Skip to content

Commit 66c9067

Browse files
committed
okhttp: Use a real socket during server transport testing
The PipeSocket was convenient, but we've uncovered Socket bugs in Java like https://bugs.openjdk.org/browse/JDK-8278326 that make us really want to use real sockets. PipeSocket itself was triggering threading problems because it was triggering the Socket race and couldn't be fixed without using legitimate sockets.
1 parent 0f2c43a commit 66c9067

File tree

1 file changed

+47
-58
lines changed

1 file changed

+47
-58
lines changed

okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java

Lines changed: 47 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.io.PipedInputStream;
6060
import java.io.PipedOutputStream;
6161
import java.net.InetSocketAddress;
62+
import java.net.ServerSocket;
6263
import java.net.Socket;
6364
import java.net.SocketAddress;
6465
import java.util.ArrayDeque;
@@ -69,6 +70,7 @@
6970
import java.util.concurrent.Executor;
7071
import java.util.concurrent.ExecutorService;
7172
import java.util.concurrent.Executors;
73+
import java.util.concurrent.Future;
7274
import java.util.concurrent.TimeUnit;
7375
import okio.Buffer;
7476
import okio.BufferedSource;
@@ -96,14 +98,14 @@ public class OkHttpServerTransportTest {
9698
private ServerTransportListener transportListener
9799
= mock(ServerTransportListener.class, delegatesTo(mockTransportListener));
98100
private OkHttpServerTransport serverTransport;
99-
private final PipeSocket socket = new PipeSocket();
101+
private final ExecutorService threadPool = Executors.newCachedThreadPool();
102+
private final SocketPair socketPair = SocketPair.create(threadPool);
100103
private final FrameWriter clientFrameWriter
101-
= new Http2().newWriter(Okio.buffer(Okio.sink(socket.inputStreamSource)), true);
104+
= new Http2().newWriter(Okio.buffer(Okio.sink(socketPair.getClientOutputStream())), true);
102105
private final FrameReader clientFrameReader
103-
= new Http2().newReader(Okio.buffer(Okio.source(socket.outputStreamSink)), true);
106+
= new Http2().newReader(Okio.buffer(Okio.source(socketPair.getClientInputStream())), true);
104107
private final FrameReader.Handler clientFramesRead = mock(FrameReader.Handler.class);
105108
private final DataFrameHandler clientDataFrames = mock(DataFrameHandler.class);
106-
private ExecutorService threadPool = Executors.newCachedThreadPool();
107109
private HandshakerSocketFactory handshakerSocketFactory
108110
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
109111
private final FakeClock fakeClock = new FakeClock();
@@ -142,7 +144,11 @@ public void setUp() throws Exception {
142144
@After
143145
public void tearDown() throws Exception {
144146
threadPool.shutdownNow();
145-
socket.closeSourceAndSink();
147+
try {
148+
socketPair.client.close();
149+
} finally {
150+
socketPair.server.close();
151+
}
146152
}
147153

148154
@Test
@@ -172,7 +178,7 @@ public void maxConnectionAge() throws Exception {
172178
verifyGracefulShutdown(1);
173179
pingPong();
174180
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(3));
175-
assertThat(socket.isClosed()).isTrue();
181+
assertThat(socketPair.server.isClosed()).isTrue();
176182
}
177183

178184
@Test
@@ -254,7 +260,7 @@ public void startThenShutdownTwice() throws Exception {
254260
@Test
255261
public void shutdownDuringHandshake() throws Exception {
256262
doAnswer(invocation -> {
257-
socket.getInputStream().read();
263+
((Socket) invocation.getArguments()[0]).getInputStream().read();
258264
throw new IOException("handshake purposefully failed");
259265
}).when(handshakerSocketFactory).handshake(any(Socket.class), any(Attributes.class));
260266
serverBuilder.transportExecutor(threadPool);
@@ -268,7 +274,7 @@ public void shutdownDuringHandshake() throws Exception {
268274
@Test
269275
public void shutdownNowDuringHandshake() throws Exception {
270276
doAnswer(invocation -> {
271-
socket.getInputStream().read();
277+
((Socket) invocation.getArguments()[0]).getInputStream().read();
272278
throw new IOException("handshake purposefully failed");
273279
}).when(handshakerSocketFactory).handshake(any(Socket.class), any(Attributes.class));
274280
serverBuilder.transportExecutor(threadPool);
@@ -282,12 +288,12 @@ public void shutdownNowDuringHandshake() throws Exception {
282288
@Test
283289
public void clientCloseDuringHandshake() throws Exception {
284290
doAnswer(invocation -> {
285-
socket.getInputStream().read();
291+
((Socket) invocation.getArguments()[0]).getInputStream().read();
286292
throw new IOException("handshake purposefully failed");
287293
}).when(handshakerSocketFactory).handshake(any(Socket.class), any(Attributes.class));
288294
serverBuilder.transportExecutor(threadPool);
289295
initTransport();
290-
socket.close();
296+
socketPair.client.close();
291297

292298
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
293299
verify(transportListener, never()).transportReady(any(Attributes.class));
@@ -296,7 +302,7 @@ public void clientCloseDuringHandshake() throws Exception {
296302
@Test
297303
public void closeDuringHttp2Preface() throws Exception {
298304
initTransport();
299-
socket.close();
305+
socketPair.client.close();
300306

301307
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
302308
verify(transportListener, never()).transportReady(any(Attributes.class));
@@ -307,7 +313,7 @@ public void noSettingsDuringHttp2HandshakeSettings() throws Exception {
307313
initTransport();
308314
clientFrameWriter.connectionPreface();
309315
clientFrameWriter.flush();
310-
socket.close();
316+
socketPair.client.close();
311317

312318
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
313319
verify(transportListener, never()).transportReady(any(Attributes.class));
@@ -329,7 +335,7 @@ public void startThenClientDisconnect() throws Exception {
329335
initTransport();
330336
handshake();
331337

332-
socket.closeSourceAndSink();
338+
socketPair.client.close();
333339
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
334340
}
335341

@@ -1086,8 +1092,8 @@ public void channelzStats() throws Exception {
10861092
assertThat(stats.data.messagesReceived).isEqualTo(0);
10871093
assertThat(stats.data.remoteFlowControlWindow).isEqualTo(30000); // Lower bound
10881094
assertThat(stats.data.localFlowControlWindow).isEqualTo(66535);
1089-
assertThat(stats.local).isEqualTo(new InetSocketAddress("127.0.0.1", 4000));
1090-
assertThat(stats.remote).isEqualTo(new InetSocketAddress("127.0.0.2", 5000));
1095+
assertThat(stats.local).isEqualTo(socketPair.server.getLocalSocketAddress());
1096+
assertThat(stats.remote).isEqualTo(socketPair.server.getRemoteSocketAddress());
10911097
}
10921098

10931099
@Test
@@ -1188,7 +1194,7 @@ public void keepAliveEnforcer_noticesActive() throws Exception {
11881194
private void initTransport() throws Exception {
11891195
serverTransport = new OkHttpServerTransport(
11901196
new OkHttpServerTransport.Config(serverBuilder, Arrays.asList()),
1191-
socket);
1197+
socketPair.server);
11921198
serverTransport.start(transportListener);
11931199
}
11941200

@@ -1357,61 +1363,44 @@ static String getContent(InputStream message) throws IOException {
13571363
}
13581364
}
13591365

1360-
private static class PipeSocket extends Socket {
1361-
private final PipedOutputStream outputStream = new PipedOutputStream();
1362-
private final PipedInputStream outputStreamSink = new PipedInputStream();
1363-
private final PipedOutputStream inputStreamSource = new PipedOutputStream();
1364-
private final PipedInputStream inputStream = new PipedInputStream();
1366+
private static class SocketPair {
1367+
public final Socket client;
1368+
public final Socket server;
1369+
1370+
public SocketPair(Socket client, Socket server) {
1371+
this.client = client;
1372+
this.server = server;
1373+
}
13651374

1366-
public PipeSocket() {
1375+
public InputStream getClientInputStream() {
13671376
try {
1368-
outputStreamSink.connect(outputStream);
1369-
inputStream.connect(inputStreamSource);
1377+
return client.getInputStream();
13701378
} catch (IOException ex) {
1371-
throw new AssertionError(ex);
1379+
throw new RuntimeException(ex);
13721380
}
13731381
}
13741382

1375-
@Override
1376-
public synchronized void close() throws IOException {
1383+
public OutputStream getClientOutputStream() {
13771384
try {
1378-
outputStream.close();
1379-
} finally {
1380-
inputStream.close();
1381-
// PipedInputStream can only be woken by PipedOutputStream, so PipedOutputStream.close() is
1382-
// a better imitation of Socket.close().
1383-
inputStreamSource.close();
1384-
super.close();
1385+
return client.getOutputStream();
1386+
} catch (IOException ex) {
1387+
throw new RuntimeException(ex);
13851388
}
13861389
}
13871390

1388-
public void closeSourceAndSink() throws IOException {
1391+
public static SocketPair create(ExecutorService threadPool) {
13891392
try {
1390-
outputStreamSink.close();
1391-
} finally {
1392-
inputStreamSource.close();
1393+
try (ServerSocket serverSocket = new ServerSocket(0)) {
1394+
Future<Socket> serverFuture = threadPool.submit(() -> serverSocket.accept());
1395+
Socket client = new Socket();
1396+
client.connect(serverSocket.getLocalSocketAddress());
1397+
Socket server = serverFuture.get();
1398+
return new SocketPair(client, server);
1399+
}
1400+
} catch (Exception ex) {
1401+
throw new RuntimeException(ex);
13931402
}
13941403
}
1395-
1396-
@Override
1397-
public SocketAddress getLocalSocketAddress() {
1398-
return new InetSocketAddress("127.0.0.1", 4000);
1399-
}
1400-
1401-
@Override
1402-
public SocketAddress getRemoteSocketAddress() {
1403-
return new InetSocketAddress("127.0.0.2", 5000);
1404-
}
1405-
1406-
@Override
1407-
public OutputStream getOutputStream() {
1408-
return outputStream;
1409-
}
1410-
1411-
@Override
1412-
public InputStream getInputStream() {
1413-
return inputStream;
1414-
}
14151404
}
14161405

14171406
private interface DataFrameHandler {

0 commit comments

Comments
 (0)