Skip to content

Commit c8cdbfa

Browse files
committed
HADOOP-19569. S3A: stream write/close fails badly once FS is closed
Executors in hadoop-common to - pick up shutdown of inner executor and shut themselves down. - semaphore executor to decrement counters in this process so that queue state is updated - semaphored delegating executor unit test in common This stops callers being able to submit work when the inner executor has shut down. WriteOperationHelper * make all calls through its callback interface, rather than being given a ref to S3AFS. * Move WriteOperationHelper callbacks to S3Store layer, Multipart IO operations * move nearly all Multipart IO operations out of s3afs and into a new multipart service interface and impl * Multipart service retrieved and invoked as appropriate * StoreImpl stores a map of ServiceName -> service. With a lookupService() method in S3AStore interface, it's possible to retrieve services through the API just by knowing their name and type * registering all current services this way StoreImpl to raise IllegalStateException on method invocation when the service isn't running. Some methods are kept open as they do seem needed.
1 parent c4abdd7 commit c8cdbfa

32 files changed

+1576
-878
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
130130
slower than enqueueing. */
131131
final BlockingQueue<Runnable> workQueue =
132132
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
133+
final InnerExecutorRejection rejection = new InnerExecutorRejection();
133134
ThreadPoolExecutor eventProcessingExecutor =
134135
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
135136
workQueue, newDaemonThreadFactory(prefixName),
136-
new RejectedExecutionHandler() {
137-
@Override
138-
public void rejectedExecution(Runnable r,
139-
ThreadPoolExecutor executor) {
140-
// This is not expected to happen.
141-
LOG.error("Could not submit task to executor {}",
142-
executor.toString());
143-
}
144-
});
137+
rejection);
145138
eventProcessingExecutor.allowCoreThreadTimeOut(true);
146-
return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
147-
eventProcessingExecutor);
139+
final BlockingThreadPoolExecutorService service =
140+
new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
141+
eventProcessingExecutor);
142+
rejection.setDelegate((r, executor) -> {
143+
service.shutdown();
144+
});
145+
146+
return service;
148147
}
149148

