Skip to content

Add support for redis 7 streams features #2157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3505,6 +3505,7 @@ def xgroup_create(
groupname: GroupT,
id: StreamIdT = "$",
mkstream: bool = False,
entries_read: Optional[int] = None,
) -> ResponseT:
"""
Create a new consumer group associated with a stream.
Expand All @@ -3517,6 +3518,9 @@ def xgroup_create(
pieces: list[EncodableT] = ["XGROUP CREATE", name, groupname, id]
if mkstream:
pieces.append(b"MKSTREAM")
if entries_read is not None:
pieces.extend(["ENTRIESREAD", entries_read])

return self.execute_command(*pieces)

def xgroup_delconsumer(
Expand Down Expand Up @@ -3572,6 +3576,7 @@ def xgroup_setid(
name: KeyT,
groupname: GroupT,
id: StreamIdT,
entries_read: Optional[int] = None,
) -> ResponseT:
"""
Set the consumer group last delivered ID to something else.
Expand All @@ -3581,7 +3586,10 @@ def xgroup_setid(

For more information see https://redis.io/commands/xgroup-setid
"""
return self.execute_command("XGROUP SETID", name, groupname, id)
pieces = [name, groupname, id]
if entries_read is not None:
pieces.extend(["ENTRIESREAD", entries_read])
return self.execute_command("XGROUP SETID", *pieces)

def xinfo_consumers(self, name: KeyT, groupname: GroupT) -> ResponseT:
"""
Expand Down
55 changes: 46 additions & 9 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3735,6 +3735,13 @@ def test_xadd_minlen_and_limit(self, r):
r.xadd(stream, {"foo": "bar"})
assert r.xadd(stream, {"foo": "bar"}, approximate=True, minid=m3)

@skip_if_server_version_lt("7.0.0")
def test_xadd_explicit_ms(self, r: redis.Redis):
stream = "stream"
message_id = r.xadd(stream, {"foo": "bar"}, "9999999999999999999-*")
ms = message_id[: message_id.index(b"-")]
assert ms == b"9999999999999999999"

@skip_if_server_version_lt("6.2.0")
def test_xautoclaim(self, r):
stream = "stream"
Expand Down Expand Up @@ -3820,7 +3827,7 @@ def test_xclaim(self, r):
== [message_id]
)

@skip_if_server_version_lt("5.0.0")
@skip_if_server_version_lt("7.0.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

def test_xclaim_trimmed(self, r):
# xclaim should not raise an exception if the item is not there
stream = "stream"
Expand All @@ -3841,9 +3848,8 @@ def test_xclaim_trimmed(self, r):
# xclaim them from consumer2
# the item that is still in the stream should be returned
item = r.xclaim(stream, group, "consumer2", 0, [sid1, sid2])
assert len(item) == 2
assert item[0] == (None, None)
assert item[1][0] == sid2
assert len(item) == 1
assert item[0][0] == sid2

@skip_if_server_version_lt("5.0.0")
def test_xdel(self, r):
Expand All @@ -3860,7 +3866,7 @@ def test_xdel(self, r):
assert r.xdel(stream, m1) == 1
assert r.xdel(stream, m2, m3) == 2

@skip_if_server_version_lt("5.0.0")
@skip_if_server_version_lt("7.0.0")
def test_xgroup_create(self, r):
# tests xgroup_create and xinfo_groups
stream = "stream"
Expand All @@ -3877,11 +3883,13 @@ def test_xgroup_create(self, r):
"consumers": 0,
"pending": 0,
"last-delivered-id": b"0-0",
"entries-read": None,
"lag": 1,
}
]
assert r.xinfo_groups(stream) == expected

@skip_if_server_version_lt("5.0.0")
@skip_if_server_version_lt("7.0.0")
def test_xgroup_create_mkstream(self, r):
# tests xgroup_create and xinfo_groups
stream = "stream"
Expand All @@ -3901,6 +3909,30 @@ def test_xgroup_create_mkstream(self, r):
"consumers": 0,
"pending": 0,
"last-delivered-id": b"0-0",
"entries-read": None,
"lag": 0,
}
]
assert r.xinfo_groups(stream) == expected

@skip_if_server_version_lt("7.0.0")
def test_xgroup_create_entriesread(self, r: redis.Redis):
stream = "stream"
group = "group"
r.xadd(stream, {"foo": "bar"})

# no group is setup yet, no info to obtain
assert r.xinfo_groups(stream) == []

assert r.xgroup_create(stream, group, 0, entries_read=7)
expected = [
{
"name": group.encode(),
"consumers": 0,
"pending": 0,
"last-delivered-id": b"0-0",
"entries-read": 7,
"lag": -6,
}
]
assert r.xinfo_groups(stream) == expected
Expand Down Expand Up @@ -3951,21 +3983,23 @@ def test_xgroup_destroy(self, r):
r.xgroup_create(stream, group, 0)
assert r.xgroup_destroy(stream, group)

@skip_if_server_version_lt("5.0.0")
@skip_if_server_version_lt("7.0.0")
def test_xgroup_setid(self, r):
stream = "stream"
group = "group"
message_id = r.xadd(stream, {"foo": "bar"})

r.xgroup_create(stream, group, 0)
# advance the last_delivered_id to the message_id
r.xgroup_setid(stream, group, message_id)
r.xgroup_setid(stream, group, message_id, entries_read=2)
expected = [
{
"name": group.encode(),
"consumers": 0,
"pending": 0,
"last-delivered-id": message_id,
"entries-read": 2,
"lag": -1,
}
]
assert r.xinfo_groups(stream) == expected
Expand Down Expand Up @@ -3995,7 +4029,7 @@ def test_xinfo_consumers(self, r):
assert isinstance(info[1].pop("idle"), int)
assert info == expected

@skip_if_server_version_lt("5.0.0")
@skip_if_server_version_lt("7.0.0")
def test_xinfo_stream(self, r):
stream = "stream"
m1 = r.xadd(stream, {"foo": "bar"})
Expand All @@ -4005,6 +4039,9 @@ def test_xinfo_stream(self, r):
assert info["length"] == 2
assert info["first-entry"] == get_stream_message(r, stream, m1)
assert info["last-entry"] == get_stream_message(r, stream, m2)
assert info["max-deleted-entry-id"] == b"0-0"
assert info["entries-added"] == 2
assert info["recorded-first-entry-id"] == m1

@skip_if_server_version_lt("6.0.0")
def test_xinfo_stream_full(self, r):
Expand Down