From e79fa119bc32a3553d7fdf10b660762e3a364132 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mus=C3=ADlek?= Date: Tue, 7 Nov 2023 10:34:25 +0100 Subject: [PATCH 1/2] Add cluster option to result backend --- .github/workflows/test.yml | 18 +++++ taskiq_redis/__init__.py | 6 +- taskiq_redis/redis_backend.py | 120 ++++++++++++++++++++++++++++++++ tests/conftest.py | 14 ++++ tests/test_result_backend.py | 127 +++++++++++++++++++++++++++++++++- 5 files changed, 283 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c7ebcf1..5108ca5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -42,12 +42,30 @@ jobs: --health-retries=30 ports: - 6379:6379 + redis-cluster: + image: bitnami/redis-cluster:6.2.5 + env: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "localhost" + options: >- + --health-cmd="redis-cli ping" + --health-interval=5s + --health-timeout=5s + --health-retries=30 + ports: + - 7000:6379 strategy: matrix: py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"] runs-on: "ubuntu-latest" steps: - uses: actions/checkout@v4 + - uses: shogo82148/actions-setup-redis@v1 + with: + redis-version: "6.x" + auto-start: false + - name: Set up single-node Redis cluster + run: redis-cli -h localhost -p 7000 --cluster-yes CLUSTER ADDSLOTS {0..16383} - name: Set up Python uses: actions/setup-python@v2 with: diff --git a/taskiq_redis/__init__.py b/taskiq_redis/__init__.py index cf6a432..36cec84 100644 --- a/taskiq_redis/__init__.py +++ b/taskiq_redis/__init__.py @@ -1,9 +1,13 @@ """Package for redis integration.""" -from taskiq_redis.redis_backend import RedisAsyncResultBackend +from taskiq_redis.redis_backend import ( + RedisAsyncClusterResultBackend, + RedisAsyncResultBackend, +) from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker from taskiq_redis.schedule_source import RedisScheduleSource __all__ = [ + "RedisAsyncClusterResultBackend", "RedisAsyncResultBackend", "ListQueueBroker", "PubSubBroker", diff --git a/taskiq_redis/redis_backend.py b/taskiq_redis/redis_backend.py index 35103a4..2e0cf3c 100644 --- a/taskiq_redis/redis_backend.py +++ b/taskiq_redis/redis_backend.py @@ -2,6 +2,7 @@ from typing import Dict, Optional, TypeVar, Union from redis.asyncio import ConnectionPool, Redis +from redis.asyncio.cluster import RedisCluster from taskiq import AsyncResultBackend from taskiq.abc.result_backend import TaskiqResult @@ -134,3 +135,122 @@ async def get_result( taskiq_result.log = None return taskiq_result + + +class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]): + """Async result backend based on redis cluster.""" + + def __init__( + self, + redis_url: str, + keep_results: bool = True, + result_ex_time: Optional[int] = None, + result_px_time: Optional[int] = None, + ) -> None: + """ + Constructs a new result backend. + + :param redis_url: url to redis cluster. + :param keep_results: flag to not remove results from Redis after reading. + :param result_ex_time: expire time in seconds for result. + :param result_px_time: expire time in milliseconds for result. + + :raises DuplicateExpireTimeSelectedError: if result_ex_time + and result_px_time are selected. + :raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time + and result_px_time are equal zero. + """ + self.redis: RedisCluster[bytes] = RedisCluster.from_url(redis_url) + self.keep_results = keep_results + self.result_ex_time = result_ex_time + self.result_px_time = result_px_time + + unavailable_conditions = any( + ( + self.result_ex_time is not None and self.result_ex_time <= 0, + self.result_px_time is not None and self.result_px_time <= 0, + ), + ) + if unavailable_conditions: + raise ExpireTimeMustBeMoreThanZeroError( + "You must select one expire time param and it must be more than zero.", + ) + + if self.result_ex_time and self.result_px_time: + raise DuplicateExpireTimeSelectedError( + "Choose either result_ex_time or result_px_time.", + ) + + async def shutdown(self) -> None: + """Closes redis connection.""" + await self.redis.close() + await super().shutdown() + + async def set_result( + self, + task_id: str, + result: TaskiqResult[_ReturnType], + ) -> None: + """ + Sets task result in redis. + + Dumps TaskiqResult instance into the bytes and writes + it to redis. + + :param task_id: ID of the task. + :param result: TaskiqResult instance. + """ + redis_set_params: Dict[str, Union[str, bytes, int]] = { + "name": task_id, + "value": pickle.dumps(result), + } + if self.result_ex_time: + redis_set_params["ex"] = self.result_ex_time + elif self.result_px_time: + redis_set_params["px"] = self.result_px_time + + await self.redis.set(**redis_set_params) # type: ignore + + async def is_result_ready(self, task_id: str) -> bool: + """ + Returns whether the result is ready. + + :param task_id: ID of the task. + + :returns: True if the result is ready else False. + """ + return bool(await self.redis.exists(task_id)) # type: ignore[attr-defined] + + async def get_result( + self, + task_id: str, + with_logs: bool = False, + ) -> TaskiqResult[_ReturnType]: + """ + Gets result from the task. + + :param task_id: task's id. + :param with_logs: if True it will download task's logs. + :raises ResultIsMissingError: if there is no result when trying to get it. + :return: task's return value. + """ + if self.keep_results: + result_value = await self.redis.get( # type: ignore[attr-defined] + name=task_id, + ) + else: + result_value = await self.redis.getdel( # type: ignore[attr-defined] + name=task_id, + ) + + if result_value is None: + raise ResultIsMissingError + + taskiq_result: TaskiqResult[_ReturnType] = pickle.loads( # noqa: S301 + result_value, + ) + + if not with_logs: + taskiq_result.log = None + + return taskiq_result diff --git a/tests/conftest.py b/tests/conftest.py index 1abfa07..3a125ce 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,3 +26,17 @@ def redis_url() -> str: :return: URL string. """ return os.environ.get("TEST_REDIS_URL", "redis://localhost") + + +@pytest.fixture +def redis_cluster_url() -> str: + """ + URL to connect to redis cluster. + + It tries to get it from environ, + and return default one if the variable is + not set. + + :return: URL string. + """ + return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7000") diff --git a/tests/test_result_backend.py b/tests/test_result_backend.py index 183696f..15ecdd0 100644 --- a/tests/test_result_backend.py +++ b/tests/test_result_backend.py @@ -3,7 +3,7 @@ import pytest from taskiq import TaskiqResult -from taskiq_redis import RedisAsyncResultBackend +from taskiq_redis import RedisAsyncClusterResultBackend, RedisAsyncResultBackend from taskiq_redis.exceptions import ResultIsMissingError @@ -130,3 +130,128 @@ async def test_keep_results_after_reading(redis_url: str) -> None: res2 = await result_backend.get_result(task_id=task_id) assert res1 == res2 await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_set_result_success_cluster(redis_cluster_url: str) -> None: + """ + Tests that results can be set without errors in cluster mode. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + fetched_result = await result_backend.get_result( + task_id=task_id, + with_logs=True, + ) + assert fetched_result.log == "My Log" + assert fetched_result.return_value == 11 + assert fetched_result.execution_time == 112.2 + assert fetched_result.is_err + await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_fetch_without_logs_cluster(redis_cluster_url: str) -> None: + """ + Check if fetching value without logs works fine. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + fetched_result = await result_backend.get_result( + task_id=task_id, + with_logs=False, + ) + assert fetched_result.log is None + assert fetched_result.return_value == 11 + assert fetched_result.execution_time == 112.2 + assert fetched_result.is_err + await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_remove_results_after_reading_cluster(redis_cluster_url: str) -> None: + """ + Check if removing results after reading works fine. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + keep_results=False, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + await result_backend.get_result(task_id=task_id) + with pytest.raises(ResultIsMissingError): + await result_backend.get_result(task_id=task_id) + + await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_keep_results_after_reading_cluster(redis_cluster_url: str) -> None: + """ + Check if keeping results after reading works fine. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + keep_results=True, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + res1 = await result_backend.get_result(task_id=task_id) + res2 = await result_backend.get_result(task_id=task_id) + assert res1 == res2 + await result_backend.shutdown() From ae672c563743e559e8ea46e20563ac52b00d006c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mus=C3=ADlek?= Date: Tue, 7 Nov 2023 15:56:01 +0100 Subject: [PATCH 2/2] fixup! Add cluster option to result backend --- .github/workflows/test.yml | 32 ++----------------- docker-compose.yml | 60 +++++++++++++++++++++++++++++++++++ taskiq_redis/redis_backend.py | 2 +- tests/conftest.py | 4 +-- 4 files changed, 65 insertions(+), 33 deletions(-) create mode 100644 docker-compose.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5108ca5..4f7d58f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,42 +30,14 @@ jobs: - name: Run lint check run: poetry run pre-commit run -a ${{ matrix.cmd }} pytest: - services: - redis: - image: bitnami/redis:6.2.5 - env: - ALLOW_EMPTY_PASSWORD: "yes" - options: >- - --health-cmd="redis-cli ping" - --health-interval=5s - --health-timeout=5s - --health-retries=30 - ports: - - 6379:6379 - redis-cluster: - image: bitnami/redis-cluster:6.2.5 - env: - ALLOW_EMPTY_PASSWORD: "yes" - REDIS_NODES: "localhost" - options: >- - --health-cmd="redis-cli ping" - --health-interval=5s - --health-timeout=5s - --health-retries=30 - ports: - - 7000:6379 strategy: matrix: py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"] runs-on: "ubuntu-latest" steps: - uses: actions/checkout@v4 - - uses: shogo82148/actions-setup-redis@v1 - with: - redis-version: "6.x" - auto-start: false - - name: Set up single-node Redis cluster - run: redis-cli -h localhost -p 7000 --cluster-yes CLUSTER ADDSLOTS {0..16383} + - name: Set up Redis instance and Redis cluster + run: docker-compose up -d - name: Set up Python uses: actions/setup-python@v2 with: diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f7810f2 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,60 @@ +version: '3.2' + +services: + redis: + image: bitnami/redis:6.2.5 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 3 + start_period: 10s + ports: + - 7000:6379 + redis-node-0: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-1: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-2: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-3: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-4: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-5: + image: docker.io/bitnami/redis-cluster:7.2 + depends_on: + - redis-node-0 + - redis-node-1 + - redis-node-2 + - redis-node-3 + - redis-node-4 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + REDIS_CLUSTER_REPLICAS: 1 + REDIS_CLUSTER_CREATOR: "yes" + ports: + - 7001:6379 diff --git a/taskiq_redis/redis_backend.py b/taskiq_redis/redis_backend.py index 2e0cf3c..3a0810d 100644 --- a/taskiq_redis/redis_backend.py +++ b/taskiq_redis/redis_backend.py @@ -183,7 +183,7 @@ def __init__( async def shutdown(self) -> None: """Closes redis connection.""" - await self.redis.close() + await self.redis.aclose() # type: ignore[attr-defined] await super().shutdown() async def set_result( diff --git a/tests/conftest.py b/tests/conftest.py index 3a125ce..dcccb79 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,7 +25,7 @@ def redis_url() -> str: :return: URL string. """ - return os.environ.get("TEST_REDIS_URL", "redis://localhost") + return os.environ.get("TEST_REDIS_URL", "redis://localhost:7000") @pytest.fixture @@ -39,4 +39,4 @@ def redis_cluster_url() -> str: :return: URL string. """ - return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7000") + return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7001")