Skip to content

Commit e1988c6

Browse files
authored
Add support for redis 7 streams features (#2157)
* xadd * streams redis 7 * linters * test xinfo stream * test xinfo stream * test xclaim
1 parent fdb9075 commit e1988c6

File tree

2 files changed

+55
-10
lines changed

2 files changed

+55
-10
lines changed

redis/commands/core.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -3505,6 +3505,7 @@ def xgroup_create(
35053505
groupname: GroupT,
35063506
id: StreamIdT = "$",
35073507
mkstream: bool = False,
3508+
entries_read: Optional[int] = None,
35083509
) -> ResponseT:
35093510
"""
35103511
Create a new consumer group associated with a stream.
@@ -3517,6 +3518,9 @@ def xgroup_create(
35173518
pieces: list[EncodableT] = ["XGROUP CREATE", name, groupname, id]
35183519
if mkstream:
35193520
pieces.append(b"MKSTREAM")
3521+
if entries_read is not None:
3522+
pieces.extend(["ENTRIESREAD", entries_read])
3523+
35203524
return self.execute_command(*pieces)
35213525

35223526
def xgroup_delconsumer(
@@ -3572,6 +3576,7 @@ def xgroup_setid(
35723576
name: KeyT,
35733577
groupname: GroupT,
35743578
id: StreamIdT,
3579+
entries_read: Optional[int] = None,
35753580
) -> ResponseT:
35763581
"""
35773582
Set the consumer group last delivered ID to something else.
@@ -3581,7 +3586,10 @@ def xgroup_setid(
35813586
35823587
For more information see https://redis.io/commands/xgroup-setid
35833588
"""
3584-
return self.execute_command("XGROUP SETID", name, groupname, id)
3589+
pieces = [name, groupname, id]
3590+
if entries_read is not None:
3591+
pieces.extend(["ENTRIESREAD", entries_read])
3592+
return self.execute_command("XGROUP SETID", *pieces)
35853593

35863594
def xinfo_consumers(self, name: KeyT, groupname: GroupT) -> ResponseT:
35873595
"""

tests/test_commands.py

+46-9
Original file line numberDiff line numberDiff line change
@@ -3735,6 +3735,13 @@ def test_xadd_minlen_and_limit(self, r):
37353735
r.xadd(stream, {"foo": "bar"})
37363736
assert r.xadd(stream, {"foo": "bar"}, approximate=True, minid=m3)
37373737

3738+
@skip_if_server_version_lt("7.0.0")
3739+
def test_xadd_explicit_ms(self, r: redis.Redis):
3740+
stream = "stream"
3741+
message_id = r.xadd(stream, {"foo": "bar"}, "9999999999999999999-*")
3742+
ms = message_id[: message_id.index(b"-")]
3743+
assert ms == b"9999999999999999999"
3744+
37383745
@skip_if_server_version_lt("6.2.0")
37393746
def test_xautoclaim(self, r):
37403747
stream = "stream"
@@ -3820,7 +3827,7 @@ def test_xclaim(self, r):
38203827
== [message_id]
38213828
)
38223829

3823-
@skip_if_server_version_lt("5.0.0")
3830+
@skip_if_server_version_lt("7.0.0")
38243831
def test_xclaim_trimmed(self, r):
38253832
# xclaim should not raise an exception if the item is not there
38263833
stream = "stream"
@@ -3841,9 +3848,8 @@ def test_xclaim_trimmed(self, r):
38413848
# xclaim them from consumer2
38423849
# the item that is still in the stream should be returned
38433850
item = r.xclaim(stream, group, "consumer2", 0, [sid1, sid2])
3844-
assert len(item) == 2
3845-
assert item[0] == (None, None)
3846-
assert item[1][0] == sid2
3851+
assert len(item) == 1
3852+
assert item[0][0] == sid2
38473853

38483854
@skip_if_server_version_lt("5.0.0")
38493855
def test_xdel(self, r):
@@ -3860,7 +3866,7 @@ def test_xdel(self, r):
38603866
assert r.xdel(stream, m1) == 1
38613867
assert r.xdel(stream, m2, m3) == 2
38623868

3863-
@skip_if_server_version_lt("5.0.0")
3869+
@skip_if_server_version_lt("7.0.0")
38643870
def test_xgroup_create(self, r):
38653871
# tests xgroup_create and xinfo_groups
38663872
stream = "stream"
@@ -3877,11 +3883,13 @@ def test_xgroup_create(self, r):
38773883
"consumers": 0,
38783884
"pending": 0,
38793885
"last-delivered-id": b"0-0",
3886+
"entries-read": None,
3887+
"lag": 1,
38803888
}
38813889
]
38823890
assert r.xinfo_groups(stream) == expected
38833891

