Skip to content

Commit 9034a61

Browse files
Merge pull request #388 from rabbitmq/rabbitmq-java-client-387-topology-recovery-retry
Add optional retry logic to topology recovery
2 parents 26a4a96 + 34e33ea commit 9034a61

18 files changed

+1160
-155
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.client.impl.nio.NioParams;
3030
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
3131
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
32+
import com.rabbitmq.client.impl.recovery.RetryHandler;
3233
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3334

3435
import java.io.IOException;
@@ -192,6 +193,13 @@ public class ConnectionFactory implements Cloneable {
192193
*/
193194
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
194195

196+
/**
197+
* Retry handler for topology recovery.
198+
* Default is no retry.
199+
* @since 5.4.0
200+
*/
201+
private RetryHandler topologyRecoveryRetryHandler;
202+
195203
/** @return the default host to use for connections */
196204
public String getHost() {
197205
return host;
@@ -1087,6 +1095,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10871095
result.setErrorOnWriteListener(errorOnWriteListener);
10881096
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
10891097
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
1098+
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
10901099
return result;
10911100
}
10921101

@@ -1454,4 +1463,13 @@ public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalExc
14541463
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
14551464
}
14561465

1466+
/**
1467+
* Set retry handler for topology recovery.
1468+
* Default is no retry.
1469+
* @param topologyRecoveryRetryHandler
1470+
* @since 5.4.0
1471+
*/
1472+
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
1473+
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
1474+
}
14571475
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
2222
import com.rabbitmq.client.ShutdownSignalException;
23+
import com.rabbitmq.client.impl.recovery.RetryHandler;
2324
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2425

2526
import java.util.Map;
@@ -51,6 +52,7 @@ public class ConnectionParams {
5152
private int workPoolTimeout = -1;
5253
private TopologyRecoveryFilter topologyRecoveryFilter;
5354
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
55+
private RetryHandler topologyRecoveryRetryHandler;
5456

5557
private ExceptionHandler exceptionHandler;
5658
private ThreadFactory threadFactory;
@@ -257,4 +259,11 @@ public Predicate<ShutdownSignalException> getConnectionRecoveryTriggeringConditi
257259
return connectionRecoveryTriggeringCondition;
258260
}
259261

262+
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
263+
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
264+
}
265+
266+
public RetryHandler getTopologyRecoveryRetryHandler() {
267+
return topologyRecoveryRetryHandler;
268+
}
260269
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 87 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
9595

9696
private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
9797

98+
private final RetryHandler retryHandler;
99+
98100
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
99101
this(params, f, new ListAddressResolver(addrs));
100102
}
@@ -115,6 +117,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
115117
this.channels = new ConcurrentHashMap<>();
116118
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
117119
letAllPassFilter() : params.getTopologyRecoveryFilter();
120+
121+
this.retryHandler = params.getTopologyRecoveryRetryHandler();
118122
}
119123

