Skip to content

Commit a0efc76

Browse files
author
R-J Lim
committed
Proof-of-concept for automatic key prefixing
1 parent 5733cd6 commit a0efc76

File tree

5 files changed

+164
-3
lines changed

5 files changed

+164
-3
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package redis.clients.jedis;
2+
3+
import redis.clients.jedis.args.Rawable;
4+
import redis.clients.jedis.args.RawableFactory;
5+
import redis.clients.jedis.commands.ProtocolCommand;
6+
import redis.clients.jedis.util.SafeEncoder;
7+
8+
public class ClusterCommandArgumentsWithPrefixedKeys extends ClusterCommandArguments {
9+
private final byte[] prefix;
10+
private final String prefixString;
11+
12+
public ClusterCommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString) {
13+
super(command);
14+
this.prefixString = prefixString;
15+
prefix = SafeEncoder.encode(prefixString);
16+
}
17+
18+
public CommandArguments key(Object key) {
19+
return super.key(namespacedKey(key));
20+
}
21+
22+
private Object namespacedKey(Object key) {
23+
if (key instanceof Rawable) {
24+
byte[] raw = ((Rawable) key).getRaw();
25+
return RawableFactory.from(namespacedKeyBytes(raw));
26+
}
27+
28+
if (key instanceof byte[]) {
29+
return namespacedKeyBytes((byte[]) key);
30+
}
31+
32+
if (key instanceof String) {
33+
String raw = (String) key;
34+
return prefixString + raw;
35+
}
36+
37+
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
38+
}
39+
40+
private byte[] namespacedKeyBytes(byte[] key) {
41+
byte[] namespaced = new byte[prefix.length + key.length];
42+
System.arraycopy(prefix, 0, namespaced, 0, prefix.length);
43+
System.arraycopy(key, 0, namespaced, prefix.length, key.length);
44+
return namespaced;
45+
}
46+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package redis.clients.jedis;
2+
3+
import redis.clients.jedis.commands.ProtocolCommand;
4+
5+
public class ClusterCommandObjectsWithPrefixedKeys extends ClusterCommandObjects {
6+
// For the purposes of this demonstration, the prefix is assigned statically.
7+
// Additional changes are required to prevent the parent class CommandObjects
8+
// from calling commandArguments in its constructor, which would be a prerequisite
9+
// to making this field into an instance field.
10+
public static String PREFIX_STRING;
11+
12+
@Override
13+
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
14+
return new ClusterCommandArgumentsWithPrefixedKeys(command, PREFIX_STRING);
15+
}
16+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package redis.clients.jedis;
2+
3+
import redis.clients.jedis.executors.ClusterCommandExecutor;
4+
import redis.clients.jedis.providers.ClusterConnectionProvider;
5+
import redis.clients.jedis.util.JedisClusterCRC16;
6+
7+
import java.time.Duration;
8+
import java.util.Collections;
9+
import java.util.Map;
10+
11+
public class JedisClusterWithPrefixedKeys extends UnifiedJedis {
12+
13+
public JedisClusterWithPrefixedKeys(HostAndPort node, JedisClientConfig clientConfig) {
14+
this(new ClusterConnectionProvider(Collections.singleton(node), clientConfig), clientConfig);
15+
}
16+
17+
public JedisClusterWithPrefixedKeys(ClusterConnectionProvider provider, JedisClientConfig clientConfig) {
18+
super(new ClusterCommandExecutor(provider, 5, Duration.ofSeconds(100)), provider, new ClusterCommandObjectsWithPrefixedKeys(), clientConfig.getRedisProtocol());
19+
20+
}
21+
22+
public Map<String, ConnectionPool> getClusterNodes() {
23+
return ((ClusterConnectionProvider) provider).getNodes();
24+
}
25+
26+
public Connection getConnectionFromSlot(int slot) {
27+
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
28+
}
29+
30+
// commands
31+
public long spublish(String channel, String message) {
32+
return executeCommand(commandObjects.spublish(channel, message));
33+
}
34+
35+
public long spublish(byte[] channel, byte[] message) {
36+
return executeCommand(commandObjects.spublish(channel, message));
37+
}
38+
39+
public void ssubscribe(final JedisShardedPubSub jedisPubSub, final String... channels) {
40+
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
41+
jedisPubSub.proceed(connection, channels);
42+
}
43+
}
44+
45+
public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... channels) {
46+
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
47+
jedisPubSub.proceed(connection, channels);
48+
}
49+
}
50+
// commands
51+
52+
@Override
53+
public ClusterPipeline pipelined() {
54+
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
55+
}
56+
57+
@Override
58+
public Transaction multi() {
59+
throw new UnsupportedOperationException();
60+
}
61+
}

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,12 @@ public UnifiedJedis(CommandExecutor executor) {
206206
this(executor, (ConnectionProvider) null);
207207
}
208208

209-
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
209+
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
210210
this(executor, provider, new CommandObjects());
211211
}
212212

213213
// Uses a fetched connection to process protocol. Should be avoided if possible.
214-
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
214+
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
215215
this(executor, provider, commandObjects, null);
216216
if (this.provider != null) {
217217
try (Connection conn = this.provider.getConnection()) {
@@ -223,7 +223,7 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm
223223
}
224224
}
225225

226-
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
226+
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
227227
RedisProtocol protocol) {
228228
this.provider = provider;
229229
this.executor = executor;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package redis.clients.jedis;
2+
3+
import org.junit.Test;
4+
import redis.clients.jedis.resps.Tuple;
5+
6+
import java.nio.charset.StandardCharsets;
7+
import java.util.Collections;
8+
9+
import static junit.framework.TestCase.assertEquals;
10+
11+
public class JedisClusterWithPrefixedKeysTest extends JedisClusterTestBase {
12+
private static final DefaultJedisClientConfig DEFAULT_CLIENT_CONFIG = DefaultJedisClientConfig.builder().password("cluster").build();
13+
private static final int DEFAULT_TIMEOUT = 2000;
14+
private static final int DEFAULT_REDIRECTIONS = 5;
15+
private static final ConnectionPoolConfig DEFAULT_POOL_CONFIG = new ConnectionPoolConfig();
16+
17+
@Test
18+
public void hasPrefixedKeys() {
19+
HostAndPort hp = new HostAndPort("127.0.0.1", 7379);
20+
ClusterCommandObjectsWithPrefixedKeys.PREFIX_STRING = "test-prefix:";
21+
22+
try (JedisClusterWithPrefixedKeys cluster = new JedisClusterWithPrefixedKeys(hp, DEFAULT_CLIENT_CONFIG)) {
23+
cluster.set("foo1", "bar1");
24+
cluster.set("foo2".getBytes(StandardCharsets.UTF_8), "bar2".getBytes(StandardCharsets.UTF_8));
25+
ClusterPipeline pipeline = cluster.pipelined();
26+
pipeline.incr("foo3");
27+
pipeline.zadd("foo4", 1234, "bar4");
28+
pipeline.sync();
29+
}
30+
31+
try (JedisCluster cluster = new JedisCluster(hp, DEFAULT_CLIENT_CONFIG, DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
32+
assertEquals("bar1", cluster.get("test-prefix:foo1"));
33+
assertEquals("bar2", cluster.get("test-prefix:foo2"));
34+
assertEquals("1", cluster.get("test-prefix:foo3"));
35+
assertEquals(new Tuple("bar4", 1234d), cluster.zpopmax("test-prefix:foo4"));
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)