From 34e33ea80b9c71dfc0b2cb929b40e707e6e0fcac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 7 Aug 2018 11:23:21 +0200 Subject: [PATCH] Add optional retry logic to topology recovery There's no topology recovery retry by default. The default implementation is composable: not all have the recoverable entities have to retry and the retry operations don't have to be only the corresponding entity recovery, but also other operations, like recovering the corresponding channel. Fixes #387 --- .../rabbitmq/client/ConnectionFactory.java | 18 ++ .../client/impl/ConnectionParams.java | 9 + .../recovery/AutorecoveringConnection.java | 109 ++++++-- .../client/impl/recovery/BackoffPolicy.java | 34 +++ .../impl/recovery/DefaultRetryHandler.java | 131 +++++++++ .../client/impl/recovery/RetryContext.java | 99 +++++++ .../client/impl/recovery/RetryHandler.java | 62 +++++ .../client/impl/recovery/RetryResult.java | 57 ++++ .../TopologyRecoveryRetryHandlerBuilder.java | 115 ++++++++ .../recovery/TopologyRecoveryRetryLogic.java | 85 ++++++ .../com/rabbitmq/client/test/ClientTests.java | 3 +- .../client/test/DefaultRetryHandlerTest.java | 257 ++++++++++++++++++ .../com/rabbitmq/client/test/TestUtils.java | 120 +++++++- .../test/functional/ConnectionRecovery.java | 36 +-- .../test/functional/FunctionalTests.java | 3 +- .../functional/TopologyRecoveryFiltering.java | 103 +------ .../functional/TopologyRecoveryRetry.java | 70 +++++ src/test/java/com/rabbitmq/tools/Host.java | 4 + 18 files changed, 1160 insertions(+), 155 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java create mode 100644 src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java create mode 100644 src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index a35a55f64d..0816abdde3 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RetryHandler; import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import java.io.IOException; @@ -192,6 +193,13 @@ public class ConnectionFactory implements Cloneable { */ private Predicate connectionRecoveryTriggeringCondition; + /** + * Retry handler for topology recovery. + * Default is no retry. + * @since 5.4.0 + */ + private RetryHandler topologyRecoveryRetryHandler; + /** @return the default host to use for connections */ public String getHost() { return host; @@ -1087,6 +1095,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setErrorOnWriteListener(errorOnWriteListener); result.setTopologyRecoveryFilter(topologyRecoveryFilter); result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition); + result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler); return result; } @@ -1454,4 +1463,13 @@ public void setConnectionRecoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition; + private RetryHandler topologyRecoveryRetryHandler; private ExceptionHandler exceptionHandler; private ThreadFactory threadFactory; @@ -257,4 +259,11 @@ public Predicate getConnectionRecoveryTriggeringConditi return connectionRecoveryTriggeringCondition; } + public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) { + this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler; + } + + public RetryHandler getTopologyRecoveryRetryHandler() { + return topologyRecoveryRetryHandler; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 9223cb11aa..c3e66e0d2f 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -95,6 +95,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC private final Predicate connectionRecoveryTriggeringCondition; + private final RetryHandler retryHandler; + public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addrs) { this(params, f, new ListAddressResolver(addrs)); } @@ -115,6 +117,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, this.channels = new ConcurrentHashMap<>(); this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ? letAllPassFilter() : params.getTopologyRecoveryFilter(); + + this.retryHandler = params.getTopologyRecoveryRetryHandler(); } private void setupErrorOnWriteListenerForPotentialRecovery() { @@ -125,12 +129,9 @@ private void setupErrorOnWriteListenerForPotentialRecovery() { // we should trigger the error handling and the recovery only once if (errorOnWriteLock.tryLock()) { try { - Thread recoveryThread = threadFactory.newThread(new Runnable() { - @Override - public void run() { - AMQConnection c = (AMQConnection) connection; - c.handleIoError(exception); - } + Thread recoveryThread = threadFactory.newThread(() -> { + AMQConnection c = (AMQConnection) connection; + c.handleIoError(exception); }); recoveryThread.setName("RabbitMQ Error On Write Thread"); recoveryThread.start(); @@ -630,6 +631,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) { } } + void recoverChannel(AutorecoveringChannel channel) throws IOException { + channel.automaticallyRecover(this, this.delegate); + } + private void notifyRecoveryListenersComplete() { for (RecoveryListener f : Utility.copy(this.recoveryListeners)) { f.handleRecovery(this); @@ -651,16 +656,16 @@ private void recoverTopology(final ExecutorService executor) { if (executor == null) { // recover entities in serial on the main connection thread for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) { - recoverExchange(exchange); + recoverExchange(exchange, true); } for (final Map.Entry entry : Utility.copy(recordedQueues).entrySet()) { - recoverQueue(entry.getKey(), entry.getValue()); + recoverQueue(entry.getKey(), entry.getValue(), true); } for (final RecordedBinding b : Utility.copy(recordedBindings)) { - recoverBinding(b); + recoverBinding(b, true); } for (final Map.Entry entry : Utility.copy(consumers).entrySet()) { - recoverConsumer(entry.getKey(), entry.getValue()); + recoverConsumer(entry.getKey(), entry.getValue(), true); } } else { // Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers @@ -680,11 +685,19 @@ private void recoverTopology(final ExecutorService executor) { } } - private void recoverExchange(final RecordedExchange x) { + private void recoverExchange(RecordedExchange x, boolean retry) { // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK. try { if (topologyRecoveryFilter.filterExchange(x)) { - x.recover(); + if (retry) { + final RecordedExchange entity = x; + x = (RecordedExchange) wrapRetryIfNecessary(x, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + x.recover(); + } LOGGER.debug("{} has recovered", x); } } catch (Exception cause) { @@ -695,12 +708,20 @@ private void recoverExchange(final RecordedExchange x) { } } - private void recoverQueue(final String oldName, final RecordedQueue q) { + void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { try { if (topologyRecoveryFilter.filterQueue(q)) { LOGGER.debug("Recovering {}", q); - q.recover(); + if (retry) { + final RecordedQueue entity = q; + q = (RecordedQueue) wrapRetryIfNecessary(q, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + q.recover(); + } String newName = q.getName(); if (!oldName.equals(newName)) { // make sure server-named queues are re-added with @@ -731,10 +752,18 @@ private void recoverQueue(final String oldName, final RecordedQueue q) { } } - private void recoverBinding(final RecordedBinding b) { + private void recoverBinding(RecordedBinding b, boolean retry) { try { if (this.topologyRecoveryFilter.filterBinding(b)) { - b.recover(); + if (retry) { + final RecordedBinding entity = b; + b = (RecordedBinding) wrapRetryIfNecessary(b, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + b.recover(); + } LOGGER.debug("{} has recovered", b); } } catch (Exception cause) { @@ -745,11 +774,20 @@ private void recoverBinding(final RecordedBinding b) { } } - private void recoverConsumer(final String tag, final RecordedConsumer consumer) { + private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) { try { if (this.topologyRecoveryFilter.filterConsumer(consumer)) { LOGGER.debug("Recovering {}", consumer); - String newTag = consumer.recover(); + String newTag = null; + if (retry) { + final RecordedConsumer entity = consumer; + RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover()); + consumer = (RecordedConsumer) retryResult.getRecordedEntity(); + newTag = (String) retryResult.getResult(); + } else { + newTag = consumer.recover(); + } + // make sure server-generated tags are re-added. MK. if(tag != null && !tag.equals(newTag)) { synchronized (this.consumers) { @@ -772,6 +810,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer) } } + private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable recoveryAction) throws Exception { + if (this.retryHandler == null) { + T result = recoveryAction.call(); + return new RetryResult(entity, result); + } else { + try { + T result = recoveryAction.call(); + return new RetryResult(entity, result); + } catch (Exception e) { + RetryContext retryContext = new RetryContext(entity, e, this); + RetryResult retryResult; + if (entity instanceof RecordedQueue) { + retryResult = this.retryHandler.retryQueueRecovery(retryContext); + } else if (entity instanceof RecordedExchange) { + retryResult = this.retryHandler.retryExchangeRecovery(retryContext); + } else if (entity instanceof RecordedBinding) { + retryResult = this.retryHandler.retryBindingRecovery(retryContext); + } else if (entity instanceof RecordedConsumer) { + retryResult = this.retryHandler.retryConsumerRecovery(retryContext); + } else { + throw new IllegalArgumentException("Unknown type of recorded entity: " + entity); + } + return retryResult; + } + } + } + private void propagateQueueNameChangeToBindings(String oldName, String newName) { for (RecordedBinding b : Utility.copy(this.recordedBindings)) { if (b.getDestination().equals(oldName)) { @@ -820,15 +885,15 @@ private List> groupEntitiesByChannel callables.add(Executors.callable(() -> { for (final E entity : entityList) { if (entity instanceof RecordedExchange) { - recoverExchange((RecordedExchange)entity); + recoverExchange((RecordedExchange)entity, true); } else if (entity instanceof RecordedQueue) { final RecordedQueue q = (RecordedQueue) entity; - recoverQueue(q.getName(), q); + recoverQueue(q.getName(), q, true); } else if (entity instanceof RecordedBinding) { - recoverBinding((RecordedBinding) entity); + recoverBinding((RecordedBinding) entity, true); } else if (entity instanceof RecordedConsumer) { final RecordedConsumer c = (RecordedConsumer) entity; - recoverConsumer(c.getConsumerTag(), c); + recoverConsumer(c.getConsumerTag(), c, true); } } })); diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java new file mode 100644 index 0000000000..a05c2a8a3c --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java @@ -0,0 +1,34 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Backoff policy for topology recovery retry attempts. + * + * @see DefaultRetryHandler + * @see TopologyRecoveryRetryHandlerBuilder + * @since 5.4.0 + */ +@FunctionalInterface +public interface BackoffPolicy { + + /** + * Wait depending on the current attempt number (1, 2, 3, etc) + * @param attemptNumber current attempt number + * @throws InterruptedException + */ + void backoff(int attemptNumber) throws InterruptedException; +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java new file mode 100644 index 0000000000..8c9b1df3fd --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java @@ -0,0 +1,131 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import java.util.Objects; +import java.util.function.BiPredicate; + +/** + * Composable topology recovery retry handler. + * This retry handler implementations let the user choose the condition + * to trigger retry and the retry operation for each type of recoverable + * entities. The number of attempts and the backoff policy (time to wait + * between retries) are also configurable. + *

