Skip to content

Added support for MONITOR in clusters #1756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,23 @@ def set_default_node(self, node):
log.info(f"Changed the default cluster node to {node}")
return True

def monitor(self, target_node=None):
"""
Returns a Monitor object for the specified target node.
The default cluster node will be selected if no target node was
specified.
Monitor is useful for handling the MONITOR command to the redis server.
next_command() method returns one command from monitor
listen() method yields commands from monitor.
"""
if target_node is None:
target_node = self.get_default_node()
if target_node.redis_connection is None:
raise RedisClusterException(
f"Cluster Node {target_node.name} has no redis_connection"
)
return target_node.redis_connection.monitor()

def pubsub(self, node=None, host=None, port=None, **kwargs):
"""
Allows passing a ClusterNode, or host&port, to get a pubsub instance
Expand Down
20 changes: 11 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ def skip_ifmodversion_lt(min_version: str, module_name: str):
raise AttributeError(f"No redis module named {module_name}")


def skip_if_redis_enterprise(func):
def skip_if_redis_enterprise():
check = REDIS_INFO["enterprise"] is True
return pytest.mark.skipif(check, reason="Redis enterprise")


def skip_ifnot_redis_enterprise(func):
def skip_ifnot_redis_enterprise():
check = REDIS_INFO["enterprise"] is False
return pytest.mark.skipif(check, reason="Not running in redis enterprise")

Expand Down Expand Up @@ -324,16 +324,18 @@ def master_host(request):
yield parts.hostname, parts.port


def wait_for_command(client, monitor, command):
def wait_for_command(client, monitor, command, key=None):
# issue a command with a key name that's local to this process.
# if we find a command with our key before the command we're waiting
# for, something went wrong
redis_version = REDIS_INFO["version"]
if LooseVersion(redis_version) >= LooseVersion("5.0.0"):
id_str = str(client.client_id())
else:
id_str = f"{random.randrange(2 ** 32):08x}"
key = f"__REDIS-PY-{id_str}__"
if key is None:
# generate key
redis_version = REDIS_INFO["version"]
if LooseVersion(redis_version) >= LooseVersion("5.0.0"):
id_str = str(client.client_id())
else:
id_str = f"{random.randrange(2 ** 32):08x}"
key = f"__REDIS-PY-{id_str}__"
client.get(key)
while True:
monitor_response = monitor.next_command()
Expand Down
54 changes: 53 additions & 1 deletion tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
skip_if_redis_enterprise,
skip_if_server_version_lt,
skip_unless_arch_bits,
wait_for_command,
)

default_host = "127.0.0.1"
Expand Down Expand Up @@ -1771,7 +1772,7 @@ def test_cluster_randomkey(self, r):
assert r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c")

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_log(self, r, request):
key = "{cache}:"
node = r.get_node_from_key(key)
Expand Down Expand Up @@ -2613,3 +2614,54 @@ def test_readonly_pipeline_from_readonly_client(self, request):
if executed_on_replica:
break
assert executed_on_replica is True


@pytest.mark.onlycluster
class TestClusterMonitor:
def test_wait_command_not_found(self, r):
"Make sure the wait_for_command func works when command is not found"
key = "foo"
node = r.get_node_from_key(key)
with r.monitor(target_node=node) as m:
response = wait_for_command(r, m, "nothing", key=key)
assert response is None

def test_response_values(self, r):
db = 0
key = "foo"
node = r.get_node_from_key(key)
with r.monitor(target_node=node) as m:
r.ping(target_nodes=node)
response = wait_for_command(r, m, "PING", key=key)
assert isinstance(response["time"], float)
assert response["db"] == db
assert response["client_type"] in ("tcp", "unix")
assert isinstance(response["client_address"], str)
assert isinstance(response["client_port"], str)
assert response["command"] == "PING"

def test_command_with_quoted_key(self, r):
key = "{foo}1"
node = r.get_node_from_key(key)
with r.monitor(node) as m:
r.get('{foo}"bar')
response = wait_for_command(r, m, 'GET {foo}"bar', key=key)
assert response["command"] == 'GET {foo}"bar'

def test_command_with_binary_data(self, r):
key = "{foo}1"
node = r.get_node_from_key(key)
with r.monitor(target_node=node) as m:
byte_string = b"{foo}bar\x92"
r.get(byte_string)
response = wait_for_command(r, m, "GET {foo}bar\\x92", key=key)
assert response["command"] == "GET {foo}bar\\x92"

def test_command_with_escaped_data(self, r):
key = "{foo}1"
node = r.get_node_from_key(key)
with r.monitor(target_node=node) as m:
byte_string = b"{foo}bar\\x92"
r.get(byte_string)
response = wait_for_command(r, m, "GET {foo}bar\\\\x92", key=key)
assert response["command"] == "GET {foo}bar\\\\x92"
53 changes: 27 additions & 26 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_acl_cat_with_category(self, r):
assert "get" in commands

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_deluser(self, r, request):
username = "redis-py-user"

Expand All @@ -109,7 +109,7 @@ def teardown():
assert r.acl_getuser(users[4]) is None

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_genpass(self, r):
password = r.acl_genpass()
assert isinstance(password, str)
Expand All @@ -123,7 +123,7 @@ def test_acl_genpass(self, r):
assert isinstance(password, str)

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_getuser_setuser(self, r, request):
username = "redis-py-user"

Expand Down Expand Up @@ -236,7 +236,7 @@ def test_acl_help(self, r):
assert len(res) != 0

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_list(self, r, request):
username = "redis-py-user"

Expand All @@ -250,7 +250,8 @@ def teardown():
assert len(users) == 2

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
@pytest.mark.onlynoncluster
def test_acl_log(self, r, request):
username = "redis-py-user"

Expand Down Expand Up @@ -292,7 +293,7 @@ def teardown():
assert r.acl_log_reset()

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_setuser_categories_without_prefix_fails(self, r, request):
username = "redis-py-user"

Expand All @@ -305,7 +306,7 @@ def teardown():
r.acl_setuser(username, categories=["list"])

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_setuser_commands_without_prefix_fails(self, r, request):
username = "redis-py-user"

Expand All @@ -318,7 +319,7 @@ def teardown():
r.acl_setuser(username, commands=["get"])

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request):
username = "redis-py-user"

Expand Down Expand Up @@ -363,7 +364,7 @@ def test_client_list_types_not_replica(self, r):
clients = r.client_list(_type=client_type)
assert isinstance(clients, list)

@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_client_list_replica(self, r):
clients = r.client_list(_type="replica")
assert isinstance(clients, list)
Expand Down Expand Up @@ -529,7 +530,7 @@ def test_client_kill_filter_by_laddr(self, r, r2):
assert r.client_kill_filter(laddr=client_2_addr)

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_client_kill_filter_by_user(self, r, request):
killuser = "user_to_kill"
r.acl_setuser(
Expand All @@ -549,7 +550,7 @@ def test_client_kill_filter_by_user(self, r, request):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("2.9.50")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_client_pause(self, r):
assert r.client_pause(1)
assert r.client_pause(timeout=1)
Expand All @@ -558,7 +559,7 @@ def test_client_pause(self, r):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.2.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_client_unpause(self, r):
assert r.client_unpause() == b"OK"

Expand All @@ -578,7 +579,7 @@ def test_client_reply(self, r, r_timeout):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_client_getredir(self, r):
assert isinstance(r.client_getredir(), int)
assert r.client_getredir() == -1
Expand All @@ -590,7 +591,7 @@ def test_config_get(self, r):
# assert data['maxmemory'].isdigit()

@pytest.mark.onlynoncluster
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_config_resetstat(self, r):
r.ping()
prior_commands_processed = int(r.info()["total_commands_processed"])
Expand All @@ -599,7 +600,7 @@ def test_config_resetstat(self, r):
reset_commands_processed = int(r.info()["total_commands_processed"])
assert reset_commands_processed < prior_commands_processed

@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_config_set(self, r):
r.config_set("timeout", 70)
assert r.config_get()["timeout"] == "70"
Expand All @@ -626,7 +627,7 @@ def test_info(self, r):
assert "redis_version" in info.keys()

@pytest.mark.onlynoncluster
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_lastsave(self, r):
assert isinstance(r.lastsave(), datetime.datetime)

Expand Down Expand Up @@ -724,7 +725,7 @@ def test_time(self, r):
assert isinstance(t[0], int)
assert isinstance(t[1], int)

@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_bgsave(self, r):
assert r.bgsave()
time.sleep(0.3)
Expand Down Expand Up @@ -1305,7 +1306,7 @@ def test_stralgo_lcs(self, r):
value2 = "mynewtext"
res = "mytext"

if skip_if_redis_enterprise(None).args[0] is True:
if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
assert r.stralgo("LCS", value1, value2) == res
return
Expand Down Expand Up @@ -1347,7 +1348,7 @@ def test_strlen(self, r):
def test_substr(self, r):
r["a"] = "0123456789"

if skip_if_redis_enterprise(None).args[0] is True:
if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
assert r.substr("a", 0) == b"0123456789"
return
Expand Down Expand Up @@ -2658,7 +2659,7 @@ def test_cluster_slaves(self, mock_cluster_resp_slaves):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("3.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_readwrite(self, r):
assert r.readwrite()

Expand Down Expand Up @@ -4009,7 +4010,7 @@ def test_memory_doctor(self, r):

@skip_if_server_version_lt("4.0.0")
def test_memory_malloc_stats(self, r):
if skip_if_redis_enterprise(None).args[0] is True:
if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
assert r.memory_malloc_stats()
return
Expand All @@ -4022,7 +4023,7 @@ def test_memory_stats(self, r):
# has data
r.set("foo", "bar")

if skip_if_redis_enterprise(None).args[0] is True:
if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
stats = r.memory_stats()
return
Expand All @@ -4040,7 +4041,7 @@ def test_memory_usage(self, r):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("4.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_module_list(self, r):
assert isinstance(r.module_list(), list)
for x in r.module_list():
Expand Down Expand Up @@ -4081,7 +4082,7 @@ def test_command(self, r):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("4.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_module(self, r):
with pytest.raises(redis.exceptions.ModuleError) as excinfo:
r.module_load("/some/fake/path")
Expand Down Expand Up @@ -4137,7 +4138,7 @@ def test_restore_frequency(self, r):

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("5.0.0")
@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_replicaof(self, r):
with pytest.raises(redis.ResponseError):
assert r.replicaof("NO ONE")
Expand Down Expand Up @@ -4219,7 +4220,7 @@ def test_22_info(self, r):
assert "6" in parsed["allocation_stats"]
assert ">=256" in parsed["allocation_stats"]

@skip_if_redis_enterprise
@skip_if_redis_enterprise()
def test_large_responses(self, r):
"The PythonParser has some special cases for return values > 1MB"
# load up 5MB of data into a key
Expand Down
Loading