Skip to content

Commit 3499d2f

Browse files
committed
Log unfinished multi-threaded topology recovery tasks
References #370
1 parent f892aaa commit 3499d2f

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
719719
/**
720720
* Get the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
721721
* @return thread pool executor
722+
* @since 4.7.0
722723
*/
723724
public ExecutorService getTopologyRecoveryExecutor() {
724725
return topologyRecoveryExecutor;
@@ -727,8 +728,10 @@ public ExecutorService getTopologyRecoveryExecutor() {
727728
/**
728729
* Set the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
729730
* It is recommended to pass a ThreadPoolExecutor that will allow its core threads to timeout so these threads can die when recovery is complete.
731+
* It's developer's responsibility to shut down the executor when it is no longer needed.
730732
* Note: your {@link ExceptionHandler#handleTopologyRecoveryException(Connection, Channel, TopologyRecoveryException)} method should be thread-safe.
731733
* @param topologyRecoveryExecutor thread pool executor
734+
* @since 4.7.0
732735
*/
733736
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
734737
this.topologyRecoveryExecutor = topologyRecoveryExecutor;

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ExecutorService;
3434
import java.util.concurrent.Executors;
35+
import java.util.concurrent.Future;
3536
import java.util.concurrent.ThreadFactory;
37+
import java.util.concurrent.TimeUnit;
3638
import java.util.concurrent.TimeoutException;
3739
import java.util.concurrent.locks.Lock;
3840
import java.util.concurrent.locks.ReentrantLock;
@@ -638,10 +640,10 @@ private void recoverTopology(final ExecutorService executor) {
638640
// We also need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
639641
// Note: invokeAll will block until all callables are completed and all returned futures will be complete
640642
try {
641-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
642-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
643-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
644-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
643+
recoverEntitiesAsynchronously(executor, Utility.copy(recordedExchanges).values());
644+
recoverEntitiesAsynchronously(executor, Utility.copy(recordedQueues).values());
645+
recoverEntitiesAsynchronously(executor, Utility.copy(recordedBindings));
646+
recoverEntitiesAsynchronously(executor, Utility.copy(consumers).values());
645647
} catch (final Exception cause) {
646648
final String message = "Caught an exception while recovering toplogy: " + cause.getMessage();
647649
final TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
@@ -748,7 +750,22 @@ private void propagateQueueNameChangeToConsumers(String oldName, String newName)
748750
}
749751
}
750752
}
751-
753+
754+
private void recoverEntitiesAsynchronously(ExecutorService executor, Collection<? extends RecordedEntity> recordedEntities) throws InterruptedException {
755+
List<Future<Object>> tasks = executor.invokeAll(groupEntitiesByChannel(recordedEntities));
756+
for (Future<Object> task : tasks) {
757+
if (!task.isDone()) {
758+
LOGGER.warn("Recovery task should be done {}", task);
759+
} else {
760+
try {
761+
task.get(1, TimeUnit.MILLISECONDS);
762+
} catch (Exception e) {
763+
LOGGER.warn("Recovery task is done but returned an exception", e);
764+
}
765+
}
766+
}
767+
}
768+
752769
private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel(final Collection<E> entities) {
753770
// map entities by channel
754771
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<AutorecoveringChannel, List<E>>();

0 commit comments

Comments
 (0)