From 41fe84a9560698214649941a7f73a97b37ba48b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien?= Date: Wed, 7 Feb 2024 09:17:26 +0100 Subject: [PATCH] feat(broker): allow to set queue name dynamically when kicking --- taskiq_redis/redis_broker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 75ba868..0712b85 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -60,8 +60,9 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to send. """ + queue_name = message.labels.get("queue_name") or self.queue_name async with Redis(connection_pool=self.connection_pool) as redis_conn: - await redis_conn.publish(self.queue_name, message.message) + await redis_conn.publish(queue_name, message.message) async def listen(self) -> AsyncGenerator[bytes, None]: """ @@ -95,8 +96,9 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ + queue_name = message.labels.get("queue_name") or self.queue_name async with Redis(connection_pool=self.connection_pool) as redis_conn: - await redis_conn.lpush(self.queue_name, message.message) + await redis_conn.lpush(queue_name, message.message) async def listen(self) -> AsyncGenerator[bytes, None]: """