Skip to content

Commit f9da9b2

Browse files
committed
Adjust to removal of redis.connection.Token
1 parent cbafd12 commit f9da9b2

File tree

11 files changed

+38
-49
lines changed

11 files changed

+38
-49
lines changed

redis_tasks/conf.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,5 @@ def middleware_constructor(class_path):
110110
return [middleware_constructor(x) for x in settings.MIDDLEWARE]
111111

112112

113-
class RedisKey(redis.connection.Token):
114-
def __init__(self, name):
115-
self.name = name
116-
117-
def __str__(self):
118-
return settings.REDIS_PREFIX + ':' + self.name
119-
120-
@property
121-
def encoded_value(self):
122-
return str(self).encode('utf-8')
123-
124-
def __eq__(self, other):
125-
if not isinstance(other, self.__class__):
126-
return NotImplemented
127-
return self.name == other.name
113+
def construct_redis_key(name):
114+
return settings.REDIS_PREFIX + ':' + name

redis_tasks/contrib/graph.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import uuid
22

33
from redis_tasks import Queue
4-
from redis_tasks.conf import RedisKey, connection
4+
from redis_tasks.conf import connection, construct_redis_key
55
from redis_tasks.utils import atomic_pipeline, deserialize, serialize
66

77

@@ -19,7 +19,7 @@ def chain(members):
1919
class TaskGraph:
2020
def __init__(self, id=None):
2121
self.id = id or str(uuid.uuid4())
22-
self.key = RedisKey(f"ston.graph:{self.id}")
22+
self.key = construct_redis_key(f"ston.graph:{self.id}")
2323
self.nodes = []
2424
self.edges = set()
2525

redis_tasks/queue.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .conf import RedisKey, connection
1+
from .conf import connection, construct_redis_key
22
from .exceptions import TaskDoesNotExist
33
from .registries import queue_registry
44
from .task import Task
@@ -8,11 +8,11 @@
88
class Queue(object):
99
def __init__(self, name='default'):
1010
self.name = name
11-
self.key = RedisKey('queue:' + name)
11+
self.key = construct_redis_key('queue:' + name)
1212
# We use a separate key for the workers to wait on, as we need to do a
1313
# multi-key blocking rpop on it, and redis does not have a variant of
1414
# that operation that is not at risk of losing tasks.
15-
self.unblock_key = RedisKey('unblock_queue:' + name)
15+
self.unblock_key = construct_redis_key('unblock_queue:' + name)
1616

1717
@classmethod
1818
def all(cls):
@@ -103,7 +103,7 @@ def await_multi(cls, queues, timeout):
103103
"""Blocks until one of the passed queues contains a tasks.
104104
105105
Return the queue that contained a task or None if `timeout` was reached."""
106-
queue_map = {str(q.unblock_key): q for q in queues}
106+
queue_map = {q.unblock_key: q for q in queues}
107107
result = connection.brpop(queue_map.keys(), timeout)
108108
if result is None:
109109
return None

redis_tasks/registries.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import redis_tasks
22

3-
from .conf import RedisKey, connection, settings
3+
from .conf import connection, construct_redis_key, settings
44
from .exceptions import WorkerDoesNotExist
5-
from .utils import atomic_pipeline, decode_list
5+
from .utils import LazyObject, atomic_pipeline, decode_list
66

77

88
class ExpiringRegistry:
99
def __init__(self, name):
10-
self.key = RedisKey(name + '_tasks')
10+
self.key = construct_redis_key(name + '_tasks')
1111

1212
@atomic_pipeline
1313
def add(self, task, *, pipeline):
@@ -48,8 +48,8 @@ def transaction(pipeline):
4848
connection.transaction(transaction, self.key)
4949

5050

51-
finished_task_registry = ExpiringRegistry('finished')
52-
failed_task_registry = ExpiringRegistry('failed')
51+
finished_task_registry = LazyObject(lambda: ExpiringRegistry('finished'))
52+
failed_task_registry = LazyObject(lambda: ExpiringRegistry('failed'))
5353

5454

5555
def registry_maintenance():
@@ -60,7 +60,7 @@ def registry_maintenance():
6060

6161
class WorkerRegistry:
6262
def __init__(self):
63-
self.key = RedisKey('workers')
63+
self.key = construct_redis_key('workers')
6464

6565
@atomic_pipeline
6666
def add(self, worker, *, pipeline):
@@ -83,7 +83,7 @@ def get_worker_ids(self):
8383