+ * See also {@link TopologyRecoveryRetryHandlerBuilder} to easily create + * instances and {@link TopologyRecoveryRetryLogic} for ready-to-use + * conditions and operations. + * + * @see TopologyRecoveryRetryHandlerBuilder + * @see TopologyRecoveryRetryLogic + * @since 5.4.0 + */ +public class DefaultRetryHandler implements RetryHandler { + + private final BiPredicate queueRecoveryRetryCondition; + private final BiPredicate exchangeRecoveryRetryCondition; + private final BiPredicate bindingRecoveryRetryCondition; + private final BiPredicate consumerRecoveryRetryCondition; + + private final RetryOperation queueRecoveryRetryOperation; + private final RetryOperation exchangeRecoveryRetryOperation; + private final RetryOperation bindingRecoveryRetryOperation; + private final RetryOperation consumerRecoveryRetryOperation; + + private final int retryAttempts; + + private final BackoffPolicy backoffPolicy; + + public DefaultRetryHandler(BiPredicate queueRecoveryRetryCondition, + BiPredicate exchangeRecoveryRetryCondition, + BiPredicate bindingRecoveryRetryCondition, + BiPredicate consumerRecoveryRetryCondition, + RetryOperation queueRecoveryRetryOperation, + RetryOperation exchangeRecoveryRetryOperation, + RetryOperation bindingRecoveryRetryOperation, + RetryOperation consumerRecoveryRetryOperation, int retryAttempts, BackoffPolicy backoffPolicy) { + this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; + this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; + this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; + this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; + this.queueRecoveryRetryOperation = queueRecoveryRetryOperation; + this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation; + this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation; + this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation; + this.backoffPolicy = backoffPolicy; + if (retryAttempts <= 0) { + throw new IllegalArgumentException("Number of retry attempts must be greater than 0"); + } + this.retryAttempts = retryAttempts; + } + + @Override + public RetryResult retryQueueRecovery(RetryContext context) throws Exception { + return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context); + } + + @Override + public RetryResult retryExchangeRecovery(RetryContext context) throws Exception { + return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context); + } + + @Override + public RetryResult retryBindingRecovery(RetryContext context) throws Exception { + return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context); + } + + @Override + public RetryResult retryConsumerRecovery(RetryContext context) throws Exception { + return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context); + } + + protected RetryResult doRetry(BiPredicate condition, RetryOperation operation, T entity, RetryContext context) + throws Exception { + int attempts = 0; + Exception exception = context.exception(); + while (attempts < retryAttempts) { + if (condition.test(entity, exception)) { + backoffPolicy.backoff(attempts + 1); + try { + Object result = operation.call(context); + return new RetryResult( + entity, result == null ? null : result.toString() + ); + } catch (Exception e) { + exception = e; + attempts++; + continue; + } + } else { + throw exception; + } + } + throw context.exception(); + } + + public interface RetryOperation { + + T call(RetryContext context) throws Exception; + + default RetryOperation andThen(RetryOperation after) { + Objects.requireNonNull(after); + return (context) -> { + call(context); + return after.call(context); + }; + } + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java new file mode 100644 index 0000000000..a9bdc05e5f --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java @@ -0,0 +1,99 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * The context of a topology recovery retry operation. + * + * @since 5.4.0 + */ +public class RetryContext { + + private final RecordedEntity entity; + + private final Exception exception; + + private final AutorecoveringConnection connection; + + public RetryContext(RecordedEntity entity, Exception exception, AutorecoveringConnection connection) { + this.entity = entity; + this.exception = exception; + this.connection = connection; + } + + /** + * The underlying connection. + * + * @return + */ + public AutorecoveringConnection connection() { + return connection; + } + + /** + * The exception that triggered the retry attempt. + * + * @return + */ + public Exception exception() { + return exception; + } + + /** + * The to-be-recovered entity. + * + * @return + */ + public RecordedEntity entity() { + return entity; + } + + /** + * The to-be-recovered entity as a queue. + * + * @return + */ + public RecordedQueue queue() { + return (RecordedQueue) entity; + } + + /** + * The to-be-recovered entity as an exchange. + * + * @return + */ + public RecordedExchange exchange() { + return (RecordedExchange) entity; + } + + /** + * The to-be-recovered entity as a binding. + * + * @return + */ + public RecordedBinding binding() { + return (RecordedBinding) entity; + } + + /** + * The to-be-recovered entity as a consumer. + * + * @return + */ + public RecordedConsumer consumer() { + return (RecordedConsumer) entity; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java new file mode 100644 index 0000000000..5ed7f823f0 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java @@ -0,0 +1,62 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Contract to retry failed operations during topology recovery. + * Not all operations have to be retried, it's a decision of the + * underlying implementation. + * + * @since 5.4.0 + */ +public interface RetryHandler { + + /** + * Retry a failed queue recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryQueueRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed exchange recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryExchangeRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed binding recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryBindingRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed consumer recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryConsumerRecovery(RetryContext context) throws Exception; +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java new file mode 100644 index 0000000000..c4797c39bf --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java @@ -0,0 +1,57 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * The retry of a retried topology recovery operation. + * + * @since 5.4.0 + */ +public class RetryResult { + + /** + * The entity to recover. + */ + private final RecordedEntity recordedEntity; + + /** + * The result of the recovery operation. + * E.g. a consumer tag when recovering a consumer. + */ + private final Object result; + + public RetryResult(RecordedEntity recordedEntity, Object result) { + this.recordedEntity = recordedEntity; + this.result = result; + } + + /** + * The entity to recover. + * + * @return + */ + public RecordedEntity getRecordedEntity() { + return recordedEntity; + } + + /** + * The result of the recovery operation. + * E.g. a consumer tag when recovering a consumer. + */ + public Object getResult() { + return result; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java new file mode 100644 index 0000000000..07141b43e5 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java @@ -0,0 +1,115 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import java.util.function.BiPredicate; + +/** + * Builder to ease creation of {@link DefaultRetryHandler} instances. + *

+ * Just override what you need. By default, retry conditions don't trigger retry, + * retry operations are no-op, the number of retry attempts is 1, and the backoff + * policy doesn't wait at all. + * + * @see DefaultRetryHandler + * @see TopologyRecoveryRetryLogic + * @since 5.4.0 + */ +public class TopologyRecoveryRetryHandlerBuilder { + + private BiPredicate queueRecoveryRetryCondition = (q, e) -> false; + private BiPredicate exchangeRecoveryRetryCondition = (ex, e) -> false; + private BiPredicate bindingRecoveryRetryCondition = (b, e) -> false; + private BiPredicate consumerRecoveryRetryCondition = (c, e) -> false; + + private DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation = context -> null; + private DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation = context -> null; + private DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation = context -> null; + private DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation = context -> null; + + private int retryAttempts = 1; + + private BackoffPolicy backoffPolicy = nbAttempts -> { + }; + + public static TopologyRecoveryRetryHandlerBuilder builder() { + return new TopologyRecoveryRetryHandlerBuilder(); + } + + public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryCondition( + BiPredicate queueRecoveryRetryCondition) { + this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryCondition( + BiPredicate exchangeRecoveryRetryCondition) { + this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryCondition( + BiPredicate bindingRecoveryRetryCondition) { + this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryCondition( + BiPredicate consumerRecoveryRetryCondition) { + this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryOperation(DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation) { + this.queueRecoveryRetryOperation = queueRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryOperation(DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation) { + this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryOperation(DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation) { + this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryOperation(DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation) { + this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder backoffPolicy(BackoffPolicy backoffPolicy) { + this.backoffPolicy = backoffPolicy; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder retryAttempts(int retryAttempts) { + this.retryAttempts = retryAttempts; + return this; + } + + public RetryHandler build() { + return new DefaultRetryHandler( + queueRecoveryRetryCondition, exchangeRecoveryRetryCondition, + bindingRecoveryRetryCondition, consumerRecoveryRetryCondition, + queueRecoveryRetryOperation, exchangeRecoveryRetryOperation, + bindingRecoveryRetryOperation, consumerRecoveryRetryOperation, + retryAttempts, + backoffPolicy); + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java new file mode 100644 index 0000000000..8e6e16a790 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -0,0 +1,85 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.ShutdownSignalException; + +import java.util.function.Predicate; + +/** + * Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}. + * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}. + * + * @see DefaultRetryHandler + * @see RetryHandler + * @see TopologyRecoveryRetryHandlerBuilder + * @since 5.4.0 + */ +public abstract class TopologyRecoveryRetryLogic { + + public static final Predicate CHANNEL_CLOSED_NOT_FOUND = e -> { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + return ((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404; + } + } + return false; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CHANNEL = context -> { + if (!context.entity().getChannel().isOpen()) { + context.connection().recoverChannel(context.entity().getChannel()); + } + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING_QUEUE = context -> { + if (context.entity() instanceof RecordedQueueBinding) { + RecordedBinding binding = context.binding(); + AutorecoveringConnection connection = context.connection(); + RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination()); + if (recordedQueue != null) { + connection.recoverQueue( + recordedQueue.getName(), recordedQueue, false + ); + } + } + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING = context -> { + context.binding().recover(); + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER_QUEUE = context -> { + if (context.entity() instanceof RecordedConsumer) { + RecordedConsumer consumer = context.consumer(); + AutorecoveringConnection connection = context.connection(); + RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue()); + if (recordedQueue != null) { + connection.recoverQueue( + recordedQueue.getName(), recordedQueue, false + ); + } + } + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER = context -> context.consumer().recover(); +} diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index 3747c71be9..9c2994b671 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -62,7 +62,8 @@ StrictExceptionHandlerTest.class, NoAutoRecoveryWhenTcpWindowIsFullTest.class, JsonRpcTest.class, - AddressTest.class + AddressTest.class, + DefaultRetryHandlerTest.class }) public class ClientTests { diff --git a/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java b/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java new file mode 100644 index 0000000000..cc105304a7 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java @@ -0,0 +1,257 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.impl.recovery.BackoffPolicy; +import com.rabbitmq.client.impl.recovery.DefaultRetryHandler; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedExchange; +import com.rabbitmq.client.impl.recovery.RecordedQueue; +import com.rabbitmq.client.impl.recovery.RetryContext; +import com.rabbitmq.client.impl.recovery.RetryHandler; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.verification.VerificationMode; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiPredicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.intThat; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +/** + * + */ +public class DefaultRetryHandlerTest { + + RetryHandler handler; + + @Mock + BiPredicate queueRecoveryRetryCondition; + @Mock + BiPredicate exchangeRecoveryRetryCondition; + @Mock + BiPredicate bindingRecoveryRetryCondition; + @Mock + BiPredicate consumerRecoveryRetryCondition; + + @Mock + DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation; + + @Mock + BackoffPolicy backoffPolicy; + + @Before + public void init() { + initMocks(this); + } + + @Test + public void shouldNotRetryWhenConditionReturnsFalse() throws Exception { + conditionsReturn(false); + handler = handler(); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryQueueRecovery(retryContext()) + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryExchangeRecovery(retryContext()) + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryBindingRecovery(retryContext()) + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryConsumerRecovery(retryContext()) + ); + verifyConditionsInvocation(times(1)); + verifyOperationsInvocation(never()); + verify(backoffPolicy, never()).backoff(anyInt()); + } + + @Test + public void shouldReturnOperationResultInRetryResultWhenRetrying() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("queue"); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("exchange"); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("binding"); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("consumer"); + handler = handler(); + assertEquals( + "queue", + handler.retryQueueRecovery(retryContext()).getResult() + ); + assertEquals( + "exchange", + handler.retryExchangeRecovery(retryContext()).getResult() + ); + assertEquals( + "binding", + handler.retryBindingRecovery(retryContext()).getResult() + ); + assertEquals( + "consumer", + handler.retryConsumerRecovery(retryContext()).getResult() + ); + verifyConditionsInvocation(times(1)); + verifyOperationsInvocation(times(1)); + verify(backoffPolicy, times(1 * 4)).backoff(1); + } + + @Test + public void shouldRetryWhenOperationFailsAndConditionIsTrue() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("queue"); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("exchange"); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("binding"); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("consumer"); + handler = handler(2); + assertEquals( + "queue", + handler.retryQueueRecovery(retryContext()).getResult() + ); + assertEquals( + "exchange", + handler.retryExchangeRecovery(retryContext()).getResult() + ); + assertEquals( + "binding", + handler.retryBindingRecovery(retryContext()).getResult() + ); + assertEquals( + "consumer", + handler.retryConsumerRecovery(retryContext()).getResult() + ); + verifyConditionsInvocation(times(2)); + verifyOperationsInvocation(times(2)); + checkBackoffSequence(1, 2, 1, 2, 1, 2, 1, 2); + } + + @Test + public void shouldThrowExceptionWhenRetryAttemptsIsExceeded() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + handler = handler(3); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryQueueRecovery(retryContext()) + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryExchangeRecovery(retryContext()) + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryBindingRecovery(retryContext()) + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryConsumerRecovery(retryContext()) + ); + verifyConditionsInvocation(times(3)); + verifyOperationsInvocation(times(3)); + checkBackoffSequence(1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); + } + + private void assertExceptionIsThrown(String message, Callable action) { + try { + action.call(); + fail(message); + } catch (Exception e) { + } + } + + private void conditionsReturn(boolean shouldRetry) { + when(queueRecoveryRetryCondition.test(nullable(RecordedQueue.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(exchangeRecoveryRetryCondition.test(nullable(RecordedExchange.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(bindingRecoveryRetryCondition.test(nullable(RecordedBinding.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(consumerRecoveryRetryCondition.test(nullable(RecordedConsumer.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + } + + private void verifyConditionsInvocation(VerificationMode mode) { + verify(queueRecoveryRetryCondition, mode).test(nullable(RecordedQueue.class), any(Exception.class)); + verify(exchangeRecoveryRetryCondition, mode).test(nullable(RecordedExchange.class), any(Exception.class)); + verify(bindingRecoveryRetryCondition, mode).test(nullable(RecordedBinding.class), any(Exception.class)); + verify(consumerRecoveryRetryCondition, mode).test(nullable(RecordedConsumer.class), any(Exception.class)); + } + + private void verifyOperationsInvocation(VerificationMode mode) throws Exception { + verify(queueRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(exchangeRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(bindingRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(consumerRecoveryRetryOperation, mode).call(any(RetryContext.class)); + } + + private RetryHandler handler() { + return handler(1); + } + + private RetryHandler handler(int retryAttempts) { + return new DefaultRetryHandler( + queueRecoveryRetryCondition, exchangeRecoveryRetryCondition, + bindingRecoveryRetryCondition, consumerRecoveryRetryCondition, + queueRecoveryRetryOperation, exchangeRecoveryRetryOperation, + bindingRecoveryRetryOperation, consumerRecoveryRetryOperation, + retryAttempts, + backoffPolicy); + } + + private RetryContext retryContext() { + return new RetryContext(null, new Exception(), null); + } + + private void checkBackoffSequence(int... sequence) throws InterruptedException { + AtomicInteger count = new AtomicInteger(0); + verify(backoffPolicy, times(sequence.length)) + // for some reason Mockito calls the matchers twice as many times as the target method + .backoff(intThat(i -> i == sequence[count.getAndIncrement() % sequence.length])); + } +} diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index 644f3a1707..c44e8b26a4 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -15,11 +15,28 @@ package com.rabbitmq.client.test; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoverableConnection; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.NetworkConnection; +import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.tools.Host; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertTrue; public class TestUtils { @@ -27,7 +44,7 @@ public class TestUtils { public static ConnectionFactory connectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); - if(USE_NIO) { + if (USE_NIO) { connectionFactory.useNio(); } else { connectionFactory.useBlockingIo(); @@ -36,7 +53,7 @@ public static ConnectionFactory connectionFactory() { } public static void close(Connection connection) { - if(connection != null) { + if (connection != null) { try { connection.close(); } catch (IOException e) { @@ -66,12 +83,108 @@ public static boolean isVersion37orLater(Connection connection) { LoggerFactory.getLogger(TestUtils.class).warn("Unable to parse broker version {}", currentVersion, e); throw e; } + } + + public static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) + throws IOException, TimeoutException, InterruptedException { + Channel ch = c.createChannel(); + try { + ch.confirmSelect(); + final CountDownLatch latch = new CountDownLatch(1); + ch.basicConsume(queue, true, new DefaultConsumer(ch) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + latch.countDown(); + } + }); + ch.basicPublish(exchange, routingKey, null, "".getBytes()); + ch.waitForConfirmsOrDie(5000); + return latch.await(5, TimeUnit.SECONDS); + } finally { + if (ch != null && ch.isOpen()) { + ch.close(); + } + } + } + + public static boolean resourceExists(Callable callback) throws Exception { + Channel declarePassiveChannel = null; + try { + declarePassiveChannel = callback.call(); + return true; + } catch (IOException e) { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { + return false; + } else { + throw e; + } + } + return false; + } else { + throw e; + } + } finally { + if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { + declarePassiveChannel.close(); + } + } + } + + public static boolean queueExists(final String queue, final Connection connection) throws Exception { + return resourceExists(() -> { + Channel channel = connection.createChannel(); + channel.queueDeclarePassive(queue); + return channel; + }); + } + + public static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { + return resourceExists(() -> { + Channel channel = connection.createChannel(); + channel.exchangeDeclarePassive(exchange); + return channel; + }); + } + + public static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeConnection((NetworkConnection) connection); + wait(latch); + } + + public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeAllConnections(); + wait(latch); + } + + public static CountDownLatch prepareForRecovery(Connection conn) { + final CountDownLatch latch = new CountDownLatch(1); + ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { + + @Override + public void handleRecovery(Recoverable recoverable) { + latch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // No-op + } + }); + return latch; + } + + private static void wait(CountDownLatch latch) throws InterruptedException { + assertTrue(latch.await(90, TimeUnit.SECONDS)); } /** * http://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java - * */ static int versionCompare(String str1, String str2) { String[] vals1 = str1.split("\\."); @@ -90,5 +203,4 @@ static int versionCompare(String str1, String str2) { // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" return Integer.signum(vals1.length - vals2.length); } - } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index a89bb16995..04cf8b04d2 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.rabbitmq.client.test.TestUtils.prepareForRecovery; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.*; @@ -62,7 +63,7 @@ public class ConnectionRecovery extends BrokerTestCase { try { assertTrue(c.isOpen()); assertEquals(connectionName, c.getClientProvidedName()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); assertEquals(connectionName, c.getClientProvidedName()); } finally { @@ -82,7 +83,7 @@ public class ConnectionRecovery extends BrokerTestCase { RecoverableConnection c = newRecoveringConnection(addresses); try { assertTrue(c.isOpen()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); } finally { c.abort(); @@ -98,7 +99,7 @@ public class ConnectionRecovery extends BrokerTestCase { RecoverableConnection c = newRecoveringConnection(addresses); try { assertTrue(c.isOpen()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); } finally { c.abort(); @@ -157,7 +158,7 @@ public String getPassword() { assertThat(usernameRequested.get(), is(1)); assertThat(passwordRequested.get(), is(1)); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); // username is requested in AMQConnection#toString, so it can be accessed at any time assertThat(usernameRequested.get(), greaterThanOrEqualTo(2)); @@ -804,7 +805,7 @@ public void handleDelivery(String consumerTag, Connection testConnection = connectionFactory.newConnection(); try { assertTrue(testConnection.isOpen()); - closeAndWaitForRecovery((RecoverableConnection) testConnection); + TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection); assertTrue(testConnection.isOpen()); } finally { connection.close(); @@ -851,7 +852,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie } } // now do recovery - closeAndWaitForRecovery(testConnection); + TestUtils.closeAndWaitForRecovery(testConnection); // verify channels & topology recovered by publishing a message to each for (int i=0; i < channelCount; i++) { @@ -935,21 +936,6 @@ private static void expectExchangeRecovery(Channel ch, String x) throws IOExcept ch.exchangeDeclarePassive(x); } - private static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection)conn).addRecoveryListener(new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); - return latch; - } - private static CountDownLatch prepareForShutdown(Connection conn) { final CountDownLatch latch = new CountDownLatch(1); conn.addShutdownListener(new ShutdownListener() { @@ -962,13 +948,7 @@ public void shutdownCompleted(ShutdownSignalException cause) { } private void closeAndWaitForRecovery() throws IOException, InterruptedException { - closeAndWaitForRecovery((AutorecoveringConnection)this.connection); - } - - private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); - Host.closeConnection((NetworkConnection) connection); - wait(latch); + TestUtils.closeAndWaitForRecovery((AutorecoveringConnection)this.connection); } private void restartPrimaryAndWaitForRecovery() throws IOException, InterruptedException { diff --git a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java index aeadb303dd..fb48f29585 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java @@ -78,7 +78,8 @@ Nack.class, ExceptionMessages.class, Metrics.class, - TopologyRecoveryFiltering.class + TopologyRecoveryFiltering.class, + TopologyRecoveryRetry.class }) public class FunctionalTests { diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java index 480075258f..3eb9687eea 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java @@ -21,12 +21,7 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoverableConnection; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.impl.NetworkConnection; -import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; import com.rabbitmq.client.impl.recovery.RecordedBinding; import com.rabbitmq.client.impl.recovery.RecordedConsumer; import com.rabbitmq.client.impl.recovery.RecordedExchange; @@ -34,16 +29,18 @@ import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import com.rabbitmq.client.test.BrokerTestCase; import com.rabbitmq.client.test.TestUtils; -import com.rabbitmq.tools.Host; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static com.rabbitmq.client.test.TestUtils.closeAndWaitForRecovery; +import static com.rabbitmq.client.test.TestUtils.exchangeExists; +import static com.rabbitmq.client.test.TestUtils.queueExists; +import static com.rabbitmq.client.test.TestUtils.sendAndConsumeMessage; import static org.awaitility.Awaitility.waitAtMost; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertFalse; @@ -62,98 +59,6 @@ public class TopologyRecoveryFiltering extends BrokerTestCase { }; Connection c; - private static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) - throws IOException, TimeoutException, InterruptedException { - Channel ch = c.createChannel(); - try { - ch.confirmSelect(); - final CountDownLatch latch = new CountDownLatch(1); - ch.basicConsume(queue, true, new DefaultConsumer(ch) { - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - latch.countDown(); - } - }); - ch.basicPublish(exchange, routingKey, null, "".getBytes()); - ch.waitForConfirmsOrDie(5000); - return latch.await(5, TimeUnit.SECONDS); - } finally { - if (ch != null && ch.isOpen()) { - ch.close(); - } - } - } - - private static boolean resourceExists(Callable callback) throws Exception { - Channel declarePassiveChannel = null; - try { - declarePassiveChannel = callback.call(); - return true; - } catch (IOException e) { - if (e.getCause() instanceof ShutdownSignalException) { - ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); - if (cause.getReason() instanceof AMQP.Channel.Close) { - if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { - return false; - } else { - throw e; - } - } - return false; - } else { - throw e; - } - } finally { - if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { - declarePassiveChannel.close(); - } - } - } - - private static boolean queueExists(final String queue, final Connection connection) throws Exception { - return resourceExists(() -> { - Channel channel = connection.createChannel(); - channel.queueDeclarePassive(queue); - return channel; - }); - } - - private static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { - return resourceExists(() -> { - Channel channel = connection.createChannel(); - channel.exchangeDeclarePassive(exchange); - return channel; - }); - } - - private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); - Host.closeConnection((NetworkConnection) connection); - wait(latch); - } - - private static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { - - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } - - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); - return latch; - } - - private static void wait(CountDownLatch latch) throws InterruptedException { - assertTrue(latch.await(20, TimeUnit.SECONDS)); - } - @Override protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java new file mode 100644 index 0000000000..4f8ab6054a --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java @@ -0,0 +1,70 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test.functional; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.test.BrokerTestCase; +import com.rabbitmq.client.test.TestUtils; +import org.junit.Test; + +import java.util.HashMap; + +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.CHANNEL_CLOSED_NOT_FOUND; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING_QUEUE; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CHANNEL; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER_QUEUE; +import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class TopologyRecoveryRetry extends BrokerTestCase { + + @Test + public void topologyRecoveryRetry() throws Exception { + int nbQueues = 2000; + String prefix = "topology-recovery-retry-" + System.currentTimeMillis(); + for (int i = 0; i < nbQueues; i++) { + String queue = prefix + i; + channel.queueDeclare(queue, false, false, true, new HashMap<>()); + channel.queueBind(queue, "amq.direct", queue); + channel.basicConsume(queue, true, new DefaultConsumer(channel)); + } + + closeAllConnectionsAndWaitForRecovery(this.connection); + + assertTrue(channel.isOpen()); + } + + @Override + protected ConnectionFactory newConnectionFactory() { + ConnectionFactory connectionFactory = TestUtils.connectionFactory(); + connectionFactory.setTopologyRecoveryRetryHandler( + builder().bindingRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e)) + .consumerRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e)) + .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)) + .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER))) + .build() + ); + connectionFactory.setNetworkRecoveryInterval(1000); + return connectionFactory; + } +} diff --git a/src/test/java/com/rabbitmq/tools/Host.java b/src/test/java/com/rabbitmq/tools/Host.java index 5924e5931d..c919d78621 100644 --- a/src/test/java/com/rabbitmq/tools/Host.java +++ b/src/test/java/com/rabbitmq/tools/Host.java @@ -169,6 +169,10 @@ public static void closeConnection(String pid) throws IOException { rabbitmqctl("close_connection '" + pid + "' 'Closed via rabbitmqctl'"); } + public static void closeAllConnections() throws IOException { + rabbitmqctl("close_all_connections 'Closed via rabbitmqctl'"); + } + public static void closeConnection(NetworkConnection c) throws IOException { Host.ConnectionInfo ci = findConnectionInfoFor(Host.listConnections(), c); closeConnection(ci.getPid());