Skip to content

Bump redis from 2.10.6 to 3.4.1 #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions redis_tasks/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,7 @@ def zadd(self, name, items, nx=False, xx=False, ch=False, incr=False):
return self.execute_command('ZADD', name, *pieces)


# Backwards compatibility with redis-py < 3.0.0
pipeline_base = getattr(redis.client, 'BasePipeline', redis.client.Pipeline)


class RTPipeline(pipeline_base, RTRedis):
class RTPipeline(redis.client.Pipeline, RTRedis):
pass


Expand All @@ -110,18 +106,5 @@ def middleware_constructor(class_path):
return [middleware_constructor(x) for x in settings.MIDDLEWARE]


class RedisKey(redis.connection.Token):
    """A Redis key name namespaced under ``settings.REDIS_PREFIX``.

    Subclasses ``redis.connection.Token`` so redis-py treats the rendered
    value as a ready-to-send key.  The prefix is read lazily in
    ``__str__``, so changing ``settings.REDIS_PREFIX`` at runtime changes
    the rendered key of existing instances.
    """

    def __init__(self, name):
        # NOTE(review): deliberately does not call Token.__init__ — the
        # value is rendered lazily from `name`; confirm Token carries no
        # required state in the redis-py version in use.
        self.name = name

    def __str__(self):
        # Prefix is resolved at render time, not at construction time.
        return settings.REDIS_PREFIX + ':' + self.name

    @property
    def encoded_value(self):
        # Bytes form of the fully-prefixed key, as redis-py sends it.
        return str(self).encode('utf-8')

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        # Defining __eq__ sets __hash__ to None in Python 3, which made
        # instances unhashable (unusable as dict keys / set members).
        # Restore hashability consistently with equality (keyed on the
        # unprefixed name, matching __eq__).
        return hash(self.name)
def construct_redis_key(name):
    """Return *name* prefixed with ``settings.REDIS_PREFIX`` and a colon."""
    return ':'.join((settings.REDIS_PREFIX, name))
4 changes: 2 additions & 2 deletions redis_tasks/contrib/graph.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid

from redis_tasks import Queue
from redis_tasks.conf import RedisKey, connection
from redis_tasks.conf import connection, construct_redis_key
from redis_tasks.utils import atomic_pipeline, deserialize, serialize


Expand All @@ -19,7 +19,7 @@ def chain(members):
class TaskGraph:
def __init__(self, id=None):
self.id = id or str(uuid.uuid4())
self.key = RedisKey(f"ston.graph:{self.id}")
self.key = construct_redis_key(f"ston.graph:{self.id}")
self.nodes = []
self.edges = set()

Expand Down
8 changes: 4 additions & 4 deletions redis_tasks/queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .conf import RedisKey, connection
from .conf import connection, construct_redis_key
from .exceptions import TaskDoesNotExist
from .registries import queue_registry
from .task import Task
Expand All @@ -8,11 +8,11 @@
class Queue(object):
def __init__(self, name='default'):
self.name = name
self.key = RedisKey('queue:' + name)
self.key = construct_redis_key('queue:' + name)
# We use a separate key for the workers to wait on, as we need to do a
# multi-key blocking rpop on it, and redis does not have a variant of
# that operation that is not at risk of losing tasks.
self.unblock_key = RedisKey('unblock_queue:' + name)
self.unblock_key = construct_redis_key('unblock_queue:' + name)

@classmethod
def all(cls):
Expand Down Expand Up @@ -103,7 +103,7 @@ def await_multi(cls, queues, timeout):
"""Blocks until one of the passed queues contains a task.

Return the queue that contained a task or None if `timeout` was reached."""
queue_map = {str(q.unblock_key): q for q in queues}
queue_map = {q.unblock_key: q for q in queues}
result = connection.brpop(queue_map.keys(), timeout)
if result is None:
return None
Expand Down
20 changes: 10 additions & 10 deletions redis_tasks/registries.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import redis_tasks

