diff --git a/poetry.lock b/poetry.lock index 1a9c76b..3ba2b77 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "annotated-types" @@ -436,38 +436,38 @@ files = [ [[package]] name = "mypy" -version = "1.6.0" +version = "1.6.1" description = "Optional static typing for Python" optional = false python-versions = ">=3.8" files = [ - {file = "mypy-1.6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:091f53ff88cb093dcc33c29eee522c087a438df65eb92acd371161c1f4380ff0"}, - {file = "mypy-1.6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:eb7ff4007865833c470a601498ba30462b7374342580e2346bf7884557e40531"}, - {file = "mypy-1.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49499cf1e464f533fc45be54d20a6351a312f96ae7892d8e9f1708140e27ce41"}, - {file = "mypy-1.6.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:4c192445899c69f07874dabda7e931b0cc811ea055bf82c1ababf358b9b2a72c"}, - {file = "mypy-1.6.0-cp310-cp310-win_amd64.whl", hash = "sha256:3df87094028e52766b0a59a3e46481bb98b27986ed6ded6a6cc35ecc75bb9182"}, - {file = "mypy-1.6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:3c8835a07b8442da900db47ccfda76c92c69c3a575872a5b764332c4bacb5a0a"}, - {file = "mypy-1.6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:24f3de8b9e7021cd794ad9dfbf2e9fe3f069ff5e28cb57af6f873ffec1cb0425"}, - {file = "mypy-1.6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:856bad61ebc7d21dbc019b719e98303dc6256cec6dcc9ebb0b214b81d6901bd8"}, - {file = "mypy-1.6.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:89513ddfda06b5c8ebd64f026d20a61ef264e89125dc82633f3c34eeb50e7d60"}, - {file = "mypy-1.6.0-cp311-cp311-win_amd64.whl", hash = "sha256:9f8464ed410ada641c29f5de3e6716cbdd4f460b31cf755b2af52f2d5ea79ead"}, - {file = "mypy-1.6.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:971104bcb180e4fed0d7bd85504c9036346ab44b7416c75dd93b5c8c6bb7e28f"}, - {file = "mypy-1.6.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ab98b8f6fdf669711f3abe83a745f67f50e3cbaea3998b90e8608d2b459fd566"}, - {file = "mypy-1.6.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1a69db3018b87b3e6e9dd28970f983ea6c933800c9edf8c503c3135b3274d5ad"}, - {file = "mypy-1.6.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:dccd850a2e3863891871c9e16c54c742dba5470f5120ffed8152956e9e0a5e13"}, - {file = "mypy-1.6.0-cp312-cp312-win_amd64.whl", hash = "sha256:f8598307150b5722854f035d2e70a1ad9cc3c72d392c34fffd8c66d888c90f17"}, - {file = "mypy-1.6.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:fea451a3125bf0bfe716e5d7ad4b92033c471e4b5b3e154c67525539d14dc15a"}, - {file = "mypy-1.6.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e28d7b221898c401494f3b77db3bac78a03ad0a0fff29a950317d87885c655d2"}, - {file = "mypy-1.6.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e4b7a99275a61aa22256bab5839c35fe8a6887781862471df82afb4b445daae6"}, - {file = "mypy-1.6.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:7469545380dddce5719e3656b80bdfbb217cfe8dbb1438532d6abc754b828fed"}, - {file = "mypy-1.6.0-cp38-cp38-win_amd64.whl", hash = "sha256:7807a2a61e636af9ca247ba8494031fb060a0a744b9fee7de3a54bed8a753323"}, - {file = "mypy-1.6.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d2dad072e01764823d4b2f06bc7365bb1d4b6c2f38c4d42fade3c8d45b0b4b67"}, - {file = "mypy-1.6.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b19006055dde8a5425baa5f3b57a19fa79df621606540493e5e893500148c72f"}, - {file = "mypy-1.6.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:31eba8a7a71f0071f55227a8057468b8d2eb5bf578c8502c7f01abaec8141b2f"}, - {file = "mypy-1.6.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8e0db37ac4ebb2fee7702767dfc1b773c7365731c22787cb99f507285014fcaf"}, - {file = "mypy-1.6.0-cp39-cp39-win_amd64.whl", hash = "sha256:c69051274762cccd13498b568ed2430f8d22baa4b179911ad0c1577d336ed849"}, - {file = "mypy-1.6.0-py3-none-any.whl", hash = "sha256:9e1589ca150a51d9d00bb839bfeca2f7a04f32cd62fad87a847bc0818e15d7dc"}, - {file = "mypy-1.6.0.tar.gz", hash = "sha256:4f3d27537abde1be6d5f2c96c29a454da333a2a271ae7d5bc7110e6d4b7beb3f"}, + {file = "mypy-1.6.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e5012e5cc2ac628177eaac0e83d622b2dd499e28253d4107a08ecc59ede3fc2c"}, + {file = "mypy-1.6.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d8fbb68711905f8912e5af474ca8b78d077447d8f3918997fecbf26943ff3cbb"}, + {file = "mypy-1.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21a1ad938fee7d2d96ca666c77b7c494c3c5bd88dff792220e1afbebb2925b5e"}, + {file = "mypy-1.6.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b96ae2c1279d1065413965c607712006205a9ac541895004a1e0d4f281f2ff9f"}, + {file = "mypy-1.6.1-cp310-cp310-win_amd64.whl", hash = "sha256:40b1844d2e8b232ed92e50a4bd11c48d2daa351f9deee6c194b83bf03e418b0c"}, + {file = "mypy-1.6.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:81af8adaa5e3099469e7623436881eff6b3b06db5ef75e6f5b6d4871263547e5"}, + {file = "mypy-1.6.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8c223fa57cb154c7eab5156856c231c3f5eace1e0bed9b32a24696b7ba3c3245"}, + {file = "mypy-1.6.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8032e00ce71c3ceb93eeba63963b864bf635a18f6c0c12da6c13c450eedb183"}, + {file = "mypy-1.6.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4c46b51de523817a0045b150ed11b56f9fff55f12b9edd0f3ed35b15a2809de0"}, + {file = "mypy-1.6.1-cp311-cp311-win_amd64.whl", hash = "sha256:19f905bcfd9e167159b3d63ecd8cb5e696151c3e59a1742e79bc3bcb540c42c7"}, + {file = "mypy-1.6.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:82e469518d3e9a321912955cc702d418773a2fd1e91c651280a1bda10622f02f"}, + {file = "mypy-1.6.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:d4473c22cc296425bbbce7e9429588e76e05bc7342da359d6520b6427bf76660"}, + {file = "mypy-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:59a0d7d24dfb26729e0a068639a6ce3500e31d6655df8557156c51c1cb874ce7"}, + {file = "mypy-1.6.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:cfd13d47b29ed3bbaafaff7d8b21e90d827631afda134836962011acb5904b71"}, + {file = "mypy-1.6.1-cp312-cp312-win_amd64.whl", hash = "sha256:eb4f18589d196a4cbe5290b435d135dee96567e07c2b2d43b5c4621b6501531a"}, + {file = "mypy-1.6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:41697773aa0bf53ff917aa077e2cde7aa50254f28750f9b88884acea38a16169"}, + {file = "mypy-1.6.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7274b0c57737bd3476d2229c6389b2ec9eefeb090bbaf77777e9d6b1b5a9d143"}, + {file = "mypy-1.6.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bbaf4662e498c8c2e352da5f5bca5ab29d378895fa2d980630656178bd607c46"}, + {file = "mypy-1.6.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:bb8ccb4724f7d8601938571bf3f24da0da791fe2db7be3d9e79849cb64e0ae85"}, + {file = "mypy-1.6.1-cp38-cp38-win_amd64.whl", hash = "sha256:68351911e85145f582b5aa6cd9ad666c8958bcae897a1bfda8f4940472463c45"}, + {file = "mypy-1.6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:49ae115da099dcc0922a7a895c1eec82c1518109ea5c162ed50e3b3594c71208"}, + {file = "mypy-1.6.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8b27958f8c76bed8edaa63da0739d76e4e9ad4ed325c814f9b3851425582a3cd"}, + {file = "mypy-1.6.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:925cd6a3b7b55dfba252b7c4561892311c5358c6b5a601847015a1ad4eb7d332"}, + {file = "mypy-1.6.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8f57e6b6927a49550da3d122f0cb983d400f843a8a82e65b3b380d3d7259468f"}, + {file = "mypy-1.6.1-cp39-cp39-win_amd64.whl", hash = "sha256:a43ef1c8ddfdb9575691720b6352761f3f53d85f1b57d7745701041053deff30"}, + {file = "mypy-1.6.1-py3-none-any.whl", hash = "sha256:4cbe68ef919c28ea561165206a2dcb68591c50f3bcf777932323bc208d949cf1"}, + {file = "mypy-1.6.1.tar.gz", hash = "sha256:4d01c00d09a0be62a4ca3f933e315455bde83f37f892ba4b08ce92f3cf44bcc1"}, ] [package.dependencies] @@ -1006,13 +1006,13 @@ files = [ [[package]] name = "taskiq" -version = "0.9.3" +version = "0.10.1" description = "Distributed task queue with full async support" optional = false python-versions = ">=3.8.1,<4.0.0" files = [ - {file = "taskiq-0.9.3-py3-none-any.whl", hash = "sha256:6a895f97e092d95f2c944c4d71921db8b70f135c4816679cc88fe45f8e780bf1"}, - {file = "taskiq-0.9.3.tar.gz", hash = "sha256:25f0b6b7fe5bbb33ce65adf1aaff89690fe11a8eefd3b05515155313ffb33f39"}, + {file = "taskiq-0.10.1-py3-none-any.whl", hash = "sha256:1608ca31424f3050bc312d14a62c65c1582b74238311aee8e2478b0ac4a2099b"}, + {file = "taskiq-0.10.1.tar.gz", hash = "sha256:c1c6f9781adc661387f3a37e4faad8e87cc83658cd5fc1f4b0e3ed7c06e27c16"}, ] [package.dependencies] @@ -1022,11 +1022,14 @@ packaging = ">=19" pycron = ">=3.0.0,<4.0.0" pydantic = ">=1.0,<=3.0" pytz = "*" -taskiq_dependencies = ">=1,<2" +taskiq_dependencies = ">=1.3.1,<2" typing-extensions = ">=3.10.0.0" [package.extras] +cbor = ["cbor2 (>=5.4.6,<6.0.0)"] metrics = ["prometheus_client (>=0,<1)"] +msgpack = ["msgpack (>=1.0.7,<2.0.0)"] +orjson = ["orjson (>=3.9.9,<4.0.0)"] reload = ["gitignore-parser (>=0,<1)", "watchdog (>=2.1.9,<3.0.0)"] uv = ["uvloop (>=0.16.0,<1)"] zmq = ["pyzmq (>=23.2.0,<24.0.0)"] @@ -1134,4 +1137,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "9b948c6d221f781c13c695ab1c4f7b95a30b83964959d408436305ef499b7693" +content-hash = "7e219527ae841cdea7e23697cd563cf0fdc41b0d7294fccd285c8be8a6a6968d" diff --git a/pyproject.toml b/pyproject.toml index 4f2e2a2..18ef6e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ keywords = [ [tool.poetry.dependencies] python = "^3.8.1" -taskiq = "^0" +taskiq = ">=0.10.1,<1" redis = "^5" [tool.poetry.group.dev.dependencies] @@ -124,7 +124,7 @@ convention = "pep257" ignore-decorators = ["typing.overload"] [tool.ruff.pylint] -allow-magic-value-types = ["int", "str", "float", "tuple"] +allow-magic-value-types = ["int", "str", "float"] [tool.ruff.flake8-bugbear] extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"] diff --git a/taskiq_redis/__init__.py b/taskiq_redis/__init__.py index 09e9e2d..cf6a432 100644 --- a/taskiq_redis/__init__.py +++ b/taskiq_redis/__init__.py @@ -1,9 +1,11 @@ """Package for redis integration.""" from taskiq_redis.redis_backend import RedisAsyncResultBackend from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker +from taskiq_redis.schedule_source import RedisScheduleSource __all__ = [ "RedisAsyncResultBackend", "ListQueueBroker", "PubSubBroker", + "RedisScheduleSource", ] diff --git a/taskiq_redis/schedule_source.py b/taskiq_redis/schedule_source.py new file mode 100644 index 0000000..c2c98d1 --- /dev/null +++ b/taskiq_redis/schedule_source.py @@ -0,0 +1,97 @@ +import dataclasses +from typing import Any, List, Optional + +from redis.asyncio import ConnectionPool, Redis +from taskiq import ScheduleSource +from taskiq.abc.serializer import TaskiqSerializer +from taskiq.scheduler.scheduled_task import ScheduledTask + +from taskiq_redis.serializer import PickleSerializer + + +class RedisScheduleSource(ScheduleSource): + """ + Source of schedules for redis. + + This class allows you to store schedules in redis. + Also it supports dynamic schedules. + + :param url: url to redis. + :param prefix: prefix for redis schedule keys. + :param buffer_size: buffer size for redis scan. + This is how many keys will be fetched at once. + :param max_connection_pool_size: maximum number of connections in pool. + :param serializer: serializer for data. + :param connection_kwargs: additional arguments for aio-redis ConnectionPool. + """ + + def __init__( + self, + url: str, + prefix: str = "schedule", + buffer_size: int = 50, + max_connection_pool_size: Optional[int] = None, + serializer: Optional[TaskiqSerializer] = None, + **connection_kwargs: Any, + ) -> None: + self.prefix = prefix + self.connection_pool: ConnectionPool = ConnectionPool.from_url( + url=url, + max_connections=max_connection_pool_size, + **connection_kwargs, + ) + self.buffer_size = buffer_size + if serializer is None: + serializer = PickleSerializer() + self.serializer = serializer + + async def delete_schedule(self, schedule_id: str) -> None: + """Remove schedule by id.""" + async with Redis(connection_pool=self.connection_pool) as redis: + await redis.delete(f"{self.prefix}:{schedule_id}") + + async def add_schedule(self, schedule: ScheduledTask) -> None: + """ + Add schedule to redis. + + :param schedule: schedule to add. + :param schedule_id: schedule id. + """ + async with Redis(connection_pool=self.connection_pool) as redis: + await redis.set( + f"{self.prefix}:{schedule.schedule_id}", + self.serializer.dumpb(dataclasses.asdict(schedule)), + ) + + async def get_schedules(self) -> List[ScheduledTask]: + """ + Get all schedules from redis. + + This method is used by scheduler to get all schedules. + + :return: list of schedules. + """ + schedules = [] + async with Redis(connection_pool=self.connection_pool) as redis: + buffer = [] + async for key in redis.scan_iter(f"{self.prefix}:*"): + buffer.append(key) + if len(buffer) >= self.buffer_size: + schedules.extend(await redis.mget(buffer)) + buffer = [] + if buffer: + schedules.extend(await redis.mget(buffer)) + return [ + ScheduledTask(**self.serializer.loadb(schedule)) + for schedule in schedules + if schedule + ] + + async def post_send(self, task: ScheduledTask) -> None: + """Delete a task after it's completed.""" + if task.time is not None: + await self.delete_schedule(task.schedule_id) + + async def shutdown(self) -> None: + """Shut down the schedule source.""" + await self.connection_pool.disconnect() diff --git a/taskiq_redis/serializer.py b/taskiq_redis/serializer.py new file mode 100644 index 0000000..b5bec92 --- /dev/null +++ b/taskiq_redis/serializer.py @@ -0,0 +1,16 @@ +import pickle +from typing import Any + +from taskiq.abc.serializer import TaskiqSerializer + + +class PickleSerializer(TaskiqSerializer): + """Serializer that uses pickle.""" + + def dumpb(self, value: Any) -> bytes: + """Dumps value to bytes.""" + return pickle.dumps(value) + + def loadb(self, value: bytes) -> Any: + """Loads value from bytes.""" + return pickle.loads(value) # noqa: S301 diff --git a/tests/test_backend.py b/tests/test_backend.py index ce63adc..6946e49 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -86,6 +86,7 @@ async def test_success_backend_default_result( result = await backend.get_result(task_id=task_id) assert result == default_taskiq_result + await backend.shutdown() @pytest.mark.anyio @@ -117,6 +118,7 @@ async def test_success_backend_custom_result( assert result.is_err == custom_taskiq_result.is_err assert result.execution_time == custom_taskiq_result.execution_time assert result.log == custom_taskiq_result.log + await backend.shutdown() @pytest.mark.anyio @@ -183,6 +185,7 @@ async def test_success_backend_expire_ex_param( result = await backend.get_result(task_id=task_id) assert result == default_taskiq_result + await backend.shutdown() @pytest.mark.anyio @@ -213,6 +216,7 @@ async def test_unsuccess_backend_expire_ex_param( with pytest.raises(ResultIsMissingError): await backend.get_result(task_id=task_id) + await backend.shutdown() @pytest.mark.anyio @@ -243,6 +247,7 @@ async def test_success_backend_expire_px_param( result = await backend.get_result(task_id=task_id) assert result == default_taskiq_result + await backend.shutdown() @pytest.mark.anyio @@ -273,3 +278,4 @@ async def test_unsuccess_backend_expire_px_param( with pytest.raises(ResultIsMissingError): await backend.get_result(task_id=task_id) + await backend.shutdown() diff --git a/tests/test_broker.py b/tests/test_broker.py index adef1b7..f664a96 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -68,6 +68,7 @@ async def test_pub_sub_broker( message2 = worker2_task.result() assert message1 == valid_broker_message.message assert message1 == message2 + await broker.shutdown() @pytest.mark.anyio @@ -92,3 +93,6 @@ async def test_list_queue_broker( assert worker1_task.done() != worker2_task.done() message = worker1_task.result() if worker1_task.done() else worker2_task.result() assert message == valid_broker_message.message + worker1_task.cancel() + worker2_task.cancel() + await broker.shutdown() diff --git a/tests/test_result_backend.py b/tests/test_result_backend.py index 38ae8ef..183696f 100644 --- a/tests/test_result_backend.py +++ b/tests/test_result_backend.py @@ -37,6 +37,7 @@ async def test_set_result_success(redis_url: str) -> None: assert fetched_result.return_value == 11 assert fetched_result.execution_time == 112.2 assert fetched_result.is_err + await result_backend.shutdown() @pytest.mark.anyio @@ -69,6 +70,7 @@ async def test_fetch_without_logs(redis_url: str) -> None: assert fetched_result.return_value == 11 assert fetched_result.execution_time == 112.2 assert fetched_result.is_err + await result_backend.shutdown() @pytest.mark.anyio @@ -98,6 +100,8 @@ async def test_remove_results_after_reading(redis_url: str) -> None: with pytest.raises(ResultIsMissingError): await result_backend.get_result(task_id=task_id) + await result_backend.shutdown() + @pytest.mark.anyio async def test_keep_results_after_reading(redis_url: str) -> None: @@ -125,3 +129,4 @@ async def test_keep_results_after_reading(redis_url: str) -> None: res1 = await result_backend.get_result(task_id=task_id) res2 = await result_backend.get_result(task_id=task_id) assert res1 == res2 + await result_backend.shutdown() diff --git a/tests/test_schedule_source.py b/tests/test_schedule_source.py new file mode 100644 index 0000000..eeef3a0 --- /dev/null +++ b/tests/test_schedule_source.py @@ -0,0 +1,61 @@ +import uuid + +import pytest +from taskiq import ScheduledTask + +from taskiq_redis import RedisScheduleSource + + +@pytest.mark.anyio +async def test_set_schedule(redis_url: str) -> None: + prefix = uuid.uuid4().hex + source = RedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + "test_task", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ) + await source.add_schedule(schedule) + schedules = await source.get_schedules() + assert schedules == [schedule] + await source.shutdown() + + +@pytest.mark.anyio +async def test_delete_schedule(redis_url: str) -> None: + prefix = uuid.uuid4().hex + source = RedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + "test_task", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ) + await source.add_schedule(schedule) + schedules = await source.get_schedules() + assert schedules == [schedule] + await source.delete_schedule(schedule.schedule_id) + schedules = await source.get_schedules() + # Schedules are empty. + assert not schedules + await source.shutdown() + + +@pytest.mark.anyio +async def test_post_run_cron(redis_url: str) -> None: + prefix = uuid.uuid4().hex + source = RedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + "test_task", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ) + await source.add_schedule(schedule) + schedules = await source.get_schedules() + assert schedules == [schedule] + await source.shutdown()