
Problem with using max connections limit #53

@stinovlas

Description

Problem

Both BaseRedisBroker and RedisScheduleSource have an argument max_connection_pool_size, which is passed to ConnectionPool. However, the ConnectionPool implementation raises redis.exceptions.ConnectionError when the maximum number of connections is exceeded. This exception is not caught and bubbles all the way up, which kills the scheduler (and the broker).

# Minimal working example (with scheduler)
import asyncio

from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq_redis.schedule_source import RedisScheduleSource

def get_scheduled_task():
    return ScheduledTask(
        task_name="test_task", labels={}, args=[], kwargs={}, cron="1 1 0 0 0"
    )

# The pool is limited to 5 connections, but test() runs 10 subtests concurrently.
source = RedisScheduleSource("redis://127.0.0.1:6379", max_connection_pool_size=5)

async def subtest():
    task = get_scheduled_task()
    await source.add_schedule(task)
    print("task added")
    await source.delete_schedule(task.schedule_id)
    print("task deleted")

async def test():
    await asyncio.gather(*[subtest() for _ in range(10)])

if __name__ == "__main__":
    asyncio.run(test())

Suggestions

I found out that redis provides redis.asyncio.BlockingConnectionPool, which waits for a free connection instead of raising the exception. There's a configurable timeout (after which the exception is raised). Despite the name, the asyncio variant of BlockingConnectionPool does not actually block the whole program; the context is correctly switched while waiting asynchronously.

We could leverage this class to make the max connections limit easier to handle. Otherwise, users need to override the taskiq-redis classes and replace ConnectionPool with BlockingConnectionPool manually.
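
For illustration, here is a minimal sketch of the blocking behaviour using plain redis-py, independent of taskiq-redis (the URL and limit mirror the example above; wiring this into taskiq-redis is omitted because it depends on internals):

# Minimal sketch: BlockingConnectionPool waits for a free connection
import asyncio

from redis.asyncio import BlockingConnectionPool, Redis

async def ping(client: Redis) -> None:
    await client.ping()

async def main() -> None:
    pool = BlockingConnectionPool.from_url(
        "redis://127.0.0.1:6379",
        max_connections=5,  # same limit as max_connection_pool_size above
        timeout=10,  # seconds to wait for a free connection before raising
    )
    client = Redis(connection_pool=pool)
    # Ten concurrent commands against a pool of five: callers wait
    # instead of getting redis.exceptions.ConnectionError.
    await asyncio.gather(*[ping(client) for _ in range(10)])
    await client.aclose()  # close() on older redis-py versions

if __name__ == "__main__":
    asyncio.run(main())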

I see the following possibilities:

  1. Add a new argument connection_pool_cls: Type[ConnectionPool] to RedisScheduleSource and BaseRedisBroker. This could be any ConnectionPool subclass (including BlockingConnectionPool). This is the one I prefer (a rough sketch follows after this list).
  2. Add a new argument connection_pool: ConnectionPool to RedisScheduleSource and BaseRedisBroker. This would be an instance of any ConnectionPool subclass (including BlockingConnectionPool). The URL would have to be duplicated in this case: passed both to the ConnectionPool instance and to RedisScheduleSource itself (even if unused, in order to keep the API compatible).
  3. Add a new argument blocking: bool to RedisScheduleSource and BaseRedisBroker. Based on its value, we'd internally decide whether to use ConnectionPool or BlockingConnectionPool. This is the least flexible option, because the behaviour cannot easily be changed from the outside (e.g. by subclassing ConnectionPool).
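
A rough sketch of option 1 follows. This is hypothetical code, not the actual taskiq-redis implementation; the connection_pool attribute, the default values, and the constructor details are assumptions made for the sketch:

# Hypothetical sketch of option 1 -- not the actual taskiq-redis code.
from typing import Type

from redis.asyncio import BlockingConnectionPool, ConnectionPool

class RedisScheduleSource:  # simplified stand-in for the real class
    def __init__(
        self,
        url: str,
        max_connection_pool_size: int = 50,  # default simplified for the sketch
        connection_pool_cls: Type[ConnectionPool] = ConnectionPool,
        **connection_kwargs,
    ) -> None:
        # Any ConnectionPool subclass can be plugged in here.
        self.connection_pool = connection_pool_cls.from_url(
            url,
            max_connections=max_connection_pool_size,
            **connection_kwargs,
        )

# Usage: opt in to the waiting behaviour without subclassing.
source = RedisScheduleSource(
    "redis://127.0.0.1:6379",
    max_connection_pool_size=5,
    connection_pool_cls=BlockingConnectionPool,
)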

In all cases, the change can be made backwards compatible (although I'd argue that the current behaviour with an uncaught exception doesn't make sense and BlockingConnectionPool would be a good default). Alternatively, we could:

  1. Switch the implementation to BlockingConnectionPool and throw away ConnectionPool altogether. This would minimize the changes (just replace ConnectionPool with BlockingConnectionPool), but it's a breaking change.

Notes

redis.asyncio.RedisCluster does not suffer from the same problem, because it has its own connection pool handling mechanism and already allows for retries.
EDIT: There is actually a problem with cluster reconnects. I created redis/redis-py#3135 to resolve it.

We should also consider modifying RedisAsyncResultBackend and RedisAsyncClusterResultBackend. These classes don't accept any argument to limit the number of simultaneous connections.
