diff --git a/src/main/java/com/rabbitmq/client/TopologyRecoveryException.java b/src/main/java/com/rabbitmq/client/TopologyRecoveryException.java index bdd8b7f807..bb8163a096 100644 --- a/src/main/java/com/rabbitmq/client/TopologyRecoveryException.java +++ b/src/main/java/com/rabbitmq/client/TopologyRecoveryException.java @@ -15,6 +15,8 @@ package com.rabbitmq.client; +import com.rabbitmq.client.impl.recovery.RecordedEntity; + /** * Indicates an exception thrown during topology recovery. * @@ -22,7 +24,19 @@ * @since 3.3.0 */ public class TopologyRecoveryException extends Exception { + + private final RecordedEntity recordedEntity; + public TopologyRecoveryException(String message, Throwable cause) { + this(message, cause, null); + } + + public TopologyRecoveryException(String message, Throwable cause, final RecordedEntity recordedEntity) { super(message, cause); + this.recordedEntity = recordedEntity; + } + + public RecordedEntity getRecordedEntity() { + return recordedEntity; } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 52e100ad57..97cacc81b0 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -900,7 +900,11 @@ private void recordConsumer(String result, this.connection.recordConsumer(result, consumer); } - private void deleteRecordedConsumer(String consumerTag) { + /** + * Delete the recorded consumer from this channel and accompanying connection + * @param consumerTag consumer tag to delete + */ + public void deleteRecordedConsumer(String consumerTag) { this.consumerTags.remove(consumerTag); RecordedConsumer c = this.connection.deleteRecordedConsumer(consumerTag); if (c != null) { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index f217bd4dc4..98d5a4d610 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -143,7 +143,7 @@ private void setupErrorOnWriteListenerForPotentialRecovery() { }); } - private TopologyRecoveryFilter letAllPassFilter() { + private static TopologyRecoveryFilter letAllPassFilter() { return new TopologyRecoveryFilter() {}; } @@ -644,7 +644,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) { } } - void recoverChannel(AutorecoveringChannel channel) throws IOException { + public void recoverChannel(AutorecoveringChannel channel) throws IOException { channel.automaticallyRecover(this, this.delegate); } @@ -666,6 +666,38 @@ private void notifyTopologyRecoveryListenersStarted() { } } + /** + * Recover a closed channel and all topology (i.e. RecordedEntities) associated to it. + * Any errors will be sent to the {@link #getExceptionHandler()}. + * @param channel channel to recover + * @throws IllegalArgumentException if this channel is not owned by this connection + */ + public void recoverChannelAndTopology(final AutorecoveringChannel channel) { + if (!channels.containsValue(channel)) { + throw new IllegalArgumentException("This channel is not owned by this connection"); + } + try { + LOGGER.debug("Recovering channel={}", channel); + recoverChannel(channel); + LOGGER.debug("Recovered channel={}. Now recovering its topology", channel); + Utility.copy(recordedExchanges).values().stream() + .filter(e -> e.getChannel() == channel) + .forEach(e -> recoverExchange(e, false)); + Utility.copy(recordedQueues).values().stream() + .filter(q -> q.getChannel() == channel) + .forEach(q -> recoverQueue(q.getName(), q, false)); + Utility.copy(recordedBindings).stream() + .filter(b -> b.getChannel() == channel) + .forEach(b -> recoverBinding(b, false)); + Utility.copy(consumers).values().stream() + .filter(c -> c.getChannel() == channel) + .forEach(c -> recoverConsumer(c.getConsumerTag(), c, false)); + LOGGER.debug("Recovered topology for channel={}", channel); + } catch (Exception e) { + getExceptionHandler().handleChannelRecoveryException(channel, e); + } + } + private void recoverTopology(final ExecutorService executor) { // The recovery sequence is the following: // 1. Recover exchanges @@ -704,7 +736,7 @@ private void recoverTopology(final ExecutorService executor) { } } - private void recoverExchange(RecordedExchange x, boolean retry) { + public void recoverExchange(RecordedExchange x, boolean retry) { // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK. try { if (topologyRecoveryFilter.filterExchange(x)) { @@ -722,7 +754,7 @@ private void recoverExchange(RecordedExchange x, boolean retry) { } catch (Exception cause) { final String message = "Caught an exception while recovering exchange " + x.getName() + ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause, x); this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e); } } @@ -766,12 +798,12 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { } catch (Exception cause) { final String message = "Caught an exception while recovering queue " + oldName + ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause, q); this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e); } } - private void recoverBinding(RecordedBinding b, boolean retry) { + public void recoverBinding(RecordedBinding b, boolean retry) { try { if (this.topologyRecoveryFilter.filterBinding(b)) { if (retry) { @@ -788,7 +820,7 @@ private void recoverBinding(RecordedBinding b, boolean retry) { } catch (Exception cause) { String message = "Caught an exception while recovering binding between " + b.getSource() + " and " + b.getDestination() + ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause, b); this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e); } } @@ -800,7 +832,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean String newTag = null; if (retry) { final RecordedConsumer entity = consumer; - RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover()); + RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover); consumer = (RecordedConsumer) retryResult.getRecordedEntity(); newTag = (String) retryResult.getResult(); } else { @@ -824,7 +856,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean } catch (Exception cause) { final String message = "Caught an exception while recovering consumer " + tag + ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause, consumer); this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e); } } @@ -889,14 +921,10 @@ private void recoverEntitiesAsynchronously(ExecutorService executor, Collection< private List> groupEntitiesByChannel(final Collection entities) { // map entities by channel - final Map> map = new LinkedHashMap>(); + final Map> map = new LinkedHashMap<>(); for (final E entity : entities) { final AutorecoveringChannel channel = entity.getChannel(); - List list = map.get(channel); - if (list == null) { - map.put(channel, list = new ArrayList()); - } - list.add(entity); + map.computeIfAbsent(channel, c -> new ArrayList<>()).add(entity); } // now create a runnable per channel final List> callables = new ArrayList<>(); @@ -1083,7 +1111,7 @@ boolean hasMoreConsumersOnQueue(Collection consumers, String q } Set removeBindingsWithDestination(String s) { - final Set result = new HashSet(); + final Set result = new LinkedHashSet<>(); synchronized (this.recordedBindings) { for (Iterator it = this.recordedBindings.iterator(); it.hasNext(); ) { RecordedBinding b = it.next(); diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java index dc55fc7ed4..2e2890c05b 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java @@ -40,19 +40,19 @@ public class DefaultRetryHandler implements RetryHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class); - private final BiPredicate queueRecoveryRetryCondition; - private final BiPredicate exchangeRecoveryRetryCondition; - private final BiPredicate bindingRecoveryRetryCondition; - private final BiPredicate consumerRecoveryRetryCondition; + protected final BiPredicate queueRecoveryRetryCondition; + protected final BiPredicate exchangeRecoveryRetryCondition; + protected final BiPredicate bindingRecoveryRetryCondition; + protected final BiPredicate consumerRecoveryRetryCondition; - private final RetryOperation queueRecoveryRetryOperation; - private final RetryOperation exchangeRecoveryRetryOperation; - private final RetryOperation bindingRecoveryRetryOperation; - private final RetryOperation consumerRecoveryRetryOperation; + protected final RetryOperation queueRecoveryRetryOperation; + protected final RetryOperation exchangeRecoveryRetryOperation; + protected final RetryOperation bindingRecoveryRetryOperation; + protected final RetryOperation consumerRecoveryRetryOperation; - private final int retryAttempts; + protected final int retryAttempts; - private final BackoffPolicy backoffPolicy; + protected final BackoffPolicy backoffPolicy; public DefaultRetryHandler(BiPredicate queueRecoveryRetryCondition, BiPredicate exchangeRecoveryRetryCondition, diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java index bed71f9f0b..b8dfdff7bc 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java @@ -30,19 +30,19 @@ */ public class TopologyRecoveryRetryHandlerBuilder { - private BiPredicate queueRecoveryRetryCondition = (q, e) -> false; - private BiPredicate exchangeRecoveryRetryCondition = (ex, e) -> false; - private BiPredicate bindingRecoveryRetryCondition = (b, e) -> false; - private BiPredicate consumerRecoveryRetryCondition = (c, e) -> false; + protected BiPredicate queueRecoveryRetryCondition = (q, e) -> false; + protected BiPredicate exchangeRecoveryRetryCondition = (ex, e) -> false; + protected BiPredicate bindingRecoveryRetryCondition = (b, e) -> false; + protected BiPredicate consumerRecoveryRetryCondition = (c, e) -> false; - private DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation = context -> null; - private DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation = context -> null; - private DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation = context -> null; - private DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation = context -> null; + protected DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation = context -> null; + protected DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation = context -> null; + protected DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation = context -> null; + protected DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation = context -> null; - private int retryAttempts = 2; + protected int retryAttempts = 2; - private BackoffPolicy backoffPolicy = nbAttempts -> { + protected BackoffPolicy backoffPolicy = nbAttempts -> { }; public static TopologyRecoveryRetryHandlerBuilder builder() { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java index 94c347f7fa..b93ba7a647 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -18,7 +18,6 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.utility.Utility; - import java.util.function.BiPredicate; import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; @@ -55,6 +54,18 @@ public abstract class TopologyRecoveryRetryLogic { } return null; }; + + /** + * Recover a queue + */ + public static final DefaultRetryHandler.RetryOperation RECOVER_QUEUE = context -> { + if (context.entity() instanceof RecordedQueue) { + final RecordedQueue recordedQueue = context.queue(); + AutorecoveringConnection connection = context.connection(); + connection.recoverQueue(recordedQueue.getName(), recordedQueue, false); + } + return null; + }; /** * Recover the destination queue of a binding. @@ -138,18 +149,52 @@ public abstract class TopologyRecoveryRetryLogic { * Recover a consumer. */ public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER = context -> context.consumer().recover(); + + /** + * Recover earlier consumers that share the same channel as this retry context + */ + public static final DefaultRetryHandler.RetryOperation RECOVER_PREVIOUS_CONSUMERS = context -> { + if (context.entity() instanceof RecordedConsumer) { + // recover all consumers for the same channel that were recovered before this current + // consumer. need to do this incase some consumers had already been recovered + // successfully on a different queue before this one failed + final AutorecoveringChannel channel = context.consumer().getChannel(); + for (RecordedConsumer consumer : Utility.copy(context.connection().getRecordedConsumers()).values()) { + if (consumer == context.entity()) { + break; + } else if (consumer.getChannel() == channel) { + final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection()); + RECOVER_CONSUMER_QUEUE.call(retryContext); + consumer.recover(); + RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext); + } + } + } + return null; + }; /** * Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers * when their respective queue is not found. + * * This retry handler can be useful for long recovery processes, whereby auto-delete queues * can be deleted between queue recovery and binding/consumer recovery. + * + * Also useful to retry channel-closed 404 errors that may arise with auto-delete queues during a cluster cycle. */ public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder() + .queueRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) .bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) - .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING) + .queueRecoveryRetryOperation(RECOVER_CHANNEL + .andThen(RECOVER_QUEUE)) + .bindingRecoveryRetryOperation(RECOVER_CHANNEL + .andThen(RECOVER_BINDING_QUEUE) + .andThen(RECOVER_BINDING) .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)) - .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER) - .andThen(RECOVER_CONSUMER_QUEUE_BINDINGS))); + .consumerRecoveryRetryOperation(RECOVER_CHANNEL + .andThen(RECOVER_CONSUMER_QUEUE) + .andThen(RECOVER_CONSUMER) + .andThen(RECOVER_CONSUMER_QUEUE_BINDINGS) + .andThen(RECOVER_PREVIOUS_CONSUMERS)); }