diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index 9546be9f07..ce04a0d83c 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -127,7 +127,8 @@ public class ConnectionFactory implements Cloneable { private boolean automaticRecovery = true; private boolean topologyRecovery = true; - + private ExecutorService topologyRecoveryExecutor; + // long is used to make sure the users can use both ints // and longs safely. It is unlikely that anybody'd need // to use recovery intervals > Integer.MAX_VALUE in practice. @@ -339,7 +340,7 @@ public void setUri(String uriString) setUri(new URI(uriString)); } - private String uriDecode(String s) { + private static String uriDecode(String s) { try { // URLDecode decodes '+' to a space, as for // form encoding. So protect plus signs. @@ -523,7 +524,6 @@ public void setSocketFactory(SocketFactory factory) { * * @see #setSocketConfigurator(SocketConfigurator) */ - @SuppressWarnings("unused") public SocketConfigurator getSocketConfigurator() { return socketConf; } @@ -701,7 +701,6 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecovery) { * @return true if topology recovery is enabled, false otherwise * @see Automatic Recovery */ - @SuppressWarnings("unused") public boolean isTopologyRecoveryEnabled() { return topologyRecovery; } @@ -714,6 +713,24 @@ public boolean isTopologyRecoveryEnabled() { public void setTopologyRecoveryEnabled(boolean topologyRecovery) { this.topologyRecovery = topologyRecovery; } + + /** + * Get the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread. + * @return thread pool executor + */ + public ExecutorService getTopologyRecoveryExecutor() { + return topologyRecoveryExecutor; + } + + /** + * Set the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread. + * It is recommended to pass a ThreadPoolExecutor that will allow its core threads to timeout so these threads can die when recovery is complete. + * Note: your {@link ExceptionHandler#handleTopologyRecoveryException(Connection, Channel, TopologyRecoveryException)} method should be thread-safe. + * @param topologyRecoveryExecutor thread pool executor + */ + public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) { + this.topologyRecoveryExecutor = topologyRecoveryExecutor; + } public void setMetricsCollector(MetricsCollector metricsCollector) { this.metricsCollector = metricsCollector; @@ -1013,6 +1030,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setNetworkRecoveryInterval(networkRecoveryInterval); result.setRecoveryDelayHandler(recoveryDelayHandler); result.setTopologyRecovery(topologyRecovery); + result.setTopologyRecoveryExecutor(topologyRecoveryExecutor); result.setExceptionHandler(exceptionHandler); result.setThreadFactory(threadFactory); result.setHandshakeTimeout(handshakeTimeout); diff --git a/src/main/java/com/rabbitmq/client/ExceptionHandler.java b/src/main/java/com/rabbitmq/client/ExceptionHandler.java index 958bb1c5df..b840c6d50e 100644 --- a/src/main/java/com/rabbitmq/client/ExceptionHandler.java +++ b/src/main/java/com/rabbitmq/client/ExceptionHandler.java @@ -108,7 +108,7 @@ void handleConsumerException(Channel channel, * during topology (exchanges, queues, bindings, consumers) recovery * that it can't otherwise deal with. * @param conn the Connection that caught the exception - * @param ch the Channel that caught the exception + * @param ch the Channel that caught the exception. May be null. * @param exception the exception caught in the driver thread */ diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java index 05a3d096b7..08c123bd81 100644 --- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java @@ -41,6 +41,7 @@ public class ConnectionParams { private long networkRecoveryInterval; private RecoveryDelayHandler recoveryDelayHandler; private boolean topologyRecovery; + private ExecutorService topologyRecoveryExecutor; private int channelRpcTimeout; private boolean channelShouldCheckRpcResponseType; private ErrorOnWriteListener errorOnWriteListener; @@ -114,10 +115,18 @@ public RecoveryDelayHandler getRecoveryDelayHandler() { public boolean isTopologyRecoveryEnabled() { return topologyRecovery; } + + /** + * Get the topology recovery executor. If null, the main connection thread should be used. + * @return executor. May be null. + */ + public ExecutorService getTopologyRecoveryExecutor() { + return topologyRecoveryExecutor; + } public ThreadFactory getThreadFactory() { - return threadFactory; - } + return threadFactory; + } public int getChannelRpcTimeout() { return channelRpcTimeout; @@ -174,6 +183,10 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand public void setTopologyRecovery(boolean topologyRecovery) { this.topologyRecovery = topologyRecovery; } + + public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) { + this.topologyRecoveryExecutor = topologyRecoveryExecutor; + } public void setExceptionHandler(ExceptionHandler exceptionHandler) { this.exceptionHandler = exceptionHandler; diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 0f976b5ca3..4c2f103dca 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -28,8 +28,10 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; @@ -525,7 +527,7 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) { this.consumerRecoveryListeners.remove(listener); } - synchronized private void beginAutomaticRecovery() throws InterruptedException { + private synchronized void beginAutomaticRecovery() throws InterruptedException { Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(0)); this.notifyRecoveryListenersStarted(); @@ -534,18 +536,16 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException { if (newConn == null) { return; } - + LOGGER.debug("Connection {} has recovered", newConn); this.addAutomaticRecoveryListener(newConn); this.recoverShutdownListeners(newConn); this.recoverBlockedListeners(newConn); this.recoverChannels(newConn); // don't assign new delegate connection until channel recovery is complete this.delegate = newConn; - if(this.params.isTopologyRecoveryEnabled()) { - this.recoverEntities(); - this.recoverConsumers(); + if (this.params.isTopologyRecoveryEnabled()) { + recoverTopology(params.getTopologyRecoveryExecutor()); } - this.notifyRecoveryListenersComplete(); } @@ -593,6 +593,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) { for (AutorecoveringChannel ch : this.channels.values()) { try { ch.automaticallyRecover(this, newConn); + LOGGER.debug("Channel {} has recovered", ch); } catch (Throwable t) { newConn.getExceptionHandler().handleChannelRecoveryException(ch, t); } @@ -610,113 +611,125 @@ private void notifyRecoveryListenersStarted() { f.handleRecoveryStarted(this); } } - - private void recoverEntities() { + + private void recoverTopology(final ExecutorService executor) { // The recovery sequence is the following: - // // 1. Recover exchanges // 2. Recover queues // 3. Recover bindings // 4. Recover consumers - recoverExchanges(); - recoverQueues(); - recoverBindings(); - } - - private void recoverExchanges() { - // recorded exchanges are guaranteed to be - // non-predefined (we filter out predefined ones - // in exchangeDeclare). MK. - for (RecordedExchange x : Utility.copy(this.recordedExchanges).values()) { + if (executor == null) { + // recover entities in serial on the main connection thread + for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) { + recoverExchange(exchange); + } + for (final Map.Entry entry : Utility.copy(recordedQueues).entrySet()) { + recoverQueue(entry.getKey(), entry.getValue()); + } + for (final RecordedBinding b : Utility.copy(recordedBindings)) { + recoverBinding(b); + } + for (final Map.Entry entry : Utility.copy(consumers).entrySet()) { + recoverConsumer(entry.getKey(), entry.getValue()); + } + } else { + // Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers + // A channel is single threaded, so group things by channel and recover 1 entity at a time per channel + // We also need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example + // Note: invokeAll will block until all callables are completed and all returned futures will be complete try { - x.recover(); - } catch (Exception cause) { - final String message = "Caught an exception while recovering exchange " + x.getName() + - ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); - this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e); + executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values())); + executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values())); + executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings))); + executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values())); + } catch (final Exception cause) { + final String message = "Caught an exception while recovering toplogy: " + cause.getMessage(); + final TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + getExceptionHandler().handleTopologyRecoveryException(delegate, null, e); } } } - private void recoverQueues() { - for (Map.Entry entry : Utility.copy(this.recordedQueues).entrySet()) { - String oldName = entry.getKey(); - RecordedQueue q = entry.getValue(); - try { - q.recover(); - String newName = q.getName(); - if (!oldName.equals(newName)) { - // make sure server-named queues are re-added with - // their new names. MK. - synchronized (this.recordedQueues) { - this.propagateQueueNameChangeToBindings(oldName, newName); - this.propagateQueueNameChangeToConsumers(oldName, newName); - // bug26552: - // remove old name after we've updated the bindings and consumers, - // plus only for server-named queues, both to make sure we don't lose - // anything to recover. MK. - if(q.isServerNamed()) { - deleteRecordedQueue(oldName); - } - this.recordedQueues.put(newName, q); + private void recoverExchange(final RecordedExchange x) { + // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK. + try { + x.recover(); + LOGGER.debug("{} has recovered", x); + } catch (Exception cause) { + final String message = "Caught an exception while recovering exchange " + x.getName() + + ": " + cause.getMessage(); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e); + } + } + + private void recoverQueue(final String oldName, final RecordedQueue q) { + LOGGER.debug("Recovering {}", q); + try { + q.recover(); + String newName = q.getName(); + if (!oldName.equals(newName)) { + // make sure server-named queues are re-added with + // their new names. MK. + synchronized (this.recordedQueues) { + this.propagateQueueNameChangeToBindings(oldName, newName); + this.propagateQueueNameChangeToConsumers(oldName, newName); + // bug26552: + // remove old name after we've updated the bindings and consumers, + // plus only for server-named queues, both to make sure we don't lose + // anything to recover. MK. + if(q.isServerNamed()) { + deleteRecordedQueue(oldName); } + this.recordedQueues.put(newName, q); } - for(QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { - qrl.queueRecovered(oldName, newName); - } - } catch (Exception cause) { - final String message = "Caught an exception while recovering queue " + oldName + - ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); - this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e); } + for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { + qrl.queueRecovered(oldName, newName); + } + LOGGER.debug("{} has recovered", q); + } catch (Exception cause) { + final String message = "Caught an exception while recovering queue " + oldName + + ": " + cause.getMessage(); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e); } } - private void recoverBindings() { - for (RecordedBinding b : Utility.copy(this.recordedBindings)) { - try { - b.recover(); - } catch (Exception cause) { - String message = "Caught an exception while recovering binding between " + b.getSource() + - " and " + b.getDestination() + ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); - this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e); - } + private void recoverBinding(final RecordedBinding b) { + try { + b.recover(); + LOGGER.debug("{} has recovered", b); + } catch (Exception cause) { + String message = "Caught an exception while recovering binding between " + b.getSource() + + " and " + b.getDestination() + ": " + cause.getMessage(); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e); } } - private void recoverConsumers() { - for (Map.Entry entry : Utility.copy(this.consumers).entrySet()) { - String tag = entry.getKey(); - RecordedConsumer consumer = entry.getValue(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Recovering consumer {}", consumer); - } - try { - String newTag = consumer.recover(); - // make sure server-generated tags are re-added. MK. - if(tag != null && !tag.equals(newTag)) { - synchronized (this.consumers) { - this.consumers.remove(tag); - this.consumers.put(newTag, consumer); - } - consumer.getChannel().updateConsumerTag(tag, newTag); + private void recoverConsumer(final String tag, final RecordedConsumer consumer) { + LOGGER.debug("Recovering {}", consumer); + try { + String newTag = consumer.recover(); + // make sure server-generated tags are re-added. MK. + if(tag != null && !tag.equals(newTag)) { + synchronized (this.consumers) { + this.consumers.remove(tag); + this.consumers.put(newTag, consumer); } + consumer.getChannel().updateConsumerTag(tag, newTag); + } - for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { - crl.consumerRecovered(tag, newTag); - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Consumer {} has recovered", consumer); - } - } catch (Exception cause) { - final String message = "Caught an exception while recovering consumer " + tag + - ": " + cause.getMessage(); - TopologyRecoveryException e = new TopologyRecoveryException(message, cause); - this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e); + for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { + crl.consumerRecovered(tag, newTag); } + LOGGER.debug("{} has recovered", consumer); + } catch (Exception cause) { + final String message = "Caught an exception while recovering consumer " + tag + + ": " + cause.getMessage(); + TopologyRecoveryException e = new TopologyRecoveryException(message, cause); + this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e); } } @@ -735,6 +748,42 @@ private void propagateQueueNameChangeToConsumers(String oldName, String newName) } } } + + private List> groupEntitiesByChannel(final Collection entities) { + // map entities by channel + final Map> map = new LinkedHashMap>(); + for (final E entity : entities) { + final AutorecoveringChannel channel = entity.getChannel(); + List list = map.get(channel); + if (list == null) { + map.put(channel, list = new ArrayList()); + } + list.add(entity); + } + // now create a runnable per channel + final List> callables = new ArrayList>(); + for (final List entityList : map.values()) { + callables.add(Executors.callable(new Runnable() { + @Override + public void run() { + for (final E entity : entityList) { + if (entity instanceof RecordedExchange) { + recoverExchange((RecordedExchange)entity); + } else if (entity instanceof RecordedQueue) { + final RecordedQueue q = (RecordedQueue) entity; + recoverQueue(q.getName(), q); + } else if (entity instanceof RecordedBinding) { + recoverBinding((RecordedBinding) entity); + } else if (entity instanceof RecordedConsumer) { + final RecordedConsumer c = (RecordedConsumer) entity; + recoverConsumer(c.getConsumerTag(), c); + } + } + } + })); + } + return callables; + } void recordQueueBinding(AutorecoveringChannel ch, String queue, diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedConsumer.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedConsumer.java index 3cb1698732..1eb9c7a943 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedConsumer.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedConsumer.java @@ -77,4 +77,9 @@ public void setQueue(String queue) { public String getConsumerTag() { return consumerTag; } + + @Override + public String toString() { + return "RecordedConsumer[tag=" + consumerTag + ", queue=" + queue + ", autoAck=" + autoAck + ", exclusive=" + exclusive + ", arguments=" + arguments + ", consumer=" + consumer + ", channel=" + channel + "]"; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchange.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchange.java index 7ff9604a94..2884604fdc 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchange.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchange.java @@ -58,4 +58,9 @@ public RecordedExchange arguments(Map value) { public boolean isAutoDelete() { return autoDelete; } + + @Override + public String toString() { + return "RecordedExchange[name=" + name + ", type=" + type + ", durable=" + durable + ", autoDelete=" + autoDelete + ", arguments=" + arguments + ", channel=" + channel + "]"; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchangeBinding.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchangeBinding.java index 209d37f4b0..3b6d72881d 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchangeBinding.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedExchangeBinding.java @@ -29,4 +29,9 @@ public RecordedExchangeBinding(AutorecoveringChannel channel) { public void recover() throws IOException { this.channel.getDelegate().exchangeBind(this.destination, this.source, this.routingKey, this.arguments); } + + @Override + public String toString() { + return "RecordedExchangeBinding[source=" + source + ", destination=" + destination + ", routingKey=" + routingKey + ", arguments=" + arguments + ", channel=" + channel + "]"; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java index 855350825b..f446e1682c 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java @@ -79,4 +79,9 @@ public RecordedQueue arguments(Map value) { this.arguments = value; return this; } + + @Override + public String toString() { + return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]"; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueueBinding.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueueBinding.java index f593c5ff86..a933ce3acd 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueueBinding.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueueBinding.java @@ -29,4 +29,9 @@ public RecordedQueueBinding(AutorecoveringChannel channel) { public void recover() throws IOException { this.channel.getDelegate().queueBind(this.getDestination(), this.getSource(), this.routingKey, this.arguments); } + + @Override + public String toString() { + return "RecordedQueueBinding[source=" + source + ", destination=" + destination + ", routingKey=" + routingKey + ", arguments=" + arguments + ", channel=" + channel + "]"; + } } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 7099c5c490..39a3984280 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -16,6 +16,7 @@ package com.rabbitmq.client.test.functional; import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.impl.CredentialsProvider; import com.rabbitmq.client.impl.NetworkConnection; import com.rabbitmq.client.impl.recovery.*; @@ -31,10 +32,12 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -168,11 +171,13 @@ public String getPassword() { final List events = new CopyOnWriteArrayList(); final CountDownLatch latch = new CountDownLatch(2); // one when started, another when complete connection.addShutdownListener(new ShutdownListener() { + @Override public void shutdownCompleted(ShutdownSignalException cause) { events.add("shutdown hook 1"); } }); connection.addShutdownListener(new ShutdownListener() { + @Override public void shutdownCompleted(ShutdownSignalException cause) { events.add("shutdown hook 2"); } @@ -211,6 +216,7 @@ public void handleRecoveryStarted(Recoverable recoverable) { @Test public void shutdownHooksRecoveryOnConnection() throws IOException, InterruptedException { final CountDownLatch latch = new CountDownLatch(2); connection.addShutdownListener(new ShutdownListener() { + @Override public void shutdownCompleted(ShutdownSignalException cause) { latch.countDown(); } @@ -225,6 +231,7 @@ public void shutdownCompleted(ShutdownSignalException cause) { @Test public void shutdownHooksRecoveryOnChannel() throws IOException, InterruptedException { final CountDownLatch latch = new CountDownLatch(3); channel.addShutdownListener(new ShutdownListener() { + @Override public void shutdownCompleted(ShutdownSignalException cause) { latch.countDown(); } @@ -241,10 +248,12 @@ public void shutdownCompleted(ShutdownSignalException cause) { @Test public void blockedListenerRecovery() throws IOException, InterruptedException { final CountDownLatch latch = new CountDownLatch(2); connection.addBlockedListener(new BlockedListener() { + @Override public void handleBlocked(String reason) throws IOException { latch.countDown(); } + @Override public void handleUnblocked() throws IOException { latch.countDown(); } @@ -270,6 +279,7 @@ public void handleUnblocked() throws IOException { @Test public void returnListenerRecovery() throws IOException, InterruptedException { final CountDownLatch latch = new CountDownLatch(1); channel.addReturnListener(new ReturnListener() { + @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { @@ -285,10 +295,12 @@ public void handleReturn(int replyCode, String replyText, String exchange, @Test public void confirmListenerRecovery() throws IOException, InterruptedException, TimeoutException { final CountDownLatch latch = new CountDownLatch(1); channel.addConfirmListener(new ConfirmListener() { + @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { latch.countDown(); } + @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { latch.countDown(); } @@ -693,9 +705,11 @@ public void consumerRecovered(String oldConsumerTag, String newConsumerTag) { final CountDownLatch latch = new CountDownLatch(2); final CountDownLatch startLatch = new CountDownLatch(2); final RecoveryListener listener = new RecoveryListener() { + @Override public void handleRecovery(Recoverable recoverable) { latch.countDown(); } + @Override public void handleRecoveryStarted(Recoverable recoverable) { startLatch.countDown(); } @@ -795,33 +809,98 @@ public void handleDelivery(String consumerTag, connection.close(); } } + + @Test public void recoveryWithMultipleThreads() throws Exception { + // test with 8 recovery threads + final ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue()); + executor.allowCoreThreadTimeOut(true); + ConnectionFactory connectionFactory = buildConnectionFactoryWithRecoveryEnabled(false); + assertNull(connectionFactory.getTopologyRecoveryExecutor()); + connectionFactory.setTopologyRecoveryExecutor(executor); + assertEquals(executor, connectionFactory.getTopologyRecoveryExecutor()); + RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection(); + try { + final List channels = new ArrayList(); + final List exchanges = new ArrayList(); + final List queues = new ArrayList(); + // create 16 channels + final int channelCount = 16; + final int queuesPerChannel = 20; + final CountDownLatch latch = new CountDownLatch(channelCount * queuesPerChannel); + for (int i=0; i < channelCount; i++) { + final Channel testChannel = testConnection.createChannel(); + channels.add(testChannel); + String x = "tmp-x-topic-" + i; + exchanges.add(x); + testChannel.exchangeDeclare(x, "topic"); + // create 20 queues and bindings per channel + for (int j=0; j < queuesPerChannel; j++) { + String q = "tmp-q-" + i + "-" + j; + queues.add(q); + testChannel.queueDeclare(q, false, false, true, null); + testChannel.queueBind(q, x, "tmp-key-" + i + "-" + j); + testChannel.basicConsume(q, new DefaultConsumer(testChannel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + throws IOException { + testChannel.basicAck(envelope.getDeliveryTag(), false); + latch.countDown(); + } + }); + } + } + // now do recovery + closeAndWaitForRecovery(testConnection); + + // verify channels & topology recovered by publishing a message to each + for (int i=0; i < channelCount; i++) { + Channel ch = channels.get(i); + expectChannelRecovery(ch); + // publish message to each queue/consumer + for (int j=0; j < queuesPerChannel; j++) { + ch.basicPublish("tmp-x-topic-" + i, "tmp-key-" + i + "-" + j, null, "msg".getBytes()); + } + } + // verify all queues/consumers got it + assertTrue(latch.await(30, TimeUnit.SECONDS)); + + // cleanup + Channel cleanupChannel = testConnection.createChannel(); + for (String q : queues) + cleanupChannel.queueDelete(q); + for (String x : exchanges) + cleanupChannel.exchangeDelete(x); + } finally { + testConnection.close(); + } + } private void assertConsumerCount(int exp, String q) throws IOException { assertEquals(exp, channel.queueDeclarePassive(q).getConsumerCount()); } - private AMQP.Queue.DeclareOk declareClientNamedQueue(Channel ch, String q) throws IOException { + private static AMQP.Queue.DeclareOk declareClientNamedQueue(Channel ch, String q) throws IOException { return ch.queueDeclare(q, true, false, false, null); } - private AMQP.Queue.DeclareOk declareClientNamedAutoDeleteQueue(Channel ch, String q) throws IOException { + private static AMQP.Queue.DeclareOk declareClientNamedAutoDeleteQueue(Channel ch, String q) throws IOException { return ch.queueDeclare(q, true, false, true, null); } - private void declareClientNamedQueueNoWait(Channel ch, String q) throws IOException { + private static void declareClientNamedQueueNoWait(Channel ch, String q) throws IOException { ch.queueDeclareNoWait(q, true, false, false, null); } - private AMQP.Exchange.DeclareOk declareExchange(Channel ch, String x) throws IOException { + private static AMQP.Exchange.DeclareOk declareExchange(Channel ch, String x) throws IOException { return ch.exchangeDeclare(x, "fanout", false); } - private void declareExchangeNoWait(Channel ch, String x) throws IOException { + private static void declareExchangeNoWait(Channel ch, String x) throws IOException { ch.exchangeDeclareNoWait(x, "fanout", false, false, false, null); } - private void expectQueueRecovery(Channel ch, String q) throws IOException, InterruptedException, TimeoutException { + private static void expectQueueRecovery(Channel ch, String q) throws IOException, InterruptedException, TimeoutException { ch.confirmSelect(); ch.queuePurge(q); AMQP.Queue.DeclareOk ok1 = declareClientNamedQueue(ch, q); @@ -832,7 +911,7 @@ private void expectQueueRecovery(Channel ch, String q) throws IOException, Inter assertEquals(1, ok2.getMessageCount()); } - private void expectAutoDeleteQueueAndBindingRecovery(Channel ch, String x, String q) throws IOException, InterruptedException, + private static void expectAutoDeleteQueueAndBindingRecovery(Channel ch, String x, String q) throws IOException, InterruptedException, TimeoutException { ch.confirmSelect(); ch.queuePurge(q); @@ -845,7 +924,7 @@ private void expectAutoDeleteQueueAndBindingRecovery(Channel ch, String x, Strin assertEquals(1, ok2.getMessageCount()); } - private void expectExchangeRecovery(Channel ch, String x) throws IOException, InterruptedException, TimeoutException { + private static void expectExchangeRecovery(Channel ch, String x) throws IOException, InterruptedException, TimeoutException { ch.confirmSelect(); String q = ch.queueDeclare().getQueue(); final String rk = "routing-key"; @@ -855,12 +934,14 @@ private void expectExchangeRecovery(Channel ch, String x) throws IOException, In ch.exchangeDeclarePassive(x); } - private CountDownLatch prepareForRecovery(Connection conn) { + private static CountDownLatch prepareForRecovery(Connection conn) { final CountDownLatch latch = new CountDownLatch(1); ((AutorecoveringConnection)conn).addRecoveryListener(new RecoveryListener() { + @Override public void handleRecovery(Recoverable recoverable) { latch.countDown(); } + @Override public void handleRecoveryStarted(Recoverable recoverable) { // No-op } @@ -868,9 +949,10 @@ public void handleRecoveryStarted(Recoverable recoverable) { return latch; } - private CountDownLatch prepareForShutdown(Connection conn) throws InterruptedException { + private static CountDownLatch prepareForShutdown(Connection conn) { final CountDownLatch latch = new CountDownLatch(1); conn.addShutdownListener(new ShutdownListener() { + @Override public void shutdownCompleted(ShutdownSignalException cause) { latch.countDown(); } @@ -882,7 +964,7 @@ private void closeAndWaitForRecovery() throws IOException, InterruptedException closeAndWaitForRecovery((AutorecoveringConnection)this.connection); } - private void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { + private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { CountDownLatch latch = prepareForRecovery(connection); Host.closeConnection((NetworkConnection) connection); wait(latch); @@ -900,7 +982,7 @@ private void restartPrimaryAndWaitForRecovery(Connection connection) throws IOEx wait(latch); } - private void expectChannelRecovery(Channel ch) throws InterruptedException { + private static void expectChannelRecovery(Channel ch) { assertTrue(ch.isOpen()); } @@ -909,42 +991,42 @@ protected ConnectionFactory newConnectionFactory() { return buildConnectionFactoryWithRecoveryEnabled(false); } - private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery) + private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); return (AutorecoveringConnection) cf.newConnection(); } - private RecoverableConnection newRecoveringConnection(Address[] addresses) + private static RecoverableConnection newRecoveringConnection(Address[] addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false); // specifically use the Address[] overload return (AutorecoveringConnection) cf.newConnection(addresses); } - private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List
addresses) + private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List
addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); return (AutorecoveringConnection) cf.newConnection(addresses); } - private RecoverableConnection newRecoveringConnection(List
addresses) + private static RecoverableConnection newRecoveringConnection(List
addresses) throws IOException, TimeoutException { return newRecoveringConnection(false, addresses); } - private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName) + private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); return (RecoverableConnection) cf.newConnection(connectionName); } - private RecoverableConnection newRecoveringConnection(String connectionName) + private static RecoverableConnection newRecoveringConnection(String connectionName) throws IOException, TimeoutException { return newRecoveringConnection(false, connectionName); } - - private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) { + + private static ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) { ConnectionFactory cf = TestUtils.connectionFactory(); cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL); cf.setAutomaticRecoveryEnabled(true); @@ -960,15 +1042,15 @@ private static void wait(CountDownLatch latch) throws InterruptedException { assertTrue(latch.await(90, TimeUnit.SECONDS)); } - private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException { + private static void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException { ch.waitForConfirms(30 * 60 * 1000); } - private void assertRecordedQueues(Connection conn, int size) { + private static void assertRecordedQueues(Connection conn, int size) { assertEquals(size, ((AutorecoveringConnection)conn).getRecordedQueues().size()); } - private void assertRecordedExchanges(Connection conn, int size) { + private static void assertRecordedExchanges(Connection conn, int size) { assertEquals(size, ((AutorecoveringConnection)conn).getRecordedExchanges().size()); } }