Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions src/main/java/redis/clients/jedis/JedisThreadFactoryBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package redis.clients.jedis;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
* JedisThreadFactoryBuilder is a class that builds a ThreadFactory for Jedis.
*/
public class JedisThreadFactoryBuilder {
private String namePrefix = null;
private boolean daemon = false;
private int priority = Thread.NORM_PRIORITY;
private ThreadFactory backingThreadFactory = null;
private UncaughtExceptionHandler uncaughtExceptionHandler = null;

/**
* Sets the name prefix for the threads created by the ThreadFactory.
*
* @param namePrefix the name prefix for the threads
* @return the JedisThreadFactoryBuilder instance
* @throws NullPointerException if namePrefix is null
*/
public JedisThreadFactoryBuilder setNamePrefix(String namePrefix) {
if (namePrefix == null) {
throw new NullPointerException();
}
this.namePrefix = namePrefix;
return this;
}

/**
* Sets whether the threads created by the ThreadFactory are daemon threads.
*
* @param daemon true if the threads are daemon threads, false otherwise
* @return the JedisThreadFactoryBuilder instance
*/
public JedisThreadFactoryBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}

/**
* Sets the priority for the threads created by the ThreadFactory.
*
* @param priority the priority for the threads
* @return the JedisThreadFactoryBuilder instance
* @throws IllegalArgumentException if priority is not in the range of Thread.MIN_PRIORITY to Thread.MAX_PRIORITY
*/
public JedisThreadFactoryBuilder setPriority(int priority) {
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(String.format(
"Thread priority (%s) must be in %d ~ %d", priority,
Thread.MIN_PRIORITY, Thread.MAX_PRIORITY));
}

this.priority = priority;
return this;
}

/**
* Sets the UncaughtExceptionHandler for the threads created by the ThreadFactory.
*
* @param uncaughtExceptionHandler the UncaughtExceptionHandler for the threads
* @return the JedisThreadFactoryBuilder instance
* @throws NullPointerException if uncaughtExceptionHandler is null
*/
public JedisThreadFactoryBuilder setUncaughtExceptionHandler(
UncaughtExceptionHandler uncaughtExceptionHandler) {
if (uncaughtExceptionHandler == null) {
throw new NullPointerException(
"UncaughtExceptionHandler cannot be null");
}
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
return this;
}

/**
* Sets the backing ThreadFactory for the JedisThreadFactoryBuilder.
*
* @param backingThreadFactory the backing ThreadFactory
* @return the JedisThreadFactoryBuilder instance
* @throws NullPointerException if backingThreadFactory is null
*/
public JedisThreadFactoryBuilder setThreadFactory(
ThreadFactory backingThreadFactory) {
if (uncaughtExceptionHandler == null) {
throw new NullPointerException(
"BackingThreadFactory cannot be null");
}
this.backingThreadFactory = backingThreadFactory;
return this;
}

/**
* Builds a ThreadFactory using the JedisThreadFactoryBuilder instance.
*
* @return the ThreadFactory
*/
public ThreadFactory build() {
return build(this);
}

/**
* Builds a ThreadFactory by JedisThreadFactoryBuilder.
*
* @param builder JedisThreadFactoryBuilder
* @return ThreadFactory
*/
private static ThreadFactory build(JedisThreadFactoryBuilder builder) {
final String namePrefix = builder.namePrefix;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
final ThreadFactory backingThreadFactory = (builder.backingThreadFactory != null) ? builder.backingThreadFactory
: Executors.defaultThreadFactory();
final AtomicLong count = new AtomicLong(0);

return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = backingThreadFactory.newThread(runnable);
if (daemon) {
thread.setDaemon(daemon);
}
if (priority != Thread.NORM_PRIORITY) {
thread.setPriority(priority);
}
if (namePrefix != null) {
thread.setName(namePrefix + "-" + count.getAndIncrement());
}
if (uncaughtExceptionHandler != null) {
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
return thread;
}
};
}
}
120 changes: 120 additions & 0 deletions src/main/java/redis/clients/jedis/JedisThreadPoolBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package redis.clients.jedis;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to build a thread pool for Jedis.
*/
public class JedisThreadPoolBuilder {
private static final Logger log = LoggerFactory.getLogger(JedisThreadPoolBuilder.class);

private static final RejectedExecutionHandler defaultRejectHandler = new AbortPolicy();

public static PoolBuilder pool() {
return new PoolBuilder();
}

/**
* Custom thread factory or use default
* @param threadNamePrefix the thread name prefix
* @param daemon daemon
* @return ThreadFactory
*/
private static ThreadFactory createThreadFactory(String threadNamePrefix, boolean daemon) {
if (threadNamePrefix != null) {
return new JedisThreadFactoryBuilder().setNamePrefix(threadNamePrefix).setDaemon(daemon)
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error(String.format("Thread %s threw exception %s", t.getName(), e.getMessage()));
}
}).build();
}

