Skip to content

Commit f249e71

Browse files
lhaiespjagadish-v0
authored andcommitted
SAMZA-1741: fix issue that EH consumer taking too long to shutdown
1. lower the shutdown timeout from 1 min to 15 seconds 2. make sure EventHubManagers are shutdown in parallel 3. print a thread dump when we do fail during shutdown Author: Hai Lu <[email protected]> Reviewers: Jagadish <[email protected]>, Prateek <[email protected]> Closes apache#548 from lhaiesp/master
1 parent 161d1c4 commit f249e71

File tree

3 files changed

+165
-16
lines changed

3 files changed

+165
-16
lines changed

samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,13 @@
2828
import com.microsoft.azure.eventhubs.impl.ClientConstants;
2929
import java.time.Duration;
3030
import java.time.Instant;
31-
import java.util.ArrayList;
3231
import java.util.List;
3332
import java.util.Map;
3433
import java.util.Set;
3534
import java.util.concurrent.BlockingQueue;
36-
import java.util.concurrent.CompletableFuture;
3735
import java.util.concurrent.ConcurrentHashMap;
38-
import java.util.concurrent.ExecutionException;
3936
import java.util.concurrent.LinkedBlockingQueue;
4037
import java.util.concurrent.TimeUnit;
41-
import java.util.concurrent.TimeoutException;
4238
import java.util.concurrent.atomic.AtomicReference;
4339
import java.util.function.Function;
4440
import java.util.stream.Collectors;
@@ -57,6 +53,7 @@
5753
import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
5854
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
5955
import org.apache.samza.util.BlockingEnvelopeMap;
56+
import org.apache.samza.util.ShutdownUtil;
6057
import org.slf4j.Logger;
6158
import org.slf4j.LoggerFactory;
6259

@@ -102,7 +99,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
10299

103100
// Overall timeout for EventHubClient exponential backoff policy
104101
private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
105-
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
102+
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
106103

107104
public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
108105
public static final String END_OF_STREAM = "-2";
@@ -352,17 +349,32 @@ private void renewPartitionReceiver(SystemStreamPartition ssp) {
352349
@Override
353350
public void stop() {
354351
LOG.info("Stopping event hub system consumer...");
355-
List<CompletableFuture<Void>> futures = new ArrayList<>();
356-
streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close()));
357-
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
358-
try {
359-
future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
360-
} catch (ExecutionException | InterruptedException | TimeoutException e) {
361-
LOG.warn("Failed to close receivers", e);
362-
}
363-
perPartitionEventHubManagers.values()
364-
.parallelStream()
365-
.forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
352+
353+
// There could be potentially many Receivers and EventHubManagers, so close the managers in parallel
354+
LOG.info("Start shutting down eventhubs receivers");
355+
ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() {
356+
@Override
357+
public void run() {
358+
try {
359+
receiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
360+
} catch (Exception e) {
361+
LOG.error("Failed to shutdown receiver.", e);
362+
}
363+
}
364+
}).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
365+
366+
LOG.info("Start shutting down eventhubs managers");
367+
ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager -> new Runnable() {
368+
@Override
369+
public void run() {
370+
try {
371+
manager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
372+
} catch (Exception e) {
373+
LOG.error("Failed to shutdown eventhubs manager.", e);
374+
}
375+
}
376+
}).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
377+
366378
perPartitionEventHubManagers.clear();
367379
perStreamEventHubManagers.clear();
368380
isStarted = false;
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.util;
21+
22+
import java.util.List;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
31+
32+
33+
/**
34+
* Shutdown related utils
35+
*/
36+
public class ShutdownUtil {
37+
private static final Logger LOG = LoggerFactory.getLogger(ShutdownUtil.class);
38+
39+
/**
40+
* A helper to facilitate shutting down a set of resources in parallel to enforce a bounded shutdown time.
41+
* The helper function instantiates an {@link ExecutorService} to execute a list of shutdown tasks, and will
42+
* await the termination for given timeout. If shutdown remains unfinished in the end, the whole thread dump
43+
* will be printed to help debugging.
44+
*
45+
* The shutdown is performed with best-effort. Depending on the implementation of the shutdown function, resource
46+
* leak might be possible.
47+
*
48+
* @param shutdownTasks the list of shutdown tasks that need to be executed in parallel
49+
* @param message message that will show in the thread name and the thread dump
50+
* @param timeoutMs timeout in ms
51+
* @return true if all tasks terminate in the end
52+
*/
53+
public static boolean boundedShutdown(List<Runnable> shutdownTasks, String message, long timeoutMs) {
54+
ExecutorService shutdownExecutorService = Executors.newCachedThreadPool(
55+
new ThreadFactoryBuilder().setNameFormat(message + "-%d").setDaemon(true).build());
56+
shutdownTasks.forEach(shutdownExecutorService::submit);
57+
shutdownExecutorService.shutdown();
58+
try {
59+
shutdownExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
60+
} catch (InterruptedException e) {
61+
LOG.error("Shutdown was interrupted for " + message, e);
62+
}
63+
64+
if (shutdownExecutorService.isTerminated()) {
65+
LOG.info("Shutdown complete for {}", message);
66+
return true;
67+
} else {
68+
LOG.error("Shutdown function for {} remains unfinished after timeout({}ms) or interruption", message, timeoutMs);
69+
Util.logThreadDump(message);
70+
shutdownExecutorService.shutdownNow();
71+
return false;
72+
}
73+
}
74+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.util;
21+
22+
import java.time.Duration;
23+
import java.util.Collections;
24+
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
28+
29+
public class TestShutdownUtil {
30+
@Test
31+
public void testBoundedShutdown() throws Exception {
32+
long longTimeout = Duration.ofSeconds(60).toMillis();
33+
long shortTimeout = Duration.ofMillis(100).toMillis();
34+
35+
Runnable shortRunnable = () -> {
36+
try {
37+
Thread.sleep(shortTimeout);
38+
} catch (Exception e) {
39+
Assert.fail(e.getMessage());
40+
}
41+
};
42+
long start = System.currentTimeMillis();
43+
Assert.assertTrue("expect the shutdown task to terminate",
44+
ShutdownUtil.boundedShutdown(Collections.singletonList(shortRunnable), "testLongTimeout", longTimeout));
45+
long end = System.currentTimeMillis();
46+
Assert.assertTrue("boundedShutdown should complete if the shutdown function completes earlier",
47+
(end - start) < longTimeout / 2);
48+
49+
Runnable longRunnable = () -> {
50+
try {
51+
Thread.sleep(longTimeout);
52+
} catch (Exception e) {
53+
Assert.fail(e.getMessage());
54+
}
55+
};
56+
start = System.currentTimeMillis();
57+
Assert.assertFalse("expect the shutdown task to be unfinished",
58+
ShutdownUtil.boundedShutdown(Collections.singletonList(longRunnable), "testShortTimeout", shortTimeout));
59+
end = System.currentTimeMillis();
60+
Assert.assertTrue("boundedShutdown should complete even if the shutdown function takes long time",
61+
(end - start) < longTimeout / 2);
62+
}
63+
}

0 commit comments

Comments
 (0)