Skip to content

Commit 9c60670

Browse files
add idle to xpending (#1523)
1 parent 238f69e commit 9c60670

File tree

2 files changed

+74
-14
lines changed

2 files changed

+74
-14
lines changed

redis/client.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2747,7 +2747,7 @@ def xpending(self, name, groupname):
27472747
return self.execute_command('XPENDING', name, groupname)
27482748

27492749
def xpending_range(self, name, groupname, min, max, count,
2750-
consumername=None):
2750+
consumername=None, idle=None):
27512751
"""
27522752
Returns information about pending messages, in a range.
27532753
name: name of the stream.
@@ -2756,21 +2756,35 @@ def xpending_range(self, name, groupname, min, max, count,
27562756
max: maximum stream ID.
27572757
count: number of messages to return
27582758
consumername: name of a consumer to filter by (optional).
2759+
idle: available from version 6.2. filter entries by their
2760+
idle-time, given in milliseconds (optional).
27592761
"""
2762+
if {min, max, count} == {None}:
2763+
if idle is not None or consumername is not None:
2764+
raise DataError("if XPENDING is provided with idle time"
2765+
" or consumername, it must be provided"
2766+
" with min, max and count parameters")
2767+
return self.xpending(name, groupname)
2768+
27602769
pieces = [name, groupname]
2761-
if min is not None or max is not None or count is not None:
2762-
if min is None or max is None or count is None:
2763-
raise DataError("XPENDING must be provided with min, max "
2764-
"and count parameters, or none of them. ")
2765-
if not isinstance(count, int) or count < -1:
2766-
raise DataError("XPENDING count must be a integer >= -1")
2767-
pieces.extend((min, max, str(count)))
2768-
if consumername is not None:
2769-
if min is None or max is None or count is None:
2770-
raise DataError("if XPENDING is provided with consumername,"
2771-
" it must be provided with min, max and"
2772-
" count parameters")
2773-
pieces.append(consumername)
2770+
if min is None or max is None or count is None:
2771+
raise DataError("XPENDING must be provided with min, max "
2772+
"and count parameters, or none of them.")
2773+
# idle
2774+
try:
2775+
if int(idle) < 0:
2776+
raise DataError("XPENDING idle must be a integer >= 0")
2777+
pieces.extend(['IDLE', idle])
2778+
except TypeError:
2779+
pass
2780+
# count
2781+
try:
2782+
if int(count) < 0:
2783+
raise DataError("XPENDING count must be a integer >= 0")
2784+
pieces.extend([min, max, count])
2785+
except TypeError:
2786+
pass
2787+
27742788
return self.execute_command('XPENDING', *pieces, parse_detail=True)
27752789

27762790
def xrange(self, name, min='-', max='+', count=None):

tests/test_commands.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2636,6 +2636,52 @@ def test_xpending_range(self, r):
26362636
assert response[1]['message_id'] == m2
26372637
assert response[1]['consumer'] == consumer2.encode()
26382638

2639+
@skip_if_server_version_lt('6.2.0')
2640+
def test_xpending_range_idle(self, r):
2641+
stream = 'stream'
2642+
group = 'group'
2643+
consumer1 = 'consumer1'
2644+
consumer2 = 'consumer2'
2645+
r.xadd(stream, {'foo': 'bar'})
2646+
r.xadd(stream, {'foo': 'bar'})
2647+
r.xgroup_create(stream, group, 0)
2648+
2649+
# read 1 message from the group with each consumer
2650+
r.xreadgroup(group, consumer1, streams={stream: '>'}, count=1)
2651+
r.xreadgroup(group, consumer2, streams={stream: '>'}, count=1)
2652+
2653+
response = r.xpending_range(stream, group,
2654+
min='-', max='+', count=5)
2655+
assert len(response) == 2
2656+
response = r.xpending_range(stream, group,
2657+
min='-', max='+', count=5, idle=1000)
2658+
assert len(response) == 0
2659+
2660+
def test_xpending_range_negative(self, r):
2661+
stream = 'stream'
2662+
group = 'group'
2663+
with pytest.raises(redis.DataError):
2664+
r.xpending_range(stream, group, min='-', max='+', count=None)
2665+
with pytest.raises(ValueError):
2666+
r.xpending_range(stream, group, min='-', max='+', count="one")
2667+
with pytest.raises(redis.DataError):
2668+
r.xpending_range(stream, group, min='-', max='+', count=-1)
2669+
with pytest.raises(ValueError):
2670+
r.xpending_range(stream, group, min='-', max='+', count=5,
2671+
idle="one")
2672+
with pytest.raises(redis.exceptions.ResponseError):
2673+
r.xpending_range(stream, group, min='-', max='+', count=5,
2674+
idle=1.5)
2675+
with pytest.raises(redis.DataError):
2676+
r.xpending_range(stream, group, min='-', max='+', count=5,
2677+
idle=-1)
2678+
with pytest.raises(redis.DataError):
2679+
r.xpending_range(stream, group, min=None, max=None, count=None,
2680+
idle=0)
2681+
with pytest.raises(redis.DataError):
2682+
r.xpending_range(stream, group, min=None, max=None, count=None,
2683+
consumername=0)
2684+
26392685
@skip_if_server_version_lt('5.0.0')
26402686
def test_xrange(self, r):
26412687
stream = 'stream'

0 commit comments

Comments
 (0)