## Description
First off, Taskiq is a great library! Thanks so much for bringing async to distributed task queues in Python. We're running it in production and it's generally doing well.
## Overview

`RedisScheduleSource.get_schedules` becomes very slow with a large number of keys: in real-world usage over the network, a full scan of 600k keys takes 2-3 minutes.
## Context

We have minutely scheduled tasks (via the `LabelScheduleSource`) and were seeing them fire only every 2-3 minutes. We had the `RedisScheduleSource` configured as a second source. I identified the `RedisScheduleSource` as the cause: it uses the `scan_iter` command with its default page size of 10 (search for "default count" on that page), so a full scan of our 600k keys requires 60k round trips to Redis. Our monitoring confirms that assessment; this is one day of usage.
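The round-trip count follows directly from the page size: `SCAN` returns roughly `count` keys per call (`count` is a hint to Redis, not a guarantee, so real numbers vary slightly), meaning a full scan needs about `ceil(total_keys / count)` requests. A quick back-of-the-envelope check:

```python
import math


def scan_round_trips(total_keys: int, count: int) -> int:
    """Approximate number of SCAN calls needed to iterate all keys,
    assuming each call returns about `count` keys."""
    return math.ceil(total_keys / count)


# Default page size of 10 vs an explicit count of 1000:
print(scan_round_trips(600_000, 10))    # 60000 round trips
print(scan_round_trips(600_000, 1000))  # 600 round trips
```

With any meaningful network latency per round trip, the difference between 60k and 600 requests dominates the total scan time.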

We ended up just removing the `RedisScheduleSource` and the issue went away. We had misconfigured it, thinking it was necessary for the decorator crons to work; now I understand it is for dynamic scheduling, which we're not using yet.
## Proposed solution

Add configurability for the number of keys fetched in each scan page. This would allow far fewer network round trips and therefore much better performance with large numbers of small keys. I'm happy to put up a PR for this if you're accepting contributions!
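As a rough sketch of the idea (the `scan_count` option, `FakeRedis`, and this `get_schedules` are hypothetical illustrations, not the actual taskiq-redis API): the schedule source could accept a scan-page-size option and pass it through to `scan_iter`. The effect on round trips can be simulated without a live Redis by paging a fake keyspace:

```python
import asyncio


class FakeRedis:
    """Minimal stand-in for an async Redis client, counting SCAN pages."""

    def __init__(self, keys: list[str]) -> None:
        self.keys = keys
        self.round_trips = 0

    async def scan_iter(self, match: str = "*", count: int = 10):
        # Each page of `count` keys costs one round trip, like real SCAN.
        for start in range(0, len(self.keys), count):
            self.round_trips += 1
            for key in self.keys[start : start + count]:
                yield key


async def get_schedules(client: FakeRedis, scan_count: int) -> list[str]:
    # Hypothetical: forward a configurable `count` to scan_iter.
    return [key async for key in client.scan_iter(count=scan_count)]


async def main() -> None:
    keys = [f"schedule:{i}" for i in range(10_000)]

    slow = FakeRedis(keys)
    await get_schedules(slow, scan_count=10)

    fast = FakeRedis(keys)
    await get_schedules(fast, scan_count=1000)

    print(slow.round_trips, fast.round_trips)  # 1000 vs 10


if __name__ == "__main__":
    asyncio.run(main())
```

The same keys come back either way; only the number of network hops changes.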
## Initial testing

Here are the results from a quick test of 0 keys versus 500k keys with 10-character values, on a Mac Mini M4 Pro with Redis running in Docker. Results would of course be more extreme over an external network, but it's already a very substantial difference.
### Existing logic

```shell
nick@Mac-mini taskiq-redis % poetry run python scheduler.py
Time taken: 0.01 seconds
nick@Mac-mini taskiq-redis % poetry run python insert_keys.py
nick@Mac-mini taskiq-redis % poetry run python scheduler.py
Time taken: 13.65 seconds
```
### Passing `count=1000` to `scan_iter`

```shell
nick@Mac-mini taskiq-redis % poetry run python scheduler.py
Time taken: 0.01 seconds
nick@Mac-mini taskiq-redis % poetry run python insert_keys.py
nick@Mac-mini taskiq-redis % poetry run python scheduler.py
Time taken: 0.23 seconds
```
### Scripts used for reference
```python
# scheduler.py
import asyncio
import time

from taskiq import ScheduledTask
from taskiq_redis import RedisScheduleSource

redis_url = "redis://localhost:6379/0"


async def main():
    start_time = time.perf_counter()
    source = RedisScheduleSource(redis_url)
    schedule = ScheduledTask(
        task_name="test_task",
        labels={},
        args=[],
        kwargs={},
        cron="* * * * *",
    )
    await source.add_schedule(schedule)
    await source.get_schedules()
    await source.shutdown()
    end_time = time.perf_counter()
    print(f"Time taken: {end_time - start_time:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(main())
```
```python
# insert_keys.py
import random
import string
from typing import Final

import redis

TOTAL_KEYS: Final[int] = 500_000
BATCH_SIZE: Final[int] = 1000  # Command flush interval


def generate_random_value(length: int = 10) -> str:
    return "".join(random.choices(string.ascii_letters + string.digits, k=length))


def main() -> None:
    r: redis.Redis = redis.Redis(host="localhost", port=6379, db=0)
    # Use a pipeline to batch commands for better performance.
    pipe = r.pipeline()
    for i in range(1, TOTAL_KEYS + 1):
        key: str = f"key{i}"
        value: str = generate_random_value()
        pipe.set(key, value)
        # Execute the batch when reaching the batch size.
        if i % BATCH_SIZE == 0:
            pipe.execute()
    # Execute any remaining commands.
    pipe.execute()


if __name__ == "__main__":
    main()
```