diff --git a/redis/commands/core.py b/redis/commands/core.py index ac1b6c78e4..8bbcda3a69 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3505,6 +3505,7 @@ def xgroup_create( groupname: GroupT, id: StreamIdT = "$", mkstream: bool = False, + entries_read: Optional[int] = None, ) -> ResponseT: """ Create a new consumer group associated with a stream. @@ -3517,6 +3518,9 @@ def xgroup_create( pieces: list[EncodableT] = ["XGROUP CREATE", name, groupname, id] if mkstream: pieces.append(b"MKSTREAM") + if entries_read is not None: + pieces.extend(["ENTRIESREAD", entries_read]) + return self.execute_command(*pieces) def xgroup_delconsumer( @@ -3572,6 +3576,7 @@ def xgroup_setid( name: KeyT, groupname: GroupT, id: StreamIdT, + entries_read: Optional[int] = None, ) -> ResponseT: """ Set the consumer group last delivered ID to something else. @@ -3581,7 +3586,10 @@ def xgroup_setid( For more information see https://redis.io/commands/xgroup-setid """ - return self.execute_command("XGROUP SETID", name, groupname, id) + pieces = [name, groupname, id] + if entries_read is not None: + pieces.extend(["ENTRIESREAD", entries_read]) + return self.execute_command("XGROUP SETID", *pieces) def xinfo_consumers(self, name: KeyT, groupname: GroupT) -> ResponseT: """ diff --git a/tests/test_commands.py b/tests/test_commands.py index 11c9939901..59754123ac 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -3735,6 +3735,13 @@ def test_xadd_minlen_and_limit(self, r): r.xadd(stream, {"foo": "bar"}) assert r.xadd(stream, {"foo": "bar"}, approximate=True, minid=m3) + @skip_if_server_version_lt("7.0.0") + def test_xadd_explicit_ms(self, r: redis.Redis): + stream = "stream" + message_id = r.xadd(stream, {"foo": "bar"}, "9999999999999999999-*") + ms = message_id[: message_id.index(b"-")] + assert ms == b"9999999999999999999" + @skip_if_server_version_lt("6.2.0") def test_xautoclaim(self, r): stream = "stream" @@ -3820,7 +3827,7 @@ def test_xclaim(self, r): == [message_id] ) - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.0.0") def test_xclaim_trimmed(self, r): # xclaim should not raise an exception if the item is not there stream = "stream" @@ -3841,9 +3848,8 @@ def test_xclaim_trimmed(self, r): # xclaim them from consumer2 # the item that is still in the stream should be returned item = r.xclaim(stream, group, "consumer2", 0, [sid1, sid2]) - assert len(item) == 2 - assert item[0] == (None, None) - assert item[1][0] == sid2 + assert len(item) == 1 + assert item[0][0] == sid2 @skip_if_server_version_lt("5.0.0") def test_xdel(self, r): @@ -3860,7 +3866,7 @@ def test_xdel(self, r): assert r.xdel(stream, m1) == 1 assert r.xdel(stream, m2, m3) == 2 - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.0.0") def test_xgroup_create(self, r): # tests xgroup_create and xinfo_groups stream = "stream" @@ -3877,11 +3883,13 @@ def test_xgroup_create(self, r): "consumers": 0, "pending": 0, "last-delivered-id": b"0-0", + "entries-read": None, + "lag": 1, } ] assert r.xinfo_groups(stream) == expected - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.0.0") def test_xgroup_create_mkstream(self, r): # tests xgroup_create and xinfo_groups stream = "stream" @@ -3901,6 +3909,30 @@ def test_xgroup_create_mkstream(self, r): "consumers": 0, "pending": 0, "last-delivered-id": b"0-0", + "entries-read": None, + "lag": 0, + } + ] + assert r.xinfo_groups(stream) == expected + + @skip_if_server_version_lt("7.0.0") + def test_xgroup_create_entriesread(self, r: redis.Redis): + stream = "stream" + group = "group" + r.xadd(stream, {"foo": "bar"}) + + # no group is setup yet, no info to obtain + assert r.xinfo_groups(stream) == [] + + assert r.xgroup_create(stream, group, 0, entries_read=7) + expected = [ + { + "name": group.encode(), + "consumers": 0, + "pending": 0, + "last-delivered-id": b"0-0", + "entries-read": 7, + "lag": -6, } ] assert r.xinfo_groups(stream) == expected @@ -3951,7 +3983,7 @@ def test_xgroup_destroy(self, r): r.xgroup_create(stream, group, 0) assert r.xgroup_destroy(stream, group) - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.0.0") def test_xgroup_setid(self, r): stream = "stream" group = "group" @@ -3959,13 +3991,15 @@ def test_xgroup_setid(self, r): r.xgroup_create(stream, group, 0) # advance the last_delivered_id to the message_id - r.xgroup_setid(stream, group, message_id) + r.xgroup_setid(stream, group, message_id, entries_read=2) expected = [ { "name": group.encode(), "consumers": 0, "pending": 0, "last-delivered-id": message_id, + "entries-read": 2, + "lag": -1, } ] assert r.xinfo_groups(stream) == expected @@ -3995,7 +4029,7 @@ def test_xinfo_consumers(self, r): assert isinstance(info[1].pop("idle"), int) assert info == expected - @skip_if_server_version_lt("5.0.0") + @skip_if_server_version_lt("7.0.0") def test_xinfo_stream(self, r): stream = "stream" m1 = r.xadd(stream, {"foo": "bar"}) @@ -4005,6 +4039,9 @@ def test_xinfo_stream(self, r): assert info["length"] == 2 assert info["first-entry"] == get_stream_message(r, stream, m1) assert info["last-entry"] == get_stream_message(r, stream, m2) + assert info["max-deleted-entry-id"] == b"0-0" + assert info["entries-added"] == 2 + assert info["recorded-first-entry-id"] == m1 @skip_if_server_version_lt("6.0.0") def test_xinfo_stream_full(self, r):