From b78a3797dbefcf7c30bc7cf104e854135c67cefd Mon Sep 17 00:00:00 2001
From: Pavel Kirilin
Date: Tue, 1 Apr 2025 23:00:12 +0200
Subject: [PATCH 1/2] Added new schedule source.

---
 README.md                            |  64 ++++++++
 poetry.lock                          |  44 ++++-
 pyproject.toml                       |   1 +
 taskiq_redis/__init__.py             |   2 +
 taskiq_redis/list_schedule_source.py | 229 +++++++++++++++++++++++++++
 tests/test_array_schedule_source.py  | 138 ++++++++++++++++
 6 files changed, 477 insertions(+), 1 deletion(-)
 create mode 100644 taskiq_redis/list_schedule_source.py
 create mode 100644 tests/test_array_schedule_source.py

diff --git a/README.md b/README.md
index 986b3bd..0e1e9ed 100644
--- a/README.md
+++ b/README.md
@@ -125,3 +125,67 @@ RedisAsyncResultBackend parameters:
 > result_px_time=1000000,
 > )
 > ```
+
+
+## Schedule sources
+
+
+You can use this package to add dynamic schedule sources. They are used to store
+schedules for the taskiq scheduler.
+
+The advantage of using schedule sources from this package over the default `LabelBased` source is that you can
+dynamically add schedules to them.
+
+We have two types of schedules:
+
+* `RedisScheduleSource`
+* `ListRedisScheduleSource`
+
+
+### RedisScheduleSource

+
+This source is super simple. It stores all schedules by key `{prefix}:{schedule_id}`. When the scheduler requests
+schedules, it retrieves all values from redis that start with a given `prefix`.
+
+This is very inefficient and should not be used for high-volume schedules, because if you have `1000` schedules, this scheduler will make at least `20` requests to retrieve them (we use `scan` and `mget` to minimize the number of calls).
+
+### ListRedisScheduleSource
+
+This source holds values in lists.
+
+* For cron tasks it uses the key `{prefix}:cron`.
+* For timed schedules it uses the key `{prefix}:time:{time}`, where `{time}` is the time at which the schedules should run.
+
+The main advantage of this approach is that we only fetch the tasks we need to run at a given time and do not perform any excessive calls to redis.
+
+
+### Migration from one source to another
+
+To migrate from `RedisScheduleSource` to `ListRedisScheduleSource` you can define the latter like this:
+
+```python
+# broker.py
+import asyncio
+import datetime
+
+from taskiq import TaskiqScheduler
+
+from taskiq_redis import ListRedisScheduleSource, RedisStreamBroker
+from taskiq_redis.schedule_source import RedisScheduleSource
+
+broker = RedisStreamBroker(url="redis://localhost:6379")
+
+old_source = RedisScheduleSource("redis://localhost/1", prefix="prefix1")
+array_source = ListRedisScheduleSource(
+    "redis://localhost/1",
+    prefix="prefix2",
+    # To migrate schedules from an old source.
+).with_migrate_from(
+    old_source,
+    # To delete schedules from an old source.
+    delete_schedules=True,
+)
+scheduler = TaskiqScheduler(broker, [array_source])
+```
+
+During startup the scheduler will try to migrate schedules from the old source to the new one. Please be sure to specify different prefixes, just to avoid any kind of collision between these two sources.
diff --git a/poetry.lock b/poetry.lock index c11a936..1399470 100644 --- a/poetry.lock +++ b/poetry.lock @@ -298,6 +298,21 @@ docs = ["furo (>=2024.8.6)", "sphinx (>=8.1.3)", "sphinx-autodoc-typehints (>=3) testing = ["covdefaults (>=2.3)", "coverage (>=7.6.10)", "diff-cover (>=9.2.1)", "pytest (>=8.3.4)", "pytest-asyncio (>=0.25.2)", "pytest-cov (>=6)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.28.1)"] typing = ["typing-extensions (>=4.12.2)"] +[[package]] +name = "freezegun" +version = "1.5.1" +description = "Let your Python tests travel through time" +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "freezegun-1.5.1-py3-none-any.whl", hash = "sha256:bf111d7138a8abe55ab48a71755673dbaa4ab87f4cff5634a4442dfec34c15f1"}, + {file = "freezegun-1.5.1.tar.gz", hash = "sha256:b29dedfcda6d5e8e083ce71b2b542753ad48cfec44037b3fc79702e2980a89e9"}, +] + +[package.dependencies] +python-dateutil = ">=2.7" + [[package]] name = "identify" version = "2.6.9" @@ -783,6 +798,21 @@ psutil = ["psutil (>=3.0)"] setproctitle = ["setproctitle"] testing = ["filelock"] +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +description = "Extensions to the standard Python datetime module" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["dev"] +files = [ + {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, + {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, +] + +[package.dependencies] +six = ">=1.5" + [[package]] name = "pytz" version = "2025.1" @@ -905,6 +935,18 @@ files = [ {file = "ruff-0.9.10.tar.gz", hash = "sha256:9bacb735d7bada9cfb0f2c227d3658fc443d90a727b47f206fb33f52f3c0eac7"}, ] +[[package]] +name = "six" +version = "1.17.0" +description = "Python 2 and 3 compatibility utilities" +optional = false +python-versions 
= "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["dev"] +files = [ + {file = "six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274"}, + {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, +] + [[package]] name = "sniffio" version = "1.3.1" @@ -1075,4 +1117,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "2ddef8d5706f34450be1a63117ef3dc7ee8fda5b6bbf5904873ac7acc20572c9" +content-hash = "9adc5cc97a24b785948ac6f06f9a26f8ae87a3c0032cb39fd95bd9dee0b1228b" diff --git a/pyproject.toml b/pyproject.toml index eaad441..fb7b019 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ fakeredis = "^2" pre-commit = "^4" pytest-xdist = { version = "^3", extras = ["psutil"] } ruff = "^0" +freezegun = "^1.5.1" [tool.mypy] strict = true diff --git a/taskiq_redis/__init__.py b/taskiq_redis/__init__.py index 0efd023..38377c9 100644 --- a/taskiq_redis/__init__.py +++ b/taskiq_redis/__init__.py @@ -1,5 +1,6 @@ """Package for redis integration.""" +from taskiq_redis.list_schedule_source import ListRedisScheduleSource from taskiq_redis.redis_backend import ( RedisAsyncClusterResultBackend, RedisAsyncResultBackend, @@ -25,6 +26,7 @@ "ListQueueBroker", "ListQueueClusterBroker", "ListQueueSentinelBroker", + "ListRedisScheduleSource", "PubSubBroker", "PubSubSentinelBroker", "RedisAsyncClusterResultBackend", diff --git a/taskiq_redis/list_schedule_source.py b/taskiq_redis/list_schedule_source.py new file mode 100644 index 0000000..394326a --- /dev/null +++ b/taskiq_redis/list_schedule_source.py @@ -0,0 +1,229 @@ +import datetime +from logging import getLogger +from typing import Any, List, Optional + +from redis.asyncio import BlockingConnectionPool, Redis +from taskiq import ScheduledTask, ScheduleSource +from taskiq.abc.serializer import TaskiqSerializer +from taskiq.compat import model_dump, model_validate +from 
taskiq.serializers import PickleSerializer
+from typing_extensions import Self
+
+logger = getLogger("taskiq.redis_schedule_source")
+
+
+class ListRedisScheduleSource(ScheduleSource):
+    """Schedule source based on Redis lists."""
+
+    def __init__(
+        self,
+        url: str,
+        prefix: str = "schedule",
+        max_connection_pool_size: Optional[int] = None,
+        serializer: Optional[TaskiqSerializer] = None,
+        bufffer_size: int = 50,
+        skip_past_schedules: bool = False,
+        **connection_kwargs: Any,
+    ) -> None:
+        super().__init__()
+        self._prefix = prefix
+        self._buffer_size = bufffer_size
+        self._connection_pool = BlockingConnectionPool.from_url(
+            url=url,
+            max_connections=max_connection_pool_size,
+            **connection_kwargs,
+        )
+        if serializer is None:
+            serializer = PickleSerializer()
+        self._serializer = serializer
+        self._is_first_run = True
+        self._previous_schedule_source: Optional[ScheduleSource] = None
+        self._delete_schedules_after_migration: bool = True
+        self._skip_past_schedules = skip_past_schedules
+
+    async def startup(self) -> None:
+        """
+        Start up the schedule source.
+
+        By default this function does nothing.
+        But if the previous schedule source is set,
+        it will try to migrate schedules from it.
+ """ + if self._previous_schedule_source is not None: + logger.info("Migrating schedules from previous source") + await self._previous_schedule_source.startup() + schedules = await self._previous_schedule_source.get_schedules() + logger.info(f"Found {len(schedules)}") + for schedule in schedules: + await self.add_schedule(schedule) + if self._delete_schedules_after_migration: + await self._previous_schedule_source.delete_schedule( + schedule.schedule_id, + ) + await self._previous_schedule_source.shutdown() + logger.info("Migration complete") + + def _get_time_key(self, time: datetime.datetime) -> str: + """Get the key for a time-based schedule.""" + if time.tzinfo is None: + time = time.replace(tzinfo=datetime.timezone.utc) + iso_time = time.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M") + return f"{self._prefix}:time:{iso_time}" + + def _get_cron_key(self) -> str: + """Get the key for a cron-based schedule.""" + return f"{self._prefix}:cron" + + def _get_data_key(self, schedule_id: str) -> str: + """Get the key for a schedule data.""" + return f"{self._prefix}:data:{schedule_id}" + + def _parse_time_key(self, key: str) -> Optional[datetime.datetime]: + """Get time value from the timed-key.""" + try: + dt_str = key.split(":", 2)[2] + return datetime.datetime.strptime(dt_str, "%Y-%m-%dT%H:%M").replace( + tzinfo=datetime.timezone.utc, + ) + except ValueError: + logger.debug("Failed to parse time key %s", key) + return None + + async def _get_previous_time_schedules(self) -> list[bytes]: + """ + Function that gets all timed schedules that are in the past. + + Since this source doesn't retrieve all the schedules at once, + we need to get all the schedules that are in the past and haven't + been sent yet. + + We do this by getting all the time keys and checking if the time + is less than the current time. + + This function is called only during the first run to minimize + the number of requests to the Redis server. 
+ """ + logger.info("Getting previous time schedules") + minute_before = datetime.datetime.now( + datetime.timezone.utc, + ).replace(second=0, microsecond=0) - datetime.timedelta( + minutes=1, + ) + schedules = [] + async with Redis(connection_pool=self._connection_pool) as redis: + time_keys: list[str] = [] + # We need to get all the time keys and check if the time is less than + # the current time. + async for key in redis.scan_iter(f"{self._prefix}:time:*"): + key_time = self._parse_time_key(key.decode()) + if key_time and key_time <= minute_before: + time_keys.append(key.decode()) + for key in time_keys: + schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore + + return schedules + + async def delete_schedule(self, schedule_id: str) -> None: + """Delete a schedule from the source.""" + async with Redis(connection_pool=self._connection_pool) as redis: + schedule = await redis.getdel(self._get_data_key(schedule_id)) + if schedule is not None: + logger.debug("Deleting schedule %s", schedule_id) + schedule = model_validate( + ScheduledTask, + self._serializer.loadb(schedule), + ) + # We need to remove the schedule from the cron or time list. + if schedule.cron is not None: + await redis.lrem(self._get_cron_key(), 0, schedule_id) # type: ignore + elif schedule.time is not None: + time_key = self._get_time_key(schedule.time) + await redis.lrem(time_key, 0, schedule_id) # type: ignore + + async def add_schedule(self, schedule: "ScheduledTask") -> None: + """Add a schedule to the source.""" + async with Redis(connection_pool=self._connection_pool) as redis: + # At first we set data key which contains the schedule data. + await redis.set( + f"{self._prefix}:data:{schedule.schedule_id}", + self._serializer.dumpb(model_dump(schedule)), + ) + # Then we add the schedule to the cron or time list. + # This is an optimization, so we can get all the schedules + # for the current time much faster. 
+ if schedule.cron is not None: + await redis.rpush(self._get_cron_key(), schedule.schedule_id) # type: ignore + elif schedule.time is not None: + await redis.rpush( # type: ignore + self._get_time_key(schedule.time), + schedule.schedule_id, + ) + + async def post_send(self, task: ScheduledTask) -> None: + """Delete a task after it's completed.""" + if task.time is not None: + await self.delete_schedule(task.schedule_id) + + async def get_schedules(self) -> List["ScheduledTask"]: + """ + Get all schedules. + + This function gets all the schedules from the schedule source. + What it does is get all the cron schedules and time schedules + for the current time and return them. + + If it's the first run, it also gets all the time schedules + that are in the past and haven't been sent yet. + """ + schedules = [] + current_time = datetime.datetime.now(datetime.timezone.utc) + timed: list[bytes] = [] + # Only during first run, we need to get previous time schedules + if self._is_first_run and not self._skip_past_schedules: + timed = await self._get_previous_time_schedules() + self._is_first_run = False + async with Redis(connection_pool=self._connection_pool) as redis: + buffer = [] + crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore + logger.debug("Got cron scheduleds: %s", crons) + if crons: + buffer.extend(crons) + timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore + logger.debug("Got timed scheduleds: %s", crons) + if timed: + buffer.extend(timed) + while buffer: + schedules.extend( + await redis.mget( + ( + self._get_data_key(x.decode()) + for x in buffer[: self._buffer_size] + ), + ), + ) + buffer = buffer[self._buffer_size :] + + return [ + model_validate(ScheduledTask, self._serializer.loadb(schedule)) + for schedule in schedules + if schedule + ] + + def with_migrate_from( + self, + source: ScheduleSource, + delete_schedules: bool = True, + ) -> Self: + """ + Enable migration from previous schedule 
source. + + If this function is called during declaration, + the source will try to migrate schedules from the previous source. + + :param source: previous schedule source + :param delete_schedules: delete schedules during migration process + from the previous source. + """ + self._previous_schedule_source = source + self._delete_schedules_after_migration = delete_schedules + return self diff --git a/tests/test_array_schedule_source.py b/tests/test_array_schedule_source.py new file mode 100644 index 0000000..d8e62ae --- /dev/null +++ b/tests/test_array_schedule_source.py @@ -0,0 +1,138 @@ +import datetime +import uuid + +import pytest +from freezegun import freeze_time +from taskiq import ScheduledTask + +from taskiq_redis.array_schedule_source import ArrayRedisScheduleSource +from taskiq_redis.schedule_source import RedisScheduleSource + + +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_schedule_cron(redis_url: str) -> None: + """Test adding a cron schedule.""" + prefix = uuid.uuid4().hex + source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ) + await source.add_schedule(schedule) + scehdules = await source.get_schedules() + assert scehdules == [schedule] + + +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_schedule_from_past(redis_url: str) -> None: + """Test adding a cron schedule.""" + prefix = uuid.uuid4().hex + source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + time=datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(minutes=4), + ) + await source.add_schedule(schedule) + # When running for the first time, the scheduler will get all the + # schedules that are in the past. 
+ scehdules = await source.get_schedules() + assert scehdules == [schedule] + # After getting the schedules for the second time, + # all the schedules in the past are ignored. + scehdules = await source.get_schedules() + assert scehdules == [] + + +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_schedule_removal(redis_url: str) -> None: + """Test adding a cron schedule.""" + prefix = uuid.uuid4().hex + source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + time=datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(minutes=4), + ) + await source.add_schedule(schedule) + # When running for the first time, the scheduler will get all the + # schedules that are in the past. + scehdules = await source.get_schedules() + assert scehdules == [] + # Assert that we will get the schedule after the time has passed. + with freeze_time("2025-01-01 00:04:00"): + scehdules = await source.get_schedules() + assert scehdules == [schedule] + + +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_deletion(redis_url: str) -> None: + """Test adding a cron schedule.""" + prefix = uuid.uuid4().hex + source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + time=datetime.datetime.now(datetime.timezone.utc), + ) + await source.add_schedule(schedule) + # When running for the first time, the scheduler will get all the + # schedules that are in the past. 
+ scehdules = await source.get_schedules() + assert scehdules == [schedule] + await source.delete_schedule(schedule.schedule_id) + scehdules = await source.get_schedules() + assert scehdules == [] + + +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_migration(redis_url: str) -> None: + """Test adding a cron schedule.""" + new_prefix = uuid.uuid4().hex + old_prefix = uuid.uuid4().hex + old_source = RedisScheduleSource(redis_url, prefix=old_prefix) + + for i in range(30): + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + time=datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(minutes=i), + ) + await old_source.add_schedule(schedule) + + old_schedules = await old_source.get_schedules() + + source = ArrayRedisScheduleSource( + redis_url, + prefix=new_prefix, + skip_past_schedules=True, + ).with_migrate_from( + old_source, + delete_schedules=True, + ) + + await source.startup() + assert await old_source.get_schedules() == [] + + for old_schedule in old_schedules: + with freeze_time(old_schedule.time): + assert await source.get_schedules() == [old_schedule] From c4f4508d5c04d3fe641edd8e2ab737b70f30f494 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 1 Apr 2025 23:05:12 +0200 Subject: [PATCH 2/2] Fixed pytest. 
--- ...hedule_source.py => test_list_schedule_source.py} | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) rename tests/{test_array_schedule_source.py => test_list_schedule_source.py} (91%) diff --git a/tests/test_array_schedule_source.py b/tests/test_list_schedule_source.py similarity index 91% rename from tests/test_array_schedule_source.py rename to tests/test_list_schedule_source.py index d8e62ae..bb5bdd5 100644 --- a/tests/test_array_schedule_source.py +++ b/tests/test_list_schedule_source.py @@ -5,7 +5,7 @@ from freezegun import freeze_time from taskiq import ScheduledTask -from taskiq_redis.array_schedule_source import ArrayRedisScheduleSource +from taskiq_redis.list_schedule_source import ListRedisScheduleSource from taskiq_redis.schedule_source import RedisScheduleSource @@ -14,7 +14,7 @@ async def test_schedule_cron(redis_url: str) -> None: """Test adding a cron schedule.""" prefix = uuid.uuid4().hex - source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + source = ListRedisScheduleSource(redis_url, prefix=prefix) schedule = ScheduledTask( task_name="test_task", labels={}, @@ -32,7 +32,7 @@ async def test_schedule_cron(redis_url: str) -> None: async def test_schedule_from_past(redis_url: str) -> None: """Test adding a cron schedule.""" prefix = uuid.uuid4().hex - source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + source = ListRedisScheduleSource(redis_url, prefix=prefix) schedule = ScheduledTask( task_name="test_task", labels={}, @@ -57,7 +57,7 @@ async def test_schedule_from_past(redis_url: str) -> None: async def test_schedule_removal(redis_url: str) -> None: """Test adding a cron schedule.""" prefix = uuid.uuid4().hex - source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + source = ListRedisScheduleSource(redis_url, prefix=prefix) schedule = ScheduledTask( task_name="test_task", labels={}, @@ -82,7 +82,7 @@ async def test_schedule_removal(redis_url: str) -> None: async def test_deletion(redis_url: str) -> None: 
"""Test adding a cron schedule.""" prefix = uuid.uuid4().hex - source = ArrayRedisScheduleSource(redis_url, prefix=prefix) + source = ListRedisScheduleSource(redis_url, prefix=prefix) schedule = ScheduledTask( task_name="test_task", labels={}, @@ -121,7 +121,7 @@ async def test_migration(redis_url: str) -> None: old_schedules = await old_source.get_schedules() - source = ArrayRedisScheduleSource( + source = ListRedisScheduleSource( redis_url, prefix=new_prefix, skip_past_schedules=True,