120124
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -125,12 +129,9 @@ private void setupErrorOnWriteListenerForPotentialRecovery() {
125129
// we should trigger the error handling and the recovery only once
126130
if (errorOnWriteLock.tryLock()) {
127131
try {
128-
Thread recoveryThread = threadFactory.newThread(new Runnable() {
129-
@Override
130-
public void run() {
131-
AMQConnection c = (AMQConnection) connection;
132-
c.handleIoError(exception);
133-
}
132+
Thread recoveryThread = threadFactory.newThread(() -> {
133+
AMQConnection c = (AMQConnection) connection;
134+
c.handleIoError(exception);
134135
});
135136
recoveryThread.setName("RabbitMQ Error On Write Thread");
136137
recoveryThread.start();
@@ -630,6 +631,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
630631
}
631632
}
632633

634+
void recoverChannel(AutorecoveringChannel channel) throws IOException {
635+
channel.automaticallyRecover(this, this.delegate);
636+
}
637+
633638
private void notifyRecoveryListenersComplete() {
634639
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
635640
f.handleRecovery(this);
@@ -651,16 +656,16 @@ private void recoverTopology(final ExecutorService executor) {
651656
if (executor == null) {
652657
// recover entities in serial on the main connection thread
653658
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
654-
recoverExchange(exchange);
659+
recoverExchange(exchange, true);
655660
}
656661
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
657-
recoverQueue(entry.getKey(), entry.getValue());
662+
recoverQueue(entry.getKey(), entry.getValue(), true);
658663
}
659664
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
660-
recoverBinding(b);
665+
recoverBinding(b, true);
661666
}
662667
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
663-
recoverConsumer(entry.getKey(), entry.getValue());
668+
recoverConsumer(entry.getKey(), entry.getValue(), true);
664669
}
665670
} else {
666671
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
@@ -680,11 +685,19 @@ private void recoverTopology(final ExecutorService executor) {
680685
}
681686
}
682687

683-
private void recoverExchange(final RecordedExchange x) {
688+
private void recoverExchange(RecordedExchange x, boolean retry) {
684689
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
685690
try {
686691
if (topologyRecoveryFilter.filterExchange(x)) {
687-
x.recover();
692+
if (retry) {
693+
final RecordedExchange entity = x;
694+
x = (RecordedExchange) wrapRetryIfNecessary(x, () -> {
695+
entity.recover();
696+
return null;
697+
}).getRecordedEntity();
698+
} else {
699+
x.recover();
700+
}
688701
LOGGER.debug("{} has recovered", x);
689702
}
690703
} catch (Exception cause) {
@@ -695,12 +708,20 @@ private void recoverExchange(final RecordedExchange x) {
695708
}
696709
}
697710

698-
private void recoverQueue(final String oldName, final RecordedQueue q) {
699711

712+
void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
700713
try {
701714
if (topologyRecoveryFilter.filterQueue(q)) {
702715
LOGGER.debug("Recovering {}", q);
703-
q.recover();
716+
if (retry) {
717+
final RecordedQueue entity = q;
718+
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
719+
entity.recover();
720+
return null;
721+
}).getRecordedEntity();
722+
} else {
723+
q.recover();
724+
}
704725
String newName = q.getName();
705726
if (!oldName.equals(newName)) {
706727
// make sure server-named queues are re-added with
@@ -731,10 +752,18 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
731752
}
732753
}
733754

734-
private void recoverBinding(final RecordedBinding b) {
755+
private void recoverBinding(RecordedBinding b, boolean retry) {
735756
try {
736757
if (this.topologyRecoveryFilter.filterBinding(b)) {
737-
b.recover();
758+
if (retry) {
759+
final RecordedBinding entity = b;
760+
b = (RecordedBinding) wrapRetryIfNecessary(b, () -> {
761+
entity.recover();
762+
return null;
763+
}).getRecordedEntity();
764+
} else {
765+
b.recover();
766+
}
738767
LOGGER.debug("{} has recovered", b);
739768
}
740769
} catch (Exception cause) {
@@ -745,11 +774,20 @@ private void recoverBinding(final RecordedBinding b) {
745774
}
746775
}
747776

748-
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
777+
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
749778
try {
750779
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
751780
LOGGER.debug("Recovering {}", consumer);
752-
String newTag = consumer.recover();
781+
String newTag = null;
782+
if (retry) {
783+
final RecordedConsumer entity = consumer;
784+
RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover());
785+
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
786+
newTag = (String) retryResult.getResult();
787+
} else {
788+
newTag = consumer.recover();
789+
}
790+
753791
// make sure server-generated tags are re-added. MK.
754792
if(tag != null && !tag.equals(newTag)) {
755793
synchronized (this.consumers) {
@@ -772,6 +810,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer)
772810
}
773811
}
774812

813+
private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
814+
if (this.retryHandler == null) {
815+
T result = recoveryAction.call();
816+
return new RetryResult(entity, result);
817+
} else {
818+
try {
819+
T result = recoveryAction.call();
820+
return new RetryResult(entity, result);
821+
} catch (Exception e) {
822+
RetryContext retryContext = new RetryContext(entity, e, this);
823+
RetryResult retryResult;
824+
if (entity instanceof RecordedQueue) {
825+
retryResult = this.retryHandler.retryQueueRecovery(retryContext);
826+
} else if (entity instanceof RecordedExchange) {
827+
retryResult = this.retryHandler.retryExchangeRecovery(retryContext);
828+
} else if (entity instanceof RecordedBinding) {
829+
retryResult = this.retryHandler.retryBindingRecovery(retryContext);
830+
} else if (entity instanceof RecordedConsumer) {
831+
retryResult = this.retryHandler.retryConsumerRecovery(retryContext);
832+
} else {
833+
throw new IllegalArgumentException("Unknown type of recorded entity: " + entity);
834+
}
835+
return retryResult;
836+
}
837+
}
838+
}
839+
775840
private void propagateQueueNameChangeToBindings(String oldName, String newName) {
776841
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
777842
if (b.getDestination().equals(oldName)) {
@@ -820,15 +885,15 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
820885
callables.add(Executors.callable(() -> {
821886
for (final E entity : entityList) {
822887
if (entity instanceof RecordedExchange) {
823-
recoverExchange((RecordedExchange)entity);
888+
recoverExchange((RecordedExchange)entity, true);
824889
} else if (entity instanceof RecordedQueue) {
825890
final RecordedQueue q = (RecordedQueue) entity;
826-
recoverQueue(q.getName(), q);
891+
recoverQueue(q.getName(), q, true);
827892
} else if (entity instanceof RecordedBinding) {
828-
recoverBinding((RecordedBinding) entity);
893+
recoverBinding((RecordedBinding) entity, true);
829894
} else if (entity instanceof RecordedConsumer) {
830895
final RecordedConsumer c = (RecordedConsumer) entity;
831-
recoverConsumer(c.getConsumerTag(), c);
896+
recoverConsumer(c.getConsumerTag(), c, true);
832897
}
833898
}
834899
}));
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl.recovery;
17+
18+
/**
19+
* Backoff policy for topology recovery retry attempts.
20+
*
21+
* @see DefaultRetryHandler
22+
* @see TopologyRecoveryRetryHandlerBuilder
23+
* @since 5.4.0
24+
*/
25+
@FunctionalInterface
26+
public interface BackoffPolicy {
27+
28+
/**
29+
* Wait depending on the current attempt number (1, 2, 3, etc)
30+
* @param attemptNumber current attempt number
31+
* @throws InterruptedException
32+
*/
33+
void backoff(int attemptNumber) throws InterruptedException;
34+
}

0 commit comments

Comments
 (0)