Skip to content

Commit 0d34a45

Browse files
committed
feat: add maxlen to stream sentinel and cluster brokers
1 parent 513b77e commit 0d34a45

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

taskiq_redis/redis_cluster_broker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def __init__(
9292
consumer_id: str = "$",
9393
mkstream: bool = True,
9494
xread_block: int = 10000,
95+
maxlen: Optional[int] = None,
9596
additional_streams: Optional[Dict[str, str]] = None,
9697
**connection_kwargs: Any,
9798
) -> None:
@@ -111,6 +112,8 @@ def __init__(
111112
:param mkstream: create stream if it does not exist.
112113
:param xread_block: block time in ms for xreadgroup.
113114
Better to set it to a bigger value, to avoid unnecessary calls.
115+
:param maxlen: sets the maximum length of the stream
116+
trims (the old values of) the stream each time a new element is added
114117
:param additional_streams: additional streams to read from.
115118
Each key is a stream name, value is a consumer id.
116119
"""
@@ -125,6 +128,7 @@ def __init__(
125128
self.consumer_id = consumer_id
126129
self.mkstream = mkstream
127130
self.block = xread_block
131+
self.maxlen = maxlen
128132
self.additional_streams = additional_streams or {}
129133

130134
async def _declare_consumer_group(self) -> None:
@@ -154,7 +158,11 @@ async def kick(self, message: BrokerMessage) -> None:
154158
155159
:param message: message to append.
156160
"""
157-
await self.redis.xadd(self.queue_name, {b"data": message.message})
161+
await self.redis.xadd(
162+
self.queue_name,
163+
{b"data": message.message},
164+
maxlen=self.maxlen,
165+
)
158166

159167
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
160168
async def _ack() -> None:

taskiq_redis/redis_sentinel_broker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ def __init__(
157157
consumer_id: str = "$",
158158
mkstream: bool = True,
159159
xread_block: int = 10000,
160+
maxlen: Optional[int] = None,
160161
additional_streams: Optional[Dict[str, str]] = None,
161162
**connection_kwargs: Any,
162163
) -> None:
@@ -176,6 +177,8 @@ def __init__(
176177
:param mkstream: create stream if it does not exist.
177178
:param xread_block: block time in ms for xreadgroup.
178179
Better to set it to a bigger value, to avoid unnecessary calls.
180+
:param maxlen: sets the maximum length of the stream
181+
trims (the old values of) the stream each time a new element is added
179182
:param additional_streams: additional streams to read from.
180183
Each key is a stream name, value is a consumer id.
181184
"""
@@ -193,6 +196,7 @@ def __init__(
193196
self.consumer_id = consumer_id
194197
self.mkstream = mkstream
195198
self.block = xread_block
199+
self.maxlen = maxlen
196200
self.additional_streams = additional_streams or {}
197201

198202
async def _declare_consumer_group(self) -> None:
@@ -223,7 +227,11 @@ async def kick(self, message: BrokerMessage) -> None:
223227
:param message: message to append.
224228
"""
225229
async with self._acquire_master_conn() as redis_conn:
226-
await redis_conn.xadd(self.queue_name, {b"data": message.message})
230+
await redis_conn.xadd(
231+
self.queue_name,
232+
{b"data": message.message},
233+
maxlen=self.maxlen,
234+
)
227235

228236
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
229237
async def _ack() -> None:

0 commit comments

Comments
 (0)