150149
/**
@@ -164,4 +163,28 @@ public String toString() {
164163
.append('}');
165164
return sb.toString();
166165
}
166+
167+
private static class InnerExecutorRejection implements RejectedExecutionHandler {
168+
169+
private RejectedExecutionHandler delegate;
170+
171+
private RejectedExecutionHandler getDelegate() {
172+
return delegate;
173+
}
174+
175+
private void setDelegate(final RejectedExecutionHandler delegate) {
176+
this.delegate = delegate;
177+
}
178+
179+
@Override
180+
public void rejectedExecution(Runnable r,
181+
ThreadPoolExecutor executor) {
182+
// This is not expected to happen.
183+
LOG.error("Could not submit task to executor {}",
184+
executor.toString());
185+
if (getDelegate() != null) {
186+
delegate.rejectedExecution(r, executor);
187+
}
188+
}
189+
}
167190
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.concurrent.ExecutorService;
3333
import java.util.concurrent.Future;
34+
import java.util.concurrent.RejectedExecutionException;
3435
import java.util.concurrent.Semaphore;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
@@ -127,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
127128

128129
@Override
129130
public <T> Future<T> submit(Callable<T> task) {
131+
rejectWhenShutdown();
130132
try (DurationTracker ignored =
131133
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
132134
queueingPermits.acquire();
@@ -139,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {
139141

140142
@Override
141143
public <T> Future<T> submit(Runnable task, T result) {
144+
rejectWhenShutdown();
142145
try (DurationTracker ignored =
143146
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
144147
queueingPermits.acquire();
@@ -151,6 +154,7 @@ public <T> Future<T> submit(Runnable task, T result) {
151154

152155
@Override
153156
public Future<?> submit(Runnable task) {
157+
rejectWhenShutdown();
154158
try (DurationTracker ignored =
155159
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
156160
queueingPermits.acquire();
@@ -163,6 +167,7 @@ public Future<?> submit(Runnable task) {
163167

164168
@Override
165169
public void execute(Runnable command) {
170+
rejectWhenShutdown();
166171
try (DurationTracker ignored =
167172
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
168173
queueingPermits.acquire();
@@ -208,6 +213,16 @@ public String toString() {
208213
return sb.toString();
209214
}
210215

216+
/**
217+
* Raise an exception if invoked when the executor is shut down.
218+
* @throws RejectedExecutionException if the executor is shut down.
219+
*/
220+
private void rejectWhenShutdown() throws RejectedExecutionException{
221+
if (isShutdown()) {
222+
throw new RejectedExecutionException("ExecutorService is shutdown");
223+
}
224+
}
225+
211226
/**
212227
* Releases a permit after the task is executed.
213228
*/
@@ -222,6 +237,7 @@ class RunnableWithPermitRelease implements Runnable {
222237
@Override
223238
public void run() {
224239
try {
240+
rejectWhenShutdown();
225241
delegatee.run();
226242
} finally {
227243
queueingPermits.release();
@@ -244,6 +260,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
244260
@Override
245261
public T call() throws Exception {
246262
try {
263+
rejectWhenShutdown();
247264
return delegatee.call();
248265
} finally {
249266
queueingPermits.release();
Lines changed: 63 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,71 +16,77 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.fs.s3a;
20-
21-
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
22-
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
23-
import org.apache.hadoop.util.StopWatch;
24-
25-
import org.junit.AfterClass;
26-
import org.junit.Rule;
27-
import org.junit.Test;
28-
import org.junit.rules.Timeout;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
19+
package org.apache.hadoop.util;
3120

3221
import java.util.concurrent.Callable;
3322
import java.util.concurrent.CountDownLatch;
3423
import java.util.concurrent.ExecutorService;
3524
import java.util.concurrent.Future;
25+
import java.util.concurrent.RejectedExecutionException;
3626
import java.util.concurrent.TimeUnit;
3727

38-
import static org.junit.Assert.assertEquals;
28+
import org.assertj.core.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import org.apache.hadoop.test.AbstractHadoopTestBase;
36+
37+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3938

4039
/**
41-
* Basic test for S3A's blocking executor service.
40+
* Test for the blocking executor service.
4241
*/
43-
public class ITestBlockingThreadPoolExecutorService {
42+
public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {
4443

4544
private static final Logger LOG = LoggerFactory.getLogger(
46-
ITestBlockingThreadPoolExecutorService.class);
45+
TestBlockingThreadPoolExecutorService.class);
4746

4847
private static final int NUM_ACTIVE_TASKS = 4;
48+
4949
private static final int NUM_WAITING_TASKS = 2;
50+
5051
private static final int TASK_SLEEP_MSEC = 100;
52+
5153
private static final int SHUTDOWN_WAIT_MSEC = 200;
54+
5255
private static final int SHUTDOWN_WAIT_TRIES = 5;
56+
5357
private static final int BLOCKING_THRESHOLD_MSEC = 50;
5458

5559
private static final Integer SOME_VALUE = 1337;
5660

57-
private static BlockingThreadPoolExecutorService tpe;
61+
private BlockingThreadPoolExecutorService tpe;
62+
5863

59-
@Rule
60-
public Timeout testTimeout = new Timeout(60, TimeUnit.SECONDS);
64+
@BeforeEach
65+
public void setup() throws Exception {
66+
ensureCreated();
67+
}
6168

62-
@AfterClass
63-
public static void afterClass() throws Exception {
69+
@AfterEach
70+
public void teardown() throws Exception {
6471
ensureDestroyed();
6572
}
6673

74+
6775
/**
6876
* Basic test of running one trivial task.
6977
*/
7078
@Test
7179
public void testSubmitCallable() throws Exception {
72-
ensureCreated();
7380
Future<Integer> f = tpe.submit(callableSleeper);
7481
Integer v = f.get();
75-
assertEquals(SOME_VALUE, v);
82+
Assertions.assertThat(v).isEqualTo(SOME_VALUE);
7683
}
7784

7885
/**
7986
* More involved test, including detecting blocking when at capacity.
8087
*/
8188
@Test
8289
public void testSubmitRunnable() throws Exception {
83-
ensureCreated();
8490
verifyQueueSize(tpe, NUM_ACTIVE_TASKS + NUM_WAITING_TASKS);
8591
}
8692

@@ -102,27 +108,30 @@ protected void verifyQueueSize(ExecutorService executorService,
102108
assertDidBlock(stopWatch);
103109
}
104110

105-
@Test
106-
public void testShutdown() throws Exception {
107-
// Cover create / destroy, regardless of when this test case runs
108-
ensureCreated();
109-
ensureDestroyed();
110-
111-
// Cover create, execute, destroy, regardless of when test case runs
112-
ensureCreated();
113-
testSubmitRunnable();
114-
ensureDestroyed();
115-
}
116-
117111
@Test
118112
public void testChainedQueue() throws Throwable {
119-
ensureCreated();
120113
int size = 2;
121114
ExecutorService wrapper = new SemaphoredDelegatingExecutor(tpe,
122115
size, true);
123116
verifyQueueSize(wrapper, size);
124117
}
125118

119+
@Test
120+
public void testShutdownQueueRejectsOperations() throws Throwable {
121+
tpe.shutdown();
122+
Assertions.assertThat(tpe.isShutdown())
123+
.describedAs("%s should be shutdown", tpe)
124+
.isTrue();
125+
// runnable
126+
intercept(RejectedExecutionException.class, () ->
127+
tpe.submit(failToRun));
128+
// callable
129+
intercept(RejectedExecutionException.class, () ->
130+
tpe.submit(() -> 0));
131+
intercept(RejectedExecutionException.class, () ->
132+
tpe.execute(failToRun));
133+
}
134+
126135
// Helper functions, etc.
127136

128137
private void assertDidBlock(StopWatch sw) {
@@ -135,28 +144,27 @@ private void assertDidBlock(StopWatch sw) {
135144
}
136145
}
137146

138-
private Runnable sleeper = new Runnable() {
139-
@Override
140-
public void run() {
141-
String name = Thread.currentThread().getName();
142-
try {
143-
Thread.sleep(TASK_SLEEP_MSEC);
144-
} catch (InterruptedException e) {
145-
LOG.info("Thread {} interrupted.", name);
146-
Thread.currentThread().interrupt();
147-
}
148-
}
147+
private Runnable failToRun = () -> {
148+
throw new RuntimeException("This runnable raises and exception");
149149
};
150150

151-
private Callable<Integer> callableSleeper = new Callable<Integer>() {
152-
@Override
153-
public Integer call() throws Exception {
154-
sleeper.run();
155-
return SOME_VALUE;
151+
private Runnable sleeper = () -> {
152+
String name = Thread.currentThread().getName();
153+
try {
154+
Thread.sleep(TASK_SLEEP_MSEC);
155+
} catch (InterruptedException e) {
156+
LOG.info("Thread {} interrupted.", name);
157+
Thread.currentThread().interrupt();
156158
}
157159
};
158160

161+
private Callable<Integer> callableSleeper = () -> {
162+
sleeper.run();
163+
return SOME_VALUE;
164+
};
165+
159166
private class LatchedSleeper implements Runnable {
167+
160168
private final CountDownLatch latch;
161169

162170
LatchedSleeper(CountDownLatch latch) {
@@ -178,7 +186,7 @@ public void run() {
178186
/**
179187
* Helper function to create thread pool under test.
180188
*/
181-
private static void ensureCreated() throws Exception {
189+
private void ensureCreated() throws Exception {
182190
if (tpe == null) {
183191
LOG.debug("Creating thread pool");
184192
tpe = BlockingThreadPoolExecutorService.newInstance(
@@ -191,7 +199,7 @@ private static void ensureCreated() throws Exception {
191199
* Helper function to terminate thread pool under test, asserting that
192200
* shutdown -> terminate works as expected.
193201
*/
194-
private static void ensureDestroyed() throws Exception {
202+
private void ensureDestroyed() throws Exception {
195203
if (tpe == null) {
196204
return;
197205
}

0 commit comments

Comments
 (0)