Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,39 @@
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.IOUtils;

// TODO: RESP3
public class ClusterPipeline extends MultiNodePipelineBase {

private final ClusterConnectionProvider provider;
private AutoCloseable closeable = null;

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig));
this(new ClusterConnectionProvider(clusterNodes, clientConfig),
createClusterCommandObjects(clientConfig.getRedisProtocol()));
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig));
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig),
createClusterCommandObjects(clientConfig.getRedisProtocol()));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
super(new ClusterCommandObjects());
this(provider, new ClusterCommandObjects());
}

public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects) {
super(commandObjects);
this.provider = provider;
}

private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
ClusterCommandObjects cco = new ClusterCommandObjects();
if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol);
return cco;
}

@Override
public void close() {
try {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -4178,8 +4178,8 @@ public final CommandObject<List<String>> graphExplain(String graphName, String q
return new CommandObject<>(commandArguments(GraphCommand.EXPLAIN).key(graphName).add(query), BuilderFactory.STRING_LIST);
}

public final CommandObject<List<List<String>>> graphSlowlog(String graphName) {
return new CommandObject<>(commandArguments(GraphCommand.SLOWLOG).key(graphName), BuilderFactory.STRING_LIST_LIST);
public final CommandObject<List<List<Object>>> graphSlowlog(String graphName) {
return new CommandObject<>(commandArguments(GraphCommand.SLOWLOG).key(graphName), BuilderFactory.ENCODED_OBJECT_LIST_LIST);
}

public final CommandObject<String> graphConfigSet(String configName, Object value) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientC
initializeFromClientConfig(clientConfig);
}

public Connection(final JedisSocketFactory socketFactory) {
this.socketFactory = socketFactory;
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
}

public Connection(final JedisSocketFactory socketFactory) {
this.socketFactory = socketFactory;
}

@Override
public String toString() {
return "Connection{" + socketFactory + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ public DefaultJedisClientConfig build() {
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper);
}

/**
* Shortcut to {@link Builder#protocol(redis.clients.jedis.RedisProtocol)} with {@link RedisProtocol#RESP3}.
*/
public Builder resp3() {
return protocol(RedisProtocol.RESP3);
}

public Builder protocol(RedisProtocol protocol) {
this.redisProtocol = protocol;
return this;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public Connection getConnectionFromSlot(int slot) {

@Override
public ClusterPipeline pipelined() {
return new ClusterPipeline((ClusterConnectionProvider) provider);
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
}

/**
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/redis/clients/jedis/JedisSharding.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
* @deprecated Sharding/Sharded feature will be removed in next major release.
*/
@Deprecated
// TODO: RESP3
public class JedisSharding extends UnifiedJedis {

public static final Pattern DEFAULT_KEY_TAG_PATTERN = Pattern.compile("\\{(.+?)\\}");
Expand All @@ -21,20 +20,24 @@ public JedisSharding(List<HostAndPort> shards) {

public JedisSharding(List<HostAndPort> shards, JedisClientConfig clientConfig) {
this(new ShardedConnectionProvider(shards, clientConfig));
setProtocol(clientConfig);
}

public JedisSharding(List<HostAndPort> shards, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ShardedConnectionProvider(shards, clientConfig, poolConfig));
setProtocol(clientConfig);
}

public JedisSharding(List<HostAndPort> shards, JedisClientConfig clientConfig, Hashing algo) {
this(new ShardedConnectionProvider(shards, clientConfig, algo));
setProtocol(clientConfig);
}

public JedisSharding(List<HostAndPort> shards, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Hashing algo) {
this(new ShardedConnectionProvider(shards, clientConfig, poolConfig, algo));
setProtocol(clientConfig);
}

public JedisSharding(ShardedConnectionProvider provider) {
Expand All @@ -45,6 +48,11 @@ public JedisSharding(ShardedConnectionProvider provider, Pattern tagPattern) {
super(provider, tagPattern);
}

private void setProtocol(JedisClientConfig clientConfig) {
RedisProtocol proto = clientConfig.getRedisProtocol();
if (proto == RedisProtocol.RESP3) commandObjects.setProtocol(proto);
}

@Override
public ShardedPipeline pipelined() {
return new ShardedPipeline((ShardedConnectionProvider) provider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import redis.clients.jedis.util.IOUtils;
import redis.clients.jedis.util.KeyValue;

// TODO: RESP3
public abstract class MultiNodePipelineBase implements PipelineCommands, PipelineBinaryCommands,
RedisModulePipelineCommands, Closeable {

Expand Down
32 changes: 30 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import redis.clients.jedis.commands.SampleBinaryKeyedCommands;
import redis.clients.jedis.commands.SampleKeyedCommands;
import redis.clients.jedis.commands.RedisModuleCommands;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.executors.*;
import redis.clients.jedis.graph.GraphCommandObjects;
import redis.clients.jedis.graph.ResultSet;
Expand All @@ -41,9 +42,10 @@ public class UnifiedJedis implements JedisCommands, JedisBinaryCommands,
SampleKeyedCommands, SampleBinaryKeyedCommands, RedisModuleCommands,
AutoCloseable {

protected RedisProtocol protocol = null;
protected final ConnectionProvider provider;
protected final CommandExecutor executor;
private final CommandObjects commandObjects;
protected final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects;
private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;

Expand Down Expand Up @@ -91,6 +93,15 @@ public UnifiedJedis(ConnectionProvider provider) {
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
try (Connection conn = this.provider.getConnection()) {
if (conn != null) {
RedisProtocol proto = conn.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
}
//} catch (JedisAccessControlException ace) {
} catch (JedisException je) { // TODO: use specific exception(s)
// use default protocol
}
}

/**
Expand All @@ -103,6 +114,16 @@ public UnifiedJedis(JedisSocketFactory socketFactory) {
this(new Connection(socketFactory));
}

/**
* The constructor to directly use a custom {@link JedisSocketFactory}.
* <p>
* WARNING: Using this constructor means a {@link NullPointerException} will be occurred if
* {@link UnifiedJedis#provider} is accessed.
*/
public UnifiedJedis(JedisSocketFactory socketFactory, JedisClientConfig clientConfig) {
this(new Connection(socketFactory, clientConfig));
}

/**
* The constructor to directly use a {@link Connection}.
* <p>
Expand All @@ -114,6 +135,8 @@ public UnifiedJedis(Connection connection) {
this.executor = new SimpleCommandExecutor(connection);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
RedisProtocol proto = connection.getRedisProtocol();
if (proto == RedisProtocol.RESP3) this.commandObjects.setProtocol(proto);
}

public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts) {
Expand Down Expand Up @@ -195,6 +218,11 @@ public void close() {
IOUtils.closeQuietly(this.executor);
}

protected final void setProtocol(RedisProtocol protocol) {
this.protocol = protocol;
this.commandObjects.setProtocol(this.protocol);
}

public final <T> T executeCommand(CommandObject<T> commandObject) {
return executor.executeCommand(commandObject);
}
Expand Down Expand Up @@ -4685,7 +4713,7 @@ public List<String> graphExplain(String graphName, String query) {
}

@Override
public List<List<String>> graphSlowlog(String graphName) {
public List<List<Object>> graphSlowlog(String graphName) {
return executeCommand(commandObjects.graphSlowlog(graphName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ public interface RedisGraphCommands {
/**
* Returns a list containing up to 10 of the slowest queries issued against the given graph ID.
*/
// TODO: RESP3 --> List<List<Object>> ?
List<List<String>> graphSlowlog(String graphName);
List<List<Object>> graphSlowlog(String graphName);

String graphConfigSet(String configName, Object value);

Expand Down
18 changes: 11 additions & 7 deletions src/test/java/redis/clients/jedis/JedisPooledTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package redis.clients.jedis;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.Test;
Expand Down Expand Up @@ -196,9 +200,9 @@ public void testResetValidCredentials() {
public void testCredentialsProvider() {
final AtomicInteger prepareCount = new AtomicInteger();
final AtomicInteger cleanupCount = new AtomicInteger();
final AtomicBoolean validPassword = new AtomicBoolean(false);

RedisCredentialsProvider credentialsProvider = new RedisCredentialsProvider() {
boolean firstCall = true;

@Override
public void prepare() {
Expand All @@ -207,8 +211,7 @@ public void prepare() {

@Override
public RedisCredentials get() {
if (firstCall) {
firstCall = false;
if (!validPassword.get()) {
return new RedisCredentials() { };
}

Expand Down Expand Up @@ -244,12 +247,13 @@ public void cleanUp() {
} catch (JedisException e) {
}
assertEquals(0, pool.getPool().getNumActive() + pool.getPool().getNumIdle() + pool.getPool().getNumWaiters());
assertEquals(1, prepareCount.get());
assertEquals(1, cleanupCount.get());
assertThat(prepareCount.getAndSet(0), greaterThanOrEqualTo(1));
assertThat(cleanupCount.getAndSet(0), greaterThanOrEqualTo(1));

validPassword.set(true);
assertNull(pool.get("foo"));
assertEquals(2, prepareCount.get());
assertEquals(2, cleanupCount.get());
assertThat(prepareCount.get(), equalTo(1));
assertThat(cleanupCount.get(), equalTo(1));
}
}
}
16 changes: 16 additions & 0 deletions src/test/java/redis/clients/jedis/UdsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,29 @@ public void jedisConnectsToUds() {
}
}

@Test
public void jedisConnectsToUdsResp3() {
try (Jedis jedis = new Jedis(new UdsJedisSocketFactory(),
DefaultJedisClientConfig.builder().resp3().build())) {
assertEquals("PONG", jedis.ping());
}
}

@Test
public void unifiedJedisConnectsToUds() {
try (UnifiedJedis jedis = new UnifiedJedis(new UdsJedisSocketFactory())) {
assertEquals("PONG", jedis.ping());
}
}

@Test
public void unifiedJedisConnectsToUdsResp3() {
try (UnifiedJedis jedis = new UnifiedJedis(new UdsJedisSocketFactory(),
DefaultJedisClientConfig.builder().resp3().build())) {
assertEquals("PONG", jedis.ping());
}
}

private static class UdsJedisSocketFactory implements JedisSocketFactory {

private static final File UDS_SOCKET = new File("/tmp/redis_uds.sock");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,12 +766,11 @@ public void explain() {
}

@Test
// TODO: RESP3
public void slowlog() {
assertNotNull(client.graphProfile("social", "CREATE (:person{name:'roi',age:32})"));
assertNotNull(client.graphProfile("social", "CREATE (:person{name:'amit',age:30})"));

List<List<String>> slowlogs = client.graphSlowlog("social");
List<List<Object>> slowlogs = client.graphSlowlog("social");
assertEquals(2, slowlogs.size());
slowlogs.forEach(sl -> assertFalse(sl.isEmpty()));
slowlogs.forEach(sl -> sl.forEach(Assert::assertNotNull));
Expand Down