3884-
@skip_if_server_version_lt("5.0.0")
3892+
@skip_if_server_version_lt("7.0.0")
38853893
def test_xgroup_create_mkstream(self, r):
38863894
# tests xgroup_create and xinfo_groups
38873895
stream = "stream"
@@ -3901,6 +3909,30 @@ def test_xgroup_create_mkstream(self, r):
39013909
"consumers": 0,
39023910
"pending": 0,
39033911
"last-delivered-id": b"0-0",
3912+
"entries-read": None,
3913+
"lag": 0,
3914+
}
3915+
]
3916+
assert r.xinfo_groups(stream) == expected
3917+
3918+
@skip_if_server_version_lt("7.0.0")
3919+
def test_xgroup_create_entriesread(self, r: redis.Redis):
3920+
stream = "stream"
3921+
group = "group"
3922+
r.xadd(stream, {"foo": "bar"})
3923+
3924+
# no group is setup yet, no info to obtain
3925+
assert r.xinfo_groups(stream) == []
3926+
3927+
assert r.xgroup_create(stream, group, 0, entries_read=7)
3928+
expected = [
3929+
{
3930+
"name": group.encode(),
3931+
"consumers": 0,
3932+
"pending": 0,
3933+
"last-delivered-id": b"0-0",
3934+
"entries-read": 7,
3935+
"lag": -6,
39043936
}
39053937
]
39063938
assert r.xinfo_groups(stream) == expected
@@ -3951,21 +3983,23 @@ def test_xgroup_destroy(self, r):
39513983
r.xgroup_create(stream, group, 0)
39523984
assert r.xgroup_destroy(stream, group)
39533985

3954-
@skip_if_server_version_lt("5.0.0")
3986+
@skip_if_server_version_lt("7.0.0")
39553987
def test_xgroup_setid(self, r):
39563988
stream = "stream"
39573989
group = "group"
39583990
message_id = r.xadd(stream, {"foo": "bar"})
39593991

39603992
r.xgroup_create(stream, group, 0)
39613993
# advance the last_delivered_id to the message_id
3962-
r.xgroup_setid(stream, group, message_id)
3994+
r.xgroup_setid(stream, group, message_id, entries_read=2)
39633995
expected = [
39643996
{
39653997
"name": group.encode(),
39663998
"consumers": 0,
39673999
"pending": 0,
39684000
"last-delivered-id": message_id,
4001+
"entries-read": 2,
4002+
"lag": -1,
39694003
}
39704004
]
39714005
assert r.xinfo_groups(stream) == expected
@@ -3995,7 +4029,7 @@ def test_xinfo_consumers(self, r):
39954029
assert isinstance(info[1].pop("idle"), int)
39964030
assert info == expected
39974031

3998-
@skip_if_server_version_lt("5.0.0")
4032+
@skip_if_server_version_lt("7.0.0")
39994033
def test_xinfo_stream(self, r):
40004034
stream = "stream"
40014035
m1 = r.xadd(stream, {"foo": "bar"})
@@ -4005,6 +4039,9 @@ def test_xinfo_stream(self, r):
40054039
assert info["length"] == 2
40064040
assert info["first-entry"] == get_stream_message(r, stream, m1)
40074041
assert info["last-entry"] == get_stream_message(r, stream, m2)
4042+
assert info["max-deleted-entry-id"] == b"0-0"
4043+
assert info["entries-added"] == 2
4044+
assert info["recorded-first-entry-id"] == m1
40084045

40094046
@skip_if_server_version_lt("6.0.0")
40104047
def test_xinfo_stream_full(self, r):

0 commit comments

Comments
 (0)