Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public class ConnectionFactory implements Cloneable {

private boolean automaticRecovery = true;
private boolean topologyRecovery = true;

private ExecutorService topologyRecoveryExecutor;

// long is used to make sure the users can use both ints
// and longs safely. It is unlikely that anybody'd need
// to use recovery intervals > Integer.MAX_VALUE in practice.
Expand Down Expand Up @@ -339,7 +340,7 @@ public void setUri(String uriString)
setUri(new URI(uriString));
}

private String uriDecode(String s) {
private static String uriDecode(String s) {
try {
// URLDecode decodes '+' to a space, as for
// form encoding. So protect plus signs.
Expand Down Expand Up @@ -523,7 +524,6 @@ public void setSocketFactory(SocketFactory factory) {
*
* @see #setSocketConfigurator(SocketConfigurator)
*/
@SuppressWarnings("unused")
public SocketConfigurator getSocketConfigurator() {
return socketConf;
}
Expand Down Expand Up @@ -701,7 +701,6 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
* @return true if topology recovery is enabled, false otherwise
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
@SuppressWarnings("unused")
public boolean isTopologyRecoveryEnabled() {
return topologyRecovery;
}
Expand All @@ -714,6 +713,24 @@ public boolean isTopologyRecoveryEnabled() {
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
}

/**
* Get the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
* @return thread pool executor
*/
public ExecutorService getTopologyRecoveryExecutor() {
return topologyRecoveryExecutor;
}

/**
* Set the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
* It is recommended to pass a ThreadPoolExecutor that will allow its core threads to timeout so these threads can die when recovery is complete.
* Note: your {@link ExceptionHandler#handleTopologyRecoveryException(Connection, Channel, TopologyRecoveryException)} method should be thread-safe.
* @param topologyRecoveryExecutor thread pool executor
*/
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
}

public void setMetricsCollector(MetricsCollector metricsCollector) {
this.metricsCollector = metricsCollector;
Expand Down Expand Up @@ -1013,6 +1030,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setNetworkRecoveryInterval(networkRecoveryInterval);
result.setRecoveryDelayHandler(recoveryDelayHandler);
result.setTopologyRecovery(topologyRecovery);
result.setTopologyRecoveryExecutor(topologyRecoveryExecutor);
result.setExceptionHandler(exceptionHandler);
result.setThreadFactory(threadFactory);
result.setHandshakeTimeout(handshakeTimeout);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/client/ExceptionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void handleConsumerException(Channel channel,
* during topology (exchanges, queues, bindings, consumers) recovery
* that it can't otherwise deal with.
* @param conn the Connection that caught the exception
* @param ch the Channel that caught the exception
* @param ch the Channel that caught the exception. May be null.
* @param exception the exception caught in the driver thread
*/

Expand Down
17 changes: 15 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ConnectionParams {
private long networkRecoveryInterval;
private RecoveryDelayHandler recoveryDelayHandler;
private boolean topologyRecovery;
private ExecutorService topologyRecoveryExecutor;
private int channelRpcTimeout;
private boolean channelShouldCheckRpcResponseType;
private ErrorOnWriteListener errorOnWriteListener;
Expand Down Expand Up @@ -114,10 +115,18 @@ public RecoveryDelayHandler getRecoveryDelayHandler() {
public boolean isTopologyRecoveryEnabled() {
return topologyRecovery;
}

/**
* Get the topology recovery executor. If null, the main connection thread should be used.
* @return executor. May be null.
*/
public ExecutorService getTopologyRecoveryExecutor() {
return topologyRecoveryExecutor;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
return threadFactory;
}

public int getChannelRpcTimeout() {
return channelRpcTimeout;
Expand Down Expand Up @@ -174,6 +183,10 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand
public void setTopologyRecovery(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
}

public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
}

public void setExceptionHandler(ExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
Expand Down
Loading