diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py index abe1892..da5d56e 100644 --- a/taskiq_redis/redis_cluster_broker.py +++ b/taskiq_redis/redis_cluster_broker.py @@ -55,7 +55,8 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ - await self.redis.lpush(self.queue_name, message.message) # type: ignore + queue_name = message.labels.get("queue_name") or self.queue_name + await self.redis.lpush(queue_name, message.message) # type: ignore async def listen(self) -> AsyncGenerator[bytes, None]: """ @@ -162,8 +163,9 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ + queue_name = message.labels.get("queue_name") or self.queue_name await self.redis.xadd( - self.queue_name, + queue_name, {b"data": message.message}, maxlen=self.maxlen, approximate=self.approximate,