from .conf import RedisKey, connection, settings
from .conf import connection, construct_redis_key, settings
from .exceptions import WorkerDoesNotExist
from .utils import atomic_pipeline, decode_list
from .utils import LazyObject, atomic_pipeline, decode_list


class ExpiringRegistry:
def __init__(self, name):
self.key = RedisKey(name + '_tasks')
self.key = construct_redis_key(name + '_tasks')

@atomic_pipeline
def add(self, task, *, pipeline):
Expand Down Expand Up @@ -48,8 +48,8 @@ def transaction(pipeline):
connection.transaction(transaction, self.key)


finished_task_registry = ExpiringRegistry('finished')
failed_task_registry = ExpiringRegistry('failed')
finished_task_registry = LazyObject(lambda: ExpiringRegistry('finished'))
failed_task_registry = LazyObject(lambda: ExpiringRegistry('failed'))


def registry_maintenance():
Expand All @@ -60,7 +60,7 @@ def registry_maintenance():

class WorkerRegistry:
def __init__(self):
self.key = RedisKey('workers')
self.key = construct_redis_key('workers')

@atomic_pipeline
def add(self, worker, *, pipeline):
Expand All @@ -83,7 +83,7 @@ def get_worker_ids(self):

def get_running_tasks(self):
"""Returns a worker_id -> task_id dict"""
task_key_prefix = RedisKey('worker_task:')
task_key_prefix = construct_redis_key('worker_task:')
lua = connection.register_script("""
local workers_key, task_key_prefix = unpack(KEYS)
local worker_ids = redis.call("ZRANGE", workers_key, 0, -1)
Expand Down Expand Up @@ -114,12 +114,12 @@ def get_dead_ids(self):
self.key, '-inf', oldest_valid))


worker_registry = WorkerRegistry()
worker_registry = LazyObject(WorkerRegistry)


class QueueRegistry:
def __init__(self):
self.key = RedisKey('queues')
self.key = construct_redis_key('queues')

def get_names(self):
return decode_list(connection.smembers(self.key))
Expand All @@ -133,4 +133,4 @@ def remove(self, queue, *, pipeline):
pipeline.srem(self.key, queue.name)


queue_registry = QueueRegistry()
queue_registry = LazyObject(QueueRegistry)
6 changes: 3 additions & 3 deletions redis_tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import croniter

from .conf import RedisKey, connection, settings
from .conf import connection, construct_redis_key, settings
from .exceptions import TaskDoesNotExist
from .queue import Queue
from .smear_dst import DstSmearingTz
Expand Down Expand Up @@ -80,7 +80,7 @@ def get_next(self, after):
class SchedulerEntry:
def __init__(self, id, config):
self.id = id
self.key = RedisKey(f"schedule_entry:{self.id}")
self.key = construct_redis_key(f"schedule_entry:{self.id}")
self.singleton = config.get('singleton', True)
self.task_template = [config['task'],
config.get('args', ()), config.get('kwargs', {})]
Expand Down Expand Up @@ -192,7 +192,7 @@ class Mutex(object):
expire_script = None

def __init__(self, *, timeout):
self.key = RedisKey('scheduler')
self.key = construct_redis_key('scheduler')
self.timeout = timeout
self.token = None

Expand Down
4 changes: 2 additions & 2 deletions redis_tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import redis_tasks

from .conf import RedisKey, connection, settings, task_middleware
from .conf import connection, construct_redis_key, settings, task_middleware
from .exceptions import (
InvalidOperation, TaskAborted, TaskDoesNotExist, WorkerShutdown)
from .registries import failed_task_registry, finished_task_registry
Expand Down Expand Up @@ -137,7 +137,7 @@ def key(self):

@classmethod
def key_for(cls, task_id):
return RedisKey('task:' + task_id)
return construct_redis_key('task:' + task_id)

@atomic_pipeline
def enqueue(self, queue, *, pipeline):
Expand Down
6 changes: 3 additions & 3 deletions redis_tasks/worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from operator import attrgetter

from .conf import RedisKey, connection, settings
from .conf import connection, construct_redis_key, settings
from .exceptions import WorkerDoesNotExist
from .queue import Queue
from .registries import worker_registry
Expand Down Expand Up @@ -31,8 +31,8 @@ def fetch(cls, id):
def __init__(self, id=None, *, description=None, queues=None,
fetch_id=None):
self.id = id or fetch_id
self.key = RedisKey('worker:' + self.id)
self.task_key = RedisKey('worker_task:' + self.id)
self.key = construct_redis_key('worker:' + self.id)
self.task_key = construct_redis_key('worker_task:' + self.id)

if fetch_id:
self.refresh()
Expand Down
4 changes: 2 additions & 2 deletions redis_tasks/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from contextlib import contextmanager
from datetime import timedelta

from .conf import RedisKey, connection, settings, task_middleware
from .conf import connection, construct_redis_key, settings, task_middleware
from .exceptions import WorkerShutdown
from .queue import Queue
from .registries import registry_maintenance
Expand Down Expand Up @@ -238,7 +238,7 @@ def maybe_shutdown(self):
class Maintenance:
def __init__(self):
self.last_run_at = None
self.key = RedisKey('last_maintenance')
self.key = construct_redis_key('last_maintenance')

def run_if_neccessary(self):
if (self.last_run_at and
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pytest-mock==2.0.0 # via -r requirements.in
pytest==5.4.1 # via -r requirements.in, pytest-cov, pytest-mock
python-dateutil==2.7.3 # via croniter
pytz==2019.3 # via -r requirements.in
redis==2.10.6 # via -r requirements.in
redis==3.4.1 # via -r requirements.in
six==1.11.0 # via more-itertools, packaging, pip-tools, python-dateutil
testfixtures==6.14.0 # via flake8-isort
toml==0.10.0 # via isort
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_version():
zip_safe=False,
platforms='any',
python_requires='>=3.6',
install_requires=['redis >=2.10.0, <3.0.0',
install_requires=['redis >=3.2.0',
'click',
'croniter >=0.3.23',
'pytz'],
Expand Down
5 changes: 4 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ def clear_redis():


def do_clear_redis():
from redis_tasks import conf
with conf.connection.pipeline() as pipeline:
for key in conf.connection.scan_iter(conf.RedisKey('*')):
for key in conf.connection.scan_iter(conf.construct_redis_key('*')):
pipeline.delete(key)
pipeline.execute()


@pytest.fixture(scope="function")
def connection():
from redis_tasks import conf
yield conf.connection


Expand All @@ -100,6 +102,7 @@ def __getattr__(self, name):

@pytest.fixture(scope="function")
def assert_atomic(mocker):
from redis_tasks import conf
@contextmanager
def cm(*, exceptions=[]):
real_connection = conf.connection._wrapped
Expand Down
7 changes: 2 additions & 5 deletions tests/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ def test_mock_settings_after(settings):
assert settings.DEFAULT_TASK_TIMEOUT != "foo"


def test_RedisKey(settings):
rk = conf.RedisKey("foo")
def test_construct_redis_key(settings):
settings.REDIS_PREFIX = "bar"
assert str(rk) == "bar:foo"
settings.REDIS_PREFIX = "zoo"
assert str(rk) == "zoo:foo"
assert conf.construct_redis_key("foo") == "bar:foo"
4 changes: 2 additions & 2 deletions tests/test_registries.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest

from redis_tasks import registries
from redis_tasks.conf import RedisKey
from redis_tasks.conf import construct_redis_key
from redis_tasks.exceptions import WorkerDoesNotExist
from redis_tasks.task import TaskOutcome
from tests.utils import QueueFactory, Something, TaskFactory, WorkerFactory
Expand All @@ -14,7 +14,7 @@ def test_expiring_registry(connection, settings, mocker, assert_atomic):
settings.EXPIRING_REGISTRIES_TTL = 10
delete_tasks = mocker.patch('redis_tasks.task.Task.delete_many')

assert registry.key == RedisKey('testexpire_tasks')
assert registry.key == construct_redis_key('testexpire_tasks')

timestamp = mocker.patch('redis_tasks.conf.RTRedis.time')
timestamp.return_value = (1000, 0)
Expand Down