diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 75ba868..0712b85 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -60,8 +60,9 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to send. """ + queue_name = message.labels.get("queue_name") or self.queue_name async with Redis(connection_pool=self.connection_pool) as redis_conn: - await redis_conn.publish(self.queue_name, message.message) + await redis_conn.publish(queue_name, message.message) async def listen(self) -> AsyncGenerator[bytes, None]: """ @@ -95,8 +96,9 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ + queue_name = message.labels.get("queue_name") or self.queue_name async with Redis(connection_pool=self.connection_pool) as redis_conn: - await redis_conn.lpush(self.queue_name, message.message) + await redis_conn.lpush(queue_name, message.message) async def listen(self) -> AsyncGenerator[bytes, None]: """