8484
def get_running_tasks(self):
8585
"""Returns a worker_id -> task_id dict"""
86-
task_key_prefix = RedisKey('worker_task:')
86+
task_key_prefix = construct_redis_key('worker_task:')
8787
lua = connection.register_script("""
8888
local workers_key, task_key_prefix = unpack(KEYS)
8989
local worker_ids = redis.call("ZRANGE", workers_key, 0, -1)
@@ -114,12 +114,12 @@ def get_dead_ids(self):
114114
self.key, '-inf', oldest_valid))
115115

116116

117-
worker_registry = WorkerRegistry()
117+
worker_registry = LazyObject(lambda: WorkerRegistry())
118118

119119

120120
class QueueRegistry:
121121
def __init__(self):
122-
self.key = RedisKey('queues')
122+
self.key = construct_redis_key('queues')
123123

124124
def get_names(self):
125125
return decode_list(connection.smembers(self.key))
@@ -133,4 +133,4 @@ def remove(self, queue, *, pipeline):
133133
pipeline.srem(self.key, queue.name)
134134

135135

136-
queue_registry = QueueRegistry()
136+
queue_registry = LazyObject(lambda: QueueRegistry())

redis_tasks/scheduler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import croniter
1111

12-
from .conf import RedisKey, connection, settings
12+
from .conf import connection, construct_redis_key, settings
1313
from .exceptions import TaskDoesNotExist
1414
from .queue import Queue
1515
from .smear_dst import DstSmearingTz
@@ -80,7 +80,7 @@ def get_next(self, after):
8080
class SchedulerEntry:
8181
def __init__(self, id, config):
8282
self.id = id
83-
self.key = RedisKey(f"schedule_entry:{self.id}")
83+
self.key = construct_redis_key(f"schedule_entry:{self.id}")
8484
self.singleton = config.get('singleton', True)
8585
self.task_template = [config['task'],
8686
config.get('args', ()), config.get('kwargs', {})]
@@ -192,7 +192,7 @@ class Mutex(object):
192192
expire_script = None
193193

194194
def __init__(self, *, timeout):
195-
self.key = RedisKey('scheduler')
195+
self.key = construct_redis_key('scheduler')
196196
self.timeout = timeout
197197
self.token = None
198198

redis_tasks/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import redis_tasks
99

10-
from .conf import RedisKey, connection, settings, task_middleware
10+
from .conf import connection, construct_redis_key, settings, task_middleware
1111
from .exceptions import (
1212
InvalidOperation, TaskAborted, TaskDoesNotExist, WorkerShutdown)
1313
from .registries import failed_task_registry, finished_task_registry
@@ -137,7 +137,7 @@ def key(self):
137137

138138
@classmethod
139139
def key_for(cls, task_id):
140-
return RedisKey('task:' + task_id)
140+
return construct_redis_key('task:' + task_id)
141141

142142
@atomic_pipeline
143143
def enqueue(self, queue, *, pipeline):

redis_tasks/worker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from operator import attrgetter
33

4-
from .conf import RedisKey, connection, settings
4+
from .conf import connection, construct_redis_key, settings
55
from .exceptions import WorkerDoesNotExist
66
from .queue import Queue
77
from .registries import worker_registry
@@ -31,8 +31,8 @@ def fetch(cls, id):
3131
def __init__(self, id=None, *, description=None, queues=None,
3232
fetch_id=None):
3333
self.id = id or fetch_id
34-
self.key = RedisKey('worker:' + self.id)
35-
self.task_key = RedisKey('worker_task:' + self.id)
34+
self.key = construct_redis_key('worker:' + self.id)
35+
self.task_key = construct_redis_key('worker_task:' + self.id)
3636

3737
if fetch_id:
3838
self.refresh()

redis_tasks/worker_process.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from contextlib import contextmanager
1212
from datetime import timedelta
1313

14-
from .conf import RedisKey, connection, settings, task_middleware
14+
from .conf import connection, construct_redis_key, settings, task_middleware
1515
from .exceptions import WorkerShutdown
1616
from .queue import Queue
1717
from .registries import registry_maintenance
@@ -238,7 +238,7 @@ def maybe_shutdown(self):
238238
class Maintenance:
239239
def __init__(self):
240240
self.last_run_at = None
241-
self.key = RedisKey('last_maintenance')
241+
self.key = construct_redis_key('last_maintenance')
242242

243243
def run_if_neccessary(self):
244244
if (self.last_run_at and

tests/conftest.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,16 @@ def clear_redis():
6969

7070

7171
def do_clear_redis():
72+
from redis_tasks import conf
7273
with conf.connection.pipeline() as pipeline:
73-
for key in conf.connection.scan_iter(conf.RedisKey('*')):
74+
for key in conf.connection.scan_iter(conf.construct_redis_key('*')):
7475
pipeline.delete(key)
7576
pipeline.execute()
7677

7778

7879
@pytest.fixture(scope="function")
7980
def connection():
81+
from redis_tasks import conf
8082
yield conf.connection
8183

8284

@@ -100,6 +102,7 @@ def __getattr__(self, name):
100102

101103
@pytest.fixture(scope="function")
102104
def assert_atomic(mocker):
105+
from redis_tasks import conf
103106
@contextmanager
104107
def cm(*, exceptions=[]):
105108
real_connection = conf.connection._wrapped

tests/test_conf.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import os
22
from types import SimpleNamespace
33

4+
import pytest
5+
46
from redis_tasks import conf, defaults
57

68

@@ -28,9 +30,6 @@ def test_mock_settings_after(settings):
2830
assert settings.DEFAULT_TASK_TIMEOUT != "foo"
2931

3032

31-
def test_RedisKey(settings):
32-
rk = conf.RedisKey("foo")
33+
def test_construct_redis_key(settings):
3334
settings.REDIS_PREFIX = "bar"
34-
assert str(rk) == "bar:foo"
35-
settings.REDIS_PREFIX = "zoo"
36-
assert str(rk) == "zoo:foo"
35+
assert conf.construct_redis_key("foo") == "bar:foo"

tests/test_registries.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pytest
22

33
from redis_tasks import registries
4-
from redis_tasks.conf import RedisKey
4+
from redis_tasks.conf import construct_redis_key
55
from redis_tasks.exceptions import WorkerDoesNotExist
66
from redis_tasks.task import TaskOutcome
77
from tests.utils import QueueFactory, Something, TaskFactory, WorkerFactory
@@ -14,7 +14,7 @@ def test_expiring_registry(connection, settings, mocker, assert_atomic):
1414
settings.EXPIRING_REGISTRIES_TTL = 10
1515
delete_tasks = mocker.patch('redis_tasks.task.Task.delete_many')
1616

17-
assert registry.key == RedisKey('testexpire_tasks')
17+
assert registry.key == construct_redis_key('testexpire_tasks')
1818

1919
timestamp = mocker.patch('redis_tasks.conf.RTRedis.time')
2020
timestamp.return_value = (1000, 0)

0 commit comments

Comments
 (0)