return Executors.defaultThreadFactory();
}

/**
* This class is used to build a thread pool.
*/
public static class PoolBuilder {
private int coreSize = 0;
private int maxSize = Integer.MAX_VALUE;
private long keepAliveMillSecs = 10;
private ThreadFactory threadFactory;
private String threadNamePrefix;
private boolean daemon;
private RejectedExecutionHandler rejectHandler;
private BlockingQueue<Runnable> workQueue;

public PoolBuilder setCoreSize(int coreSize) {
this.coreSize = coreSize;
return this;
}

public PoolBuilder setMaxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}

public PoolBuilder setKeepAliveMillSecs(long keepAliveMillSecs) {
this.keepAliveMillSecs = keepAliveMillSecs;
return this;
}

public PoolBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}

public PoolBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}

public PoolBuilder setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}

public PoolBuilder setRejectHandler(RejectedExecutionHandler rejectHandler) {
this.rejectHandler = rejectHandler;
return this;
}

public PoolBuilder setWorkQueue(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
return this;
}

public ExecutorService build() {
if (threadFactory == null) {
threadFactory = createThreadFactory(threadNamePrefix, daemon);
}

if (workQueue == null) {
throw new IllegalArgumentException("workQueue can't be null");
}

if (rejectHandler == null) {
rejectHandler = defaultRejectHandler;
}

ExecutorService executorService = new ThreadPoolExecutor(coreSize, maxSize, keepAliveMillSecs,
TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectHandler);

return executorService;
}
}
}
79 changes: 50 additions & 29 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,25 +26,44 @@
public abstract class MultiNodePipelineBase extends PipelineBase
implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable {

private final Logger log = LoggerFactory.getLogger(getClass());

/**
* The number of processes for {@code sync()}. If you have enough cores for client (and you have
* more than 3 cluster nodes), you may increase this number of workers.
* Suggestion:&nbsp;&le;&nbsp;cluster&nbsp;nodes.
*/
public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;
private static final Logger log = LoggerFactory.getLogger(MultiNodePipelineBase.class);

private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
private final Map<HostAndPort, Connection> connections;
private volatile boolean syncing = false;
/**
* The following are the default parameters for the multi node pipeline executor
* Since Redis query is usually a slower IO operation (requires more threads),
* so we set DEFAULT_CORE_POOL_SIZE to be the same as the core
*/
private static final long DEFAULT_KEEPALIVE_TIME_MS = 60000L;
private static final int DEFAULT_BLOCKING_QUEUE_SIZE = Protocol.CLUSTER_HASHSLOTS;
private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static ExecutorService executorService = JedisThreadPoolBuilder.pool()
.setCoreSize(DEFAULT_CORE_POOL_SIZE)
.setMaxSize(DEFAULT_MAXIMUM_POOL_SIZE)
.setKeepAliveMillSecs(DEFAULT_KEEPALIVE_TIME_MS)
.setThreadNamePrefix("jedis-multi-node-pipeline")
.setWorkQueue(new ArrayBlockingQueue<>(DEFAULT_BLOCKING_QUEUE_SIZE)).build();

public MultiNodePipelineBase(CommandObjects commandObjects) {
super(commandObjects);
pipelinedResponses = new LinkedHashMap<>();
connections = new LinkedHashMap<>();
}

/**
* Provide an interface for users to set executors themselves.
* @param executor the executor
*/
public static void setExecutorService(ExecutorService executor) {

This comment was marked as off-topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dengliming This is set once, works every time approach :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, I suggest that it is better to supply this parameter directly to the constructor method rather than the setter method. : )

We need to pass the executor when new JedisCluster or new JedisSharding, and need a clear name to avoid misunderstanding by users, and there are many modifications(JedisCluster->ClusterPipeline->MultiNodePipelineBase).

But the benefit is that we can create the executor lazily, if the user passes it, we use it; otherwise, we can create a static object.

But I personally prefer the current code, and it is one-time and clear to set directly through the MultiNodePipelineBase#setExecutorService interface (About cost: there is only a part of memory usage, if the user is not using it, no thread will be created).

if (executorService != executor && executorService != null) {
executorService.shutdown();
}
executorService = executor;
}

/**
* Sub-classes must call this method, if graph commands are going to be used.
* @param connectionProvider connection provider
Expand Down Expand Up @@ -102,8 +122,6 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
Expand All @@ -112,32 +130,35 @@ public final void sync() {
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
try {
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
});
});
} catch (RejectedExecutionException e) {
log.error("Get a reject exception when submitting, it is recommended that you use the "
+ "MultiNodePipelineBase#setExecutorService method to customize the executor", e);
throw e;
}
}

try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();

syncing = false;
}

Expand Down
Loading