|
35 | 35 | import java.util.Arrays; |
36 | 36 | import java.util.Collection; |
37 | 37 | import java.util.List; |
| 38 | +import java.util.concurrent.TimeoutException; |
38 | 39 |
|
39 | 40 | import org.apache.commons.logging.Log; |
40 | 41 | import org.apache.commons.logging.LogFactory; |
|
47 | 48 | import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; |
48 | 49 | import org.apache.hadoop.test.GenericTestUtils; |
49 | 50 | import org.junit.After; |
| 51 | +import org.junit.AssumptionViolatedException; |
50 | 52 | import org.junit.Before; |
51 | 53 | import org.junit.runner.RunWith; |
52 | 54 | import org.junit.runners.Parameterized; |
@@ -84,6 +86,7 @@ public HttpURLConnection configure(HttpURLConnection conn) throws IOException { |
84 | 86 | return conn; |
85 | 87 | } |
86 | 88 | }); |
| 89 | + private volatile boolean failedToConsumeBacklog; |
87 | 90 |
|
88 | 91 | public enum TimeoutSource { ConnectionFactory, Configuration }; |
89 | 92 |
|
@@ -122,6 +125,7 @@ public void setUp() throws Exception { |
122 | 125 |
|
123 | 126 | clients = new ArrayList<SocketChannel>(); |
124 | 127 | serverThread = null; |
| 128 | + failedToConsumeBacklog = false; |
125 | 129 | } |
126 | 130 |
|
127 | 131 | @After |
@@ -211,6 +215,7 @@ public void testRedirectConnectTimeout() throws Exception { |
211 | 215 | fs.getFileChecksum(new Path("/file")); |
212 | 216 | fail("expected timeout"); |
213 | 217 | } catch (SocketTimeoutException e) { |
| 218 | + assumeBacklogConsumed(); |
214 | 219 | GenericTestUtils.assertExceptionContains( |
215 | 220 | fs.getUri().getAuthority() + ": connect timed out", e); |
216 | 221 | } |
@@ -244,6 +249,7 @@ public void testTwoStepWriteConnectTimeout() throws Exception { |
244 | 249 | os = fs.create(new Path("/file")); |
245 | 250 | fail("expected timeout"); |
246 | 251 | } catch (SocketTimeoutException e) { |
| 252 | + assumeBacklogConsumed(); |
247 | 253 | GenericTestUtils.assertExceptionContains( |
248 | 254 | fs.getUri().getAuthority() + ": connect timed out", e); |
249 | 255 | } finally { |
@@ -357,6 +363,28 @@ private void consumeConnectionBacklog() throws IOException { |
357 | 363 | client.connect(nnHttpAddress); |
358 | 364 | clients.add(client); |
359 | 365 | } |
| 366 | + try { |
| 367 | + GenericTestUtils.waitFor(() -> { |
| 368 | + try (SocketChannel c = SocketChannel.open()) { |
| 369 | + c.socket().connect(nnHttpAddress, 100); |
| 370 | + } catch (SocketTimeoutException e) { |
| 371 | + return true; |
| 372 | + } catch (IOException e) { |
| 373 | + LOG.debug("unexpected exception: " + e); |
| 374 | + } |
| 375 | + return false; |
| 376 | + }, 100, 10000); |
| 377 | + } catch (TimeoutException | InterruptedException e) { |
| 378 | + failedToConsumeBacklog = true; |
| 379 | + assumeBacklogConsumed(); |
| 380 | + } |
| 381 | + } |
| 382 | + |
| 383 | + private void assumeBacklogConsumed() { |
| 384 | + if (failedToConsumeBacklog) { |
| 385 | + throw new AssumptionViolatedException( |
| 386 | + "failed to fill up connection backlog."); |
| 387 | + } |
360 | 388 | } |
361 | 389 |
|
362 | 390 | /** |
|
0 commit comments