Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
public List<RecordedBinding> getRecordedBindings() {
return recordedBindings;
}

public Map<String, RecordedConsumer> getRecordedConsumers() {
return consumers;
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@ public abstract class TopologyRecoveryRetryLogic {
* Recover a binding.
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING = context -> {
context.binding().recover();
if (context.entity() instanceof RecordedQueueBinding) {
// recover all bindings for the queue.
// need to do this incase some bindings have already been recovered successfully before this binding failed
String queue = context.binding().getDestination();
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
recordedBinding.recover();
}
}
} else if (context.entity() instanceof RecordedExchangeBinding) {
// recover all bindings for the exchange
// need to do this incase some bindings have already been recovered successfully before this binding failed
String exchange = context.binding().getDestination();
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
if (recordedBinding instanceof RecordedExchangeBinding && exchange.equals(recordedBinding.getDestination())) {
recordedBinding.recover();
}
}
} else {
// should't be possible to get here, but just in case recover just this binding
context.binding().recover();
}
return null;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,24 @@

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RecordedBinding;
import com.rabbitmq.client.impl.recovery.RecordedConsumer;
import com.rabbitmq.client.test.BrokerTestCase;
import com.rabbitmq.client.test.TestUtils;
import com.rabbitmq.tools.Host;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
Expand All @@ -32,6 +45,13 @@
*/
public class TopologyRecoveryRetry extends BrokerTestCase {

private volatile Consumer<Integer> backoffConsumer;

@After
public void cleanup() {
backoffConsumer = null;
}

@Test
public void topologyRecoveryRetry() throws Exception {
int nbQueues = 200;
Expand All @@ -40,18 +60,149 @@ public void topologyRecoveryRetry() throws Exception {
String queue = prefix + i;
channel.queueDeclare(queue, false, false, true, new HashMap<>());
channel.queueBind(queue, "amq.direct", queue);
channel.queueBind(queue, "amq.direct", queue + "2");
channel.basicConsume(queue, true, new DefaultConsumer(channel));
}

closeAllConnectionsAndWaitForRecovery(this.connection);

assertTrue(channel.isOpen());
}

@Test
public void topologyRecoveryBindingFailure() throws Exception {
final String queue = "topology-recovery-retry-binding-failure" + System.currentTimeMillis();
channel.queueDeclare(queue, false, false, true, new HashMap<>());
channel.queueBind(queue, "amq.topic", "topic1");
channel.queueBind(queue, "amq.topic", "topic2");
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
System.out.println("Got message=" + new String(body));
messagesReceivedLatch.countDown();
}
});
final CountDownLatch recoveryLatch = new CountDownLatch(1);
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecoveryStarted(Recoverable recoverable) {
// no-op
}
@Override
public void handleRecovery(Recoverable recoverable) {
recoveryLatch.countDown();
}
});

// we want recovery to fail when recovering the 2nd binding
// give the 2nd recorded binding a bad queue name so it fails
final RecordedBinding binding2 = ((AutorecoveringConnection)connection).getRecordedBindings().get(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is brutal :D

binding2.destination(UUID.randomUUID().toString());

// use the backoffConsumer to know that it has failed
// then delete the real queue & fix the recorded binding
// it should fail once more because queue is gone, and then succeed
final CountDownLatch backoffLatch = new CountDownLatch(1);
backoffConsumer = attempt -> {
if (attempt == 1) {
binding2.destination(queue);
try {
Host.rabbitmqctl("delete_queue " + queue);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
backoffLatch.countDown();
};

// close connection
Host.closeAllConnections();

// assert backoff was called
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
// wait for full recovery
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));

// publish messages to verify both bindings were recovered
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");

assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
}

@Test
public void topologyRecoveryConsumerFailure() throws Exception {
final String queue = "topology-recovery-retry-consumer-failure" + System.currentTimeMillis();
channel.queueDeclare(queue, false, false, true, new HashMap<>());
channel.queueBind(queue, "amq.topic", "topic1");
channel.queueBind(queue, "amq.topic", "topic2");
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
System.out.println("Got message=" + new String(body));
messagesReceivedLatch.countDown();
}
});
final CountDownLatch recoveryLatch = new CountDownLatch(1);
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecoveryStarted(Recoverable recoverable) {
// no-op
}
@Override
public void handleRecovery(Recoverable recoverable) {
recoveryLatch.countDown();
}
});

// we want recovery to fail when recovering the consumer
// give the recorded consumer a bad queue name so it fails
final RecordedConsumer consumer = ((AutorecoveringConnection)connection).getRecordedConsumers().values().iterator().next();
consumer.setQueue(UUID.randomUUID().toString());

// use the backoffConsumer to know that it has failed
// then delete the real queue & fix the recorded consumer
// it should fail once more because queue is gone, and then succeed
final CountDownLatch backoffLatch = new CountDownLatch(1);
backoffConsumer = attempt -> {
if (attempt == 1) {
consumer.setQueue(queue);
try {
Host.rabbitmqctl("delete_queue " + queue);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
backoffLatch.countDown();
};

// close connection
Host.closeAllConnections();

// assert backoff was called
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
// wait for full recovery
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));

// publish messages to verify both bindings & consumer were recovered
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");

assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
}

@Override
protected ConnectionFactory newConnectionFactory() {
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build());
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.backoffPolicy(attempt -> {
if (backoffConsumer != null) {
backoffConsumer.accept(attempt);
}
}).build());
connectionFactory.setNetworkRecoveryInterval(1000);
return connectionFactory;
}
Expand Down