Skip to content

Commit 5e5e6a8

Browse files
committed
okhttp: Use a real socket during server transport testing
The PipeSocket was convenient and avoided real I/O, but the shutdown/close while connecting/handshaking tests were triggering a Socket bug in Java (https://bugs.openjdk.org/browse/JDK-8278326). Using a real socket doesn't trigger the bug because the test stops sharing state with the code under test. Fixes #10228 ``` Details ================== WARNING: ThreadSanitizer: data race (pid=4528) Write of size 1 at 0x0000cfb9d5f4 by thread T36 (mutexes: write M0): #0 java.net.Socket.setCreated()V Socket.java:687 #1 java.net.AbstractPlainSocketImpl.create(Z)V AbstractPlainSocketImpl.java:149 #2 java.net.Socket.createImpl(Z)V Socket.java:477 #3 java.net.Socket.getImpl()Ljava/net/SocketImpl; Socket.java:540 #4 java.net.Socket.setTcpNoDelay(Z)V Socket.java:998 #5 io.grpc.okhttp.OkHttpServerTransport.startIo(Lio/grpc/internal/SerializingExecutor;)V OkHttpServerTransport.java:164 #6 io.grpc.okhttp.OkHttpServerTransport.lambda$start$0(Lio/grpc/internal/SerializingExecutor;)V OkHttpServerTransport.java:159 #7 io.grpc.okhttp.OkHttpServerTransport$$Lambda$56.run()V ?? #8 io.grpc.internal.SerializingExecutor.run()V SerializingExecutor.java:133 #9 java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V ThreadPoolExecutor.java:1130 #10 java.util.concurrent.ThreadPoolExecutor$Worker.run()V ThreadPoolExecutor.java:630 #11 java.lang.Thread.run()V Thread.java:830 #12 (Generated Stub) <null> Previous read of size 1 at 0x0000cfb9d5f4 by thread T35 (mutexes: write M1, write M2): #0 java.net.Socket.close()V Socket.java:1512 #1 io.grpc.okhttp.OkHttpServerTransportTest$PipeSocket.close()V OkHttpServerTransportTest.java:1384 #2 io.grpc.okhttp.OkHttpServerTransportTest.clientCloseDuringHandshake()V OkHttpServerTransportTest.java:290 ```
1 parent 0f2c43a commit 5e5e6a8

File tree

1 file changed

+47
-61
lines changed

1 file changed

+47
-61
lines changed

okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java

Lines changed: 47 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,9 @@
5656
import java.io.IOException;
5757
import java.io.InputStream;
5858
import java.io.OutputStream;
59-
import java.io.PipedInputStream;
60-
import java.io.PipedOutputStream;
6159
import java.net.InetSocketAddress;
60+
import java.net.ServerSocket;
6261
import java.net.Socket;
63-
import java.net.SocketAddress;
6462
import java.util.ArrayDeque;
6563
import java.util.Arrays;
6664
import java.util.Deque;
@@ -69,6 +67,7 @@
6967
import java.util.concurrent.Executor;
7068
import java.util.concurrent.ExecutorService;
7169
import java.util.concurrent.Executors;
70+
import java.util.concurrent.Future;
7271
import java.util.concurrent.TimeUnit;
7372
import okio.Buffer;
7473
import okio.BufferedSource;
@@ -96,14 +95,14 @@ public class OkHttpServerTransportTest {
9695
private ServerTransportListener transportListener
9796
= mock(ServerTransportListener.class, delegatesTo(mockTransportListener));
9897
private OkHttpServerTransport serverTransport;
99-
private final PipeSocket socket = new PipeSocket();
98+
private final ExecutorService threadPool = Executors.newCachedThreadPool();
99+
private final SocketPair socketPair = SocketPair.create(threadPool);
100100
private final FrameWriter clientFrameWriter
101-
= new Http2().newWriter(Okio.buffer(Okio.sink(socket.inputStreamSource)), true);
101+
= new Http2().newWriter(Okio.buffer(Okio.sink(socketPair.getClientOutputStream())), true);
102102
private final FrameReader clientFrameReader
103-
= new Http2().newReader(Okio.buffer(Okio.source(socket.outputStreamSink)), true);
103+
= new Http2().newReader(Okio.buffer(Okio.source(socketPair.getClientInputStream())), true);
104104
private final FrameReader.Handler clientFramesRead = mock(FrameReader.Handler.class);
105105
private final DataFrameHandler clientDataFrames = mock(DataFrameHandler.class);
106-
private ExecutorService threadPool = Executors.newCachedThreadPool();
107106
private HandshakerSocketFactory handshakerSocketFactory
108107
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
109108
private final FakeClock fakeClock = new FakeClock();
@@ -142,7 +141,11 @@ public void setUp() throws Exception {
142141
@After
143142
public void tearDown() throws Exception {
144143
threadPool.shutdownNow();
145-
socket.closeSourceAndSink();
144+
try {
145+
socketPair.client.close();
146+
} finally {
147+
socketPair.server.close();
148+
}
146149
}
147150

148151
@Test
@@ -172,7 +175,7 @@ public void maxConnectionAge() throws Exception {
172175
verifyGracefulShutdown(1);
173176
pingPong();
174177
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(3));
175-
assertThat(socket.isClosed()).isTrue();
178+
assertThat(socketPair.server.isClosed()).isTrue();
176179
}
177180

178181
@Test
@@ -254,7 +257,7 @@ public void startThenShutdownTwice() throws Exception {
254257
@Test
255258
public void shutdownDuringHandshake() throws Exception {
256259
doAnswer(invocation -> {
257-
socket.getInputStream().read();
260+
((Socket) invocation.getArguments()[0]).getInputStream().read();
258261
throw new IOException("handshake purposefully failed");
259262
}).when(handshakerSocketFactory).handshake(any(Socket.class), any(Attributes.class));
260263
serverBuilder.transportExecutor(threadPool);
@@ -268,7 +271,7 @@ public void shutdownDuringHandshake() throws Exception {
268271
@Test
269272
public void shutdownNowDuringHandshake() throws Exception {
270273
doAnswer(invocation -> {
271-
socket.getInputStream().read();
274+
((Socket) invocation.getArguments()[0]).getInputStream().read();
272275
throw new IOException("handshake purposefully failed");
273276
}).when(handshakerSocketFactory).handshake(any(Socket.class), any(Attributes.class));
274277
serverBuilder.transportExecutor(threadPool);
@@ -282,12 +285,12 @@ public void shutdownNowDuringHandshake() throws Exception {
282285
@Test
283286
public void clientCloseDuringHandshake() throws Exception {
284287
doAnswer(invocation -> {
285-
socket.getInputStream().read();
288+
((Socket) invocation.getArguments()[0]).getInputStream().read();
286289
throw new IOException("handshake purposefully failed");
287290
}).when(handshakerSocketFactory).handshake(any(Socket.class), any(Attributes.class));
288291
serverBuilder.transportExecutor(threadPool);
289292
initTransport();
290-
socket.close();
293+
socketPair.client.close();
291294

292295
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
293296
verify(transportListener, never()).transportReady(any(Attributes.class));
@@ -296,7 +299,7 @@ public void clientCloseDuringHandshake() throws Exception {
296299
@Test
297300
public void closeDuringHttp2Preface() throws Exception {
298301
initTransport();
299-
socket.close();
302+
socketPair.client.close();
300303

301304
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
302305
verify(transportListener, never()).transportReady(any(Attributes.class));
@@ -307,7 +310,7 @@ public void noSettingsDuringHttp2HandshakeSettings() throws Exception {
307310
initTransport();
308311
clientFrameWriter.connectionPreface();
309312
clientFrameWriter.flush();
310-
socket.close();
313+
socketPair.client.close();
311314

312315
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
313316
verify(transportListener, never()).transportReady(any(Attributes.class));
@@ -329,7 +332,7 @@ public void startThenClientDisconnect() throws Exception {
329332
initTransport();
330333
handshake();
331334

332-
socket.closeSourceAndSink();
335+
socketPair.client.close();
333336
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
334337
}
335338

@@ -1086,8 +1089,8 @@ public void channelzStats() throws Exception {
10861089
assertThat(stats.data.messagesReceived).isEqualTo(0);
10871090
assertThat(stats.data.remoteFlowControlWindow).isEqualTo(30000); // Lower bound
10881091
assertThat(stats.data.localFlowControlWindow).isEqualTo(66535);
1089-
assertThat(stats.local).isEqualTo(new InetSocketAddress("127.0.0.1", 4000));
1090-
assertThat(stats.remote).isEqualTo(new InetSocketAddress("127.0.0.2", 5000));
1092+
assertThat(stats.local).isEqualTo(socketPair.server.getLocalSocketAddress());
1093+
assertThat(stats.remote).isEqualTo(socketPair.server.getRemoteSocketAddress());
10911094
}
10921095

10931096
@Test
@@ -1188,7 +1191,7 @@ public void keepAliveEnforcer_noticesActive() throws Exception {
11881191
private void initTransport() throws Exception {
11891192
serverTransport = new OkHttpServerTransport(
11901193
new OkHttpServerTransport.Config(serverBuilder, Arrays.asList()),
1191-
socket);
1194+
socketPair.server);
11921195
serverTransport.start(transportListener);
11931196
}
11941197

@@ -1357,61 +1360,44 @@ static String getContent(InputStream message) throws IOException {
13571360
}
13581361
}
13591362

1360-
private static class PipeSocket extends Socket {
1361-
private final PipedOutputStream outputStream = new PipedOutputStream();
1362-
private final PipedInputStream outputStreamSink = new PipedInputStream();
1363-
private final PipedOutputStream inputStreamSource = new PipedOutputStream();
1364-
private final PipedInputStream inputStream = new PipedInputStream();
1363+
private static class SocketPair {
1364+
public final Socket client;
1365+
public final Socket server;
1366+
1367+
public SocketPair(Socket client, Socket server) {
1368+
this.client = client;
1369+
this.server = server;
1370+
}
13651371

1366-
public PipeSocket() {
1372+
public InputStream getClientInputStream() {
13671373
try {
1368-
outputStreamSink.connect(outputStream);
1369-
inputStream.connect(inputStreamSource);
1374+
return client.getInputStream();
13701375
} catch (IOException ex) {
1371-
throw new AssertionError(ex);
1376+
throw new RuntimeException(ex);
13721377
}
13731378
}
13741379

1375-
@Override
1376-
public synchronized void close() throws IOException {
1380+
public OutputStream getClientOutputStream() {
13771381
try {
1378-
outputStream.close();
1379-
} finally {
1380-
inputStream.close();
1381-
// PipedInputStream can only be woken by PipedOutputStream, so PipedOutputStream.close() is
1382-
// a better imitation of Socket.close().
1383-
inputStreamSource.close();
1384-
super.close();
1382+
return client.getOutputStream();
1383+
} catch (IOException ex) {
1384+
throw new RuntimeException(ex);
13851385
}
13861386
}
13871387

1388-
public void closeSourceAndSink() throws IOException {
1388+
public static SocketPair create(ExecutorService threadPool) {
13891389
try {
1390-
outputStreamSink.close();
1391-
} finally {
1392-
inputStreamSource.close();
1390+
try (ServerSocket serverSocket = new ServerSocket(0)) {
1391+
Future<Socket> serverFuture = threadPool.submit(() -> serverSocket.accept());
1392+
Socket client = new Socket();
1393+
client.connect(serverSocket.getLocalSocketAddress());
1394+
Socket server = serverFuture.get();
1395+
return new SocketPair(client, server);
1396+
}
1397+
} catch (Exception ex) {
1398+
throw new RuntimeException(ex);
13931399
}
13941400
}
1395-
1396-
@Override
1397-
public SocketAddress getLocalSocketAddress() {
1398-
return new InetSocketAddress("127.0.0.1", 4000);
1399-
}
1400-
1401-
@Override
1402-
public SocketAddress getRemoteSocketAddress() {
1403-
return new InetSocketAddress("127.0.0.2", 5000);
1404-
}
1405-
1406-
@Override
1407-
public OutputStream getOutputStream() {
1408-
return outputStream;
1409-
}
1410-
1411-
@Override
1412-
public InputStream getInputStream() {
1413-
return inputStream;
1414-
}
14151401
}
14161402

14171403
private interface DataFrameHandler {

0 commit comments

Comments
 (0)