Add cluster option to result backend #46

Merged
merged 2 commits on Nov 9, 2023
14 changes: 2 additions & 12 deletions .github/workflows/test.yml
@@ -30,24 +30,14 @@ jobs:
      - name: Run lint check
        run: poetry run pre-commit run -a ${{ matrix.cmd }}
  pytest:
-    services:
-      redis:
-        image: bitnami/redis:6.2.5
-        env:
-          ALLOW_EMPTY_PASSWORD: "yes"
-        options: >-
-          --health-cmd="redis-cli ping"
-          --health-interval=5s
-          --health-timeout=5s
-          --health-retries=30
-        ports:
-          - 6379:6379
    strategy:
      matrix:
        py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
    runs-on: "ubuntu-latest"
    steps:
      - uses: actions/checkout@v4
+      - name: Set up Redis instance and Redis cluster
+        run: docker-compose up -d
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
60 changes: 60 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,60 @@
version: '3.2'

services:
  redis:
    image: bitnami/redis:6.2.5
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 3
      start_period: 10s
    ports:
      - 7000:6379
  redis-node-0:
    image: docker.io/bitnami/redis-cluster:7.2
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

  redis-node-1:
    image: docker.io/bitnami/redis-cluster:7.2
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

  redis-node-2:
    image: docker.io/bitnami/redis-cluster:7.2
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

  redis-node-3:
    image: docker.io/bitnami/redis-cluster:7.2
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

  redis-node-4:
    image: docker.io/bitnami/redis-cluster:7.2
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

  redis-node-5:
    image: docker.io/bitnami/redis-cluster:7.2
    depends_on:
      - redis-node-0
      - redis-node-1
      - redis-node-2
      - redis-node-3
      - redis-node-4
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
      REDIS_CLUSTER_REPLICAS: 1
      REDIS_CLUSTER_CREATOR: "yes"
    ports:
      - 7001:6379
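
The compose file maps the single-node Redis to host port 7000 and the cluster-creator node to host port 7001. As a minimal sketch (not part of this PR), the snippet below shows one way to verify the cluster is reachable after `docker-compose up -d`; it assumes the ports above and that `redis` (redis-py) is installed on the host.

```python
# Minimal smoke test: checks that the Redis cluster started by docker-compose
# is reachable through the host port mapped in the compose file above.
import asyncio

from redis.asyncio.cluster import RedisCluster


async def main() -> None:
    # Port 7001 is mapped to the cluster-creator node (redis-node-5).
    cluster = RedisCluster.from_url("redis://localhost:7001")
    await cluster.set("smoke-test-key", "ok")
    print(await cluster.get("smoke-test-key"))  # b"ok"
    await cluster.aclose()  # mirrors the aclose() call used by the new backend


asyncio.run(main())
```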
6 changes: 5 additions & 1 deletion taskiq_redis/__init__.py
@@ -1,9 +1,13 @@
"""Package for redis integration."""
from taskiq_redis.redis_backend import RedisAsyncResultBackend
from taskiq_redis.redis_backend import (
RedisAsyncClusterResultBackend,
RedisAsyncResultBackend,
)
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
from taskiq_redis.schedule_source import RedisScheduleSource

__all__ = [
"RedisAsyncClusterResultBackend",
"RedisAsyncResultBackend",
"ListQueueBroker",
"PubSubBroker",
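
With the new export in place, the cluster backend can be attached to a broker in the same way as `RedisAsyncResultBackend`. A minimal sketch follows; the URLs are placeholders taken from the compose file in this PR, and the `with_result_backend` call follows taskiq's usual wiring pattern.

```python
# Sketch of wiring the new backend into a broker; adjust URLs to your setup.
from taskiq_redis import ListQueueBroker, RedisAsyncClusterResultBackend

result_backend = RedisAsyncClusterResultBackend(
    redis_url="redis://localhost:7001",  # cluster entry point from docker-compose.yml
)

# The broker itself still talks to a single Redis instance here;
# only task results are stored in the cluster.
broker = ListQueueBroker(
    url="redis://localhost:7000",
).with_result_backend(result_backend)
```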
120 changes: 120 additions & 0 deletions taskiq_redis/redis_backend.py
@@ -2,6 +2,7 @@
from typing import Dict, Optional, TypeVar, Union

from redis.asyncio import ConnectionPool, Redis
+from redis.asyncio.cluster import RedisCluster
from taskiq import AsyncResultBackend
from taskiq.abc.result_backend import TaskiqResult

@@ -134,3 +135,122 @@ async def get_result(
        taskiq_result.log = None

        return taskiq_result


class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
    """Async result backend based on redis cluster."""

    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
    ) -> None:
        """
        Constructs a new result backend.

        :param redis_url: url to redis cluster.
        :param keep_results: flag to not remove results from Redis after reading.
        :param result_ex_time: expire time in seconds for the result.
        :param result_px_time: expire time in milliseconds for the result.

        :raises DuplicateExpireTimeSelectedError: if both result_ex_time
            and result_px_time are selected.
        :raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
            or result_px_time is less than or equal to zero.
        """
        self.redis: RedisCluster[bytes] = RedisCluster.from_url(redis_url)
        self.keep_results = keep_results
        self.result_ex_time = result_ex_time
        self.result_px_time = result_px_time

        unavailable_conditions = any(
            (
                self.result_ex_time is not None and self.result_ex_time <= 0,
                self.result_px_time is not None and self.result_px_time <= 0,
            ),
        )
        if unavailable_conditions:
            raise ExpireTimeMustBeMoreThanZeroError(
                "You must select one expire time param and it must be more than zero.",
            )

        if self.result_ex_time and self.result_px_time:
            raise DuplicateExpireTimeSelectedError(
                "Choose either result_ex_time or result_px_time.",
            )

    async def shutdown(self) -> None:
        """Closes the redis connection."""
        await self.redis.aclose()  # type: ignore[attr-defined]
        await super().shutdown()

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult[_ReturnType],
    ) -> None:
        """
        Sets task result in redis.

        Dumps the TaskiqResult instance into bytes and writes
        it to redis.

        :param task_id: ID of the task.
        :param result: TaskiqResult instance.
        """
        redis_set_params: Dict[str, Union[str, bytes, int]] = {
            "name": task_id,
            "value": pickle.dumps(result),
        }
        if self.result_ex_time:
            redis_set_params["ex"] = self.result_ex_time
        elif self.result_px_time:
            redis_set_params["px"] = self.result_px_time

        await self.redis.set(**redis_set_params)  # type: ignore

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Returns whether the result is ready.

        :param task_id: ID of the task.

        :returns: True if the result is ready, else False.
        """
        return bool(await self.redis.exists(task_id))  # type: ignore[attr-defined]

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = False,
    ) -> TaskiqResult[_ReturnType]:
        """
        Gets the result of a task.

        :param task_id: task's id.
        :param with_logs: if True, it will also fetch the task's logs.
        :raises ResultIsMissingError: if there is no result when trying to get it.
        :return: task's return value.
        """
        if self.keep_results:
            result_value = await self.redis.get(  # type: ignore[attr-defined]
                name=task_id,
            )
        else:
            result_value = await self.redis.getdel(  # type: ignore[attr-defined]
                name=task_id,
            )

        if result_value is None:
            raise ResultIsMissingError

        taskiq_result: TaskiqResult[_ReturnType] = pickle.loads(  # noqa: S301
            result_value,
        )

        if not with_logs:
            taskiq_result.log = None

        return taskiq_result
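
For reference, a short standalone sketch of the new backend: it constructs the backend with a one-hour expiry, stores and reads back a result, and shows the mutual-exclusion check on the expiry parameters. It assumes the docker-compose cluster from this PR is running on localhost:7001 and that the validation errors live in `taskiq_redis.exceptions`, alongside the `ResultIsMissingError` the tests import.

```python
# Usage sketch for RedisAsyncClusterResultBackend; not part of this PR.
import asyncio

from taskiq import TaskiqResult

from taskiq_redis import RedisAsyncClusterResultBackend
from taskiq_redis.exceptions import DuplicateExpireTimeSelectedError


async def main() -> None:
    backend = RedisAsyncClusterResultBackend(
        redis_url="redis://localhost:7001",
        result_ex_time=3600,  # results expire after an hour
    )
    result: TaskiqResult[int] = TaskiqResult(
        is_err=False,
        log=None,
        return_value=42,
        execution_time=0.1,
    )
    await backend.set_result(task_id="example-task", result=result)
    fetched = await backend.get_result(task_id="example-task")
    print(fetched.return_value)  # 42
    await backend.shutdown()

    # Passing both expire parameters is rejected at construction time.
    try:
        RedisAsyncClusterResultBackend(
            redis_url="redis://localhost:7001",
            result_ex_time=60,
            result_px_time=60_000,
        )
    except DuplicateExpireTimeSelectedError:
        print("choose either result_ex_time or result_px_time, not both")


asyncio.run(main())
```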
16 changes: 15 additions & 1 deletion tests/conftest.py
@@ -25,4 +25,18 @@ def redis_url() -> str:

    :return: URL string.
    """
-    return os.environ.get("TEST_REDIS_URL", "redis://localhost")
+    return os.environ.get("TEST_REDIS_URL", "redis://localhost:7000")


@pytest.fixture
def redis_cluster_url() -> str:
    """
    URL to connect to the redis cluster.

    It tries to get the URL from the environment
    and returns a default one if the variable
    is not set.

    :return: URL string.
    """
    return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7001")
127 changes: 126 additions & 1 deletion tests/test_result_backend.py
@@ -3,7 +3,7 @@
import pytest
from taskiq import TaskiqResult

-from taskiq_redis import RedisAsyncResultBackend
+from taskiq_redis import RedisAsyncClusterResultBackend, RedisAsyncResultBackend
from taskiq_redis.exceptions import ResultIsMissingError


@@ -130,3 +130,128 @@ async def test_keep_results_after_reading(redis_url: str) -> None:
    res2 = await result_backend.get_result(task_id=task_id)
    assert res1 == res2
    await result_backend.shutdown()


@pytest.mark.anyio
async def test_set_result_success_cluster(redis_cluster_url: str) -> None:
    """
    Tests that results can be set without errors in cluster mode.

    :param redis_cluster_url: redis cluster URL.
    """
    result_backend = RedisAsyncClusterResultBackend(  # type: ignore
        redis_url=redis_cluster_url,
    )
    task_id = uuid.uuid4().hex
    result: "TaskiqResult[int]" = TaskiqResult(
        is_err=True,
        log="My Log",
        return_value=11,
        execution_time=112.2,
    )
    await result_backend.set_result(
        task_id=task_id,
        result=result,
    )

    fetched_result = await result_backend.get_result(
        task_id=task_id,
        with_logs=True,
    )
    assert fetched_result.log == "My Log"
    assert fetched_result.return_value == 11
    assert fetched_result.execution_time == 112.2
    assert fetched_result.is_err
    await result_backend.shutdown()


@pytest.mark.anyio
async def test_fetch_without_logs_cluster(redis_cluster_url: str) -> None:
    """
    Check if fetching a value without logs works fine in cluster mode.

    :param redis_cluster_url: redis cluster URL.
    """
    result_backend = RedisAsyncClusterResultBackend(  # type: ignore
        redis_url=redis_cluster_url,
    )
    task_id = uuid.uuid4().hex
    result: "TaskiqResult[int]" = TaskiqResult(
        is_err=True,
        log="My Log",
        return_value=11,
        execution_time=112.2,
    )
    await result_backend.set_result(
        task_id=task_id,
        result=result,
    )

    fetched_result = await result_backend.get_result(
        task_id=task_id,
        with_logs=False,
    )
    assert fetched_result.log is None
    assert fetched_result.return_value == 11
    assert fetched_result.execution_time == 112.2
    assert fetched_result.is_err
    await result_backend.shutdown()


@pytest.mark.anyio
async def test_remove_results_after_reading_cluster(redis_cluster_url: str) -> None:
    """
    Check if removing results after reading works fine in cluster mode.

    :param redis_cluster_url: redis cluster URL.
    """
    result_backend = RedisAsyncClusterResultBackend(  # type: ignore
        redis_url=redis_cluster_url,
        keep_results=False,
    )
    task_id = uuid.uuid4().hex
    result: "TaskiqResult[int]" = TaskiqResult(
        is_err=True,
        log="My Log",
        return_value=11,
        execution_time=112.2,
    )
    await result_backend.set_result(
        task_id=task_id,
        result=result,
    )

    await result_backend.get_result(task_id=task_id)
    with pytest.raises(ResultIsMissingError):
        await result_backend.get_result(task_id=task_id)

    await result_backend.shutdown()


@pytest.mark.anyio
async def test_keep_results_after_reading_cluster(redis_cluster_url: str) -> None:
    """
    Check if keeping results after reading works fine in cluster mode.

    :param redis_cluster_url: redis cluster URL.
    """
    result_backend = RedisAsyncClusterResultBackend(  # type: ignore
        redis_url=redis_cluster_url,
        keep_results=True,
    )
    task_id = uuid.uuid4().hex
    result: "TaskiqResult[int]" = TaskiqResult(
        is_err=True,
        log="My Log",
        return_value=11,
        execution_time=112.2,
    )
    await result_backend.set_result(
        task_id=task_id,
        result=result,
    )

    res1 = await result_backend.get_result(task_id=task_id)
    res2 = await result_backend.get_result(task_id=task_id)
    assert res1 == res2
    await result_backend.shutdown()