Skip to content

Commit 5096566

Browse files
committed
worker doesn't store its own connection
1 parent 06bf0da commit 5096566

File tree

6 files changed

+116
-175
lines changed

6 files changed

+116
-175
lines changed

scheduler/helpers/queues/queue_logic.py

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,11 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
4949
loop = asyncio.new_event_loop()
5050
coro_result = loop.run_until_complete(result)
5151
result = coro_result
52-
if job_model.success_callback:
53-
job_model.success_callback(job_model, connection, result)
52+
job_model.call_success_callback(job_model, connection, result)
5453
return result
5554
except Exception as e:
5655
logger.error(f"Job {job_model.name} failed with exception: {e}", exc_info=True)
57-
if job_model.failure_callback:
58-
job_model.failure_callback(job_model, connection, *sys.exc_info())
56+
job_model.call_failure_callback(job_model, connection, *sys.exc_info())
5957
raise
6058
finally:
6159
assert job_model is _job_stack.pop()
@@ -121,43 +119,27 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
121119
self.connection, before_score
122120
)
123121

124-
with self.connection.pipeline() as pipeline:
125-
for job_name, job_score in started_jobs:
126-
job = JobModel.get(job_name, connection=self.connection)
127-
if job is None or job.failure_callback is None or job_score + job.timeout > before_score:
128-
continue
122+
for job_name, job_score in started_jobs:
123+
job = JobModel.get(job_name, connection=self.connection)
124+
if job is None or not job.has_failure_callback or job_score + job.timeout > before_score:
125+
continue
129126

130-
logger.debug(f"Running failure callbacks for {job.name}")
131-
try:
132-
job.failure_callback(job, self.connection, traceback.extract_stack())
133-
except Exception: # noqa
134-
logger.exception(f"Job {self.name}: error while executing failure callback")
135-
raise
127+
logger.debug(f"Running failure callbacks for {job.name}")
128+
try:
129+
job.call_failure_callback(job, self.connection, traceback.extract_stack())
130+
except Exception: # noqa
131+
logger.exception(f"Job {self.name}: error while executing failure callback")
132+
raise
136133

137-
else:
138-
logger.warning(
139-
f"Queue cleanup: Moving job to {self.failed_job_registry.key} (due to AbandonedJobError)"
140-
)
141-
exc_string = (
142-
f"Moved to {self.failed_job_registry.key}, due to AbandonedJobError, at {datetime.now()}"
143-
)
144-
job.status = JobStatus.FAILED
145-
score = current_timestamp() + SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL
146-
Result.create(
147-
connection=pipeline,
148-
job_name=job.name,
149-
worker_name=job.worker_name,
150-
_type=ResultType.FAILED,
151-
ttl=SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL,
152-
exc_string=exc_string,
153-
)
154-
self.failed_job_registry.add(pipeline, job.name, score)
155-
job.expire(connection=pipeline, ttl=SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL)
156-
job.save(connection=pipeline)
134+
else:
135+
logger.warning(
136+
f"Queue cleanup: Moving job to {self.failed_job_registry.key} (due to AbandonedJobError)"
137+
)
138+
exc_string = f"Moved to {self.failed_job_registry.key}, due to AbandonedJobError, at {datetime.now()}"
139+
self.job_handle_failure(JobStatus.FAILED, job, exc_string)
157140

158141
for registry in self.REGISTRIES.values():
159142
getattr(self, registry).cleanup(connection=self.connection, timestamp=before_score)
160-
pipeline.execute()
161143

162144
def first_queued_job_name(self) -> Optional[str]:
163145
return self.queued_job_registry.get_first()
@@ -258,37 +240,35 @@ def create_and_enqueue_job(
258240
raise TypeError(f"Invalid type for when=`{when}`")
259241
return job_model
260242

261-
def job_handle_success(
262-
self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int, connection: ConnectionType
263-
) -> None:
243+
def job_handle_success(self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int) -> None:
264244
"""Saves and cleanup job after successful execution"""
265245
job.after_execution(
266246
job_info_ttl,
267247
JobStatus.FINISHED,
268248
prev_registry=self.active_job_registry,
269249
new_registry=self.finished_job_registry,
270-
connection=connection,
250+
connection=self.connection,
271251
)
272252
Result.create(
273-
connection,
253+
self.connection,
274254
job_name=job.name,
275255
worker_name=job.worker_name,
276256
_type=ResultType.SUCCESSFUL,
277257
return_value=result,
278258
ttl=result_ttl,
279259
)
280260

281-
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str, connection: ConnectionType) -> None:
261+
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str) -> None:
282262
# Does not set job status since the job might be stopped
283263
job.after_execution(
284264
SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL,
285265
status,
286266
prev_registry=self.active_job_registry,
287267
new_registry=self.failed_job_registry,
288-
connection=connection,
268+
connection=self.connection,
289269
)
290270
Result.create(
291-
connection,
271+
self.connection,
292272
job.name,
293273
job.worker_name,
294274
ResultType.FAILED,
@@ -301,19 +281,11 @@ def run_sync(self, job: JobModel) -> JobModel:
301281
job.prepare_for_execution("sync", self.active_job_registry, self.connection)
302282
try:
303283
result = perform_job(job, self.connection)
304-
305-
with self.connection.pipeline() as pipeline:
306-
self.job_handle_success(
307-
job, result=result, job_info_ttl=job.job_info_ttl, result_ttl=job.success_ttl, connection=pipeline
308-
)
309-
310-
pipeline.execute()
284+
self.job_handle_success(job, result=result, job_info_ttl=job.job_info_ttl, result_ttl=job.success_ttl)
311285
except Exception as e: # noqa
312286
logger.warning(f"Job {job.name} failed with exception: {e}")
313-
with self.connection.pipeline() as pipeline:
314-
exc_string = "".join(traceback.format_exception(*sys.exc_info()))
315-
self.job_handle_failure(JobStatus.FAILED, job, exc_string, pipeline)
316-
pipeline.execute()
287+
exc_string = "".join(traceback.format_exception(*sys.exc_info()))
288+
self.job_handle_failure(JobStatus.FAILED, job, exc_string)
317289
return job
318290

319291
@classmethod

scheduler/redis_models/job.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@ class TimeoutFormatError(Exception):
2121

2222

2323
class JobStatus(str, Enum):
24-
"""The Status of Job within its lifecycle at any given time."""
24+
"""The Status of Job within its lifecycle at any given time.
2525
26-
QUEUED = "queued"
27-
FINISHED = "finished"
28-
FAILED = "failed"
29-
STARTED = "started"
30-
SCHEDULED = "scheduled"
31-
STOPPED = "stopped"
32-
CANCELED = "canceled"
26+
scheduled -> queued -> (canceled | started -> (finished | failed | stopped))
27+
"""
28+
29+
SCHEDULED = "scheduled" # Job is scheduled to be queued at a later time
30+
QUEUED = "queued" # Job is waiting to be processed
31+
CANCELED = "canceled" # Job was canceled before being processed
32+
STARTED = "started" # Job is currently being processed
33+
FINISHED = "finished" # Job has been processed successfully
34+
FAILED = "failed" # Job has been processed but failed
35+
STOPPED = "stopped" # Job was stopped while being processed
3336

3437

3538
@dataclasses.dataclass(slots=True, kw_only=True)
@@ -49,9 +52,6 @@ class JobModel(HashModel):
4952
timeout: int = SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT
5053
success_ttl: int = SCHEDULER_CONFIG.DEFAULT_SUCCESS_TTL
5154
job_info_ttl: int = SCHEDULER_CONFIG.DEFAULT_JOB_TTL
52-
status: JobStatus
53-
created_at: datetime
54-
meta: Dict[str, str]
5555
at_front: bool = False
5656
last_heartbeat: Optional[datetime] = None
5757
worker_name: Optional[str] = None
@@ -66,6 +66,9 @@ class JobModel(HashModel):
6666
stopped_callback_timeout: int = SCHEDULER_CONFIG.CALLBACK_TIMEOUT
6767
task_type: Optional[str] = None
6868
scheduled_task_id: Optional[int] = None
69+
status: JobStatus
70+
created_at: datetime
71+
meta: Dict[str, str]
6972

7073
def __hash__(self):
7174
return hash(self.name)
@@ -99,6 +102,9 @@ def func(self) -> Callable[[Any], Any]:
99102
def is_scheduled_task(self) -> bool:
100103
return self.scheduled_task_id is not None
101104

105+
def has_failure_callback(self) -> bool:
106+
return self.failure_callback_name is not None
107+
102108
def expire(self, ttl: int, connection: ConnectionType) -> None:
103109
"""Expire the Job Model if ttl >= 0"""
104110
if ttl == 0:
@@ -112,8 +118,8 @@ def persist(self, connection: ConnectionType) -> None:
112118
pipeline.execute()
113119

114120
def prepare_for_execution(self, worker_name: str, registry: JobNamesRegistry, connection: ConnectionType) -> None:
115-
"""Prepares the job for execution, setting the worker name,
116-
heartbeat information, status and other metadata before execution begins.
121+
"""Prepares the job for execution, setting the worker name, heartbeat information, status, and other metadata
122+
before execution begins.
117123
:param worker_name: The name of the worker
118124
:param registry: The registry to add the job to
119125
:param current_pid: The current process id
@@ -144,26 +150,26 @@ def after_execution(
144150
new_registry.add(connection, self.name, current_timestamp() + job_info_ttl)
145151
self.save(connection=connection)
146152

147-
@property
148-
def failure_callback(self) -> Optional[Callback]:
153+
def call_failure_callback(self, *args, **kwargs) -> Optional[Callback]:
149154
if self.failure_callback_name is None:
150155
return None
151-
logger.debug(f"Running failure callbacks for {self.name}")
152-
return Callback(self.failure_callback_name, self.failure_callback_timeout)
156+
logger.debug(f"Running failure callback for {self.name}")
157+
callback = Callback(self.failure_callback_name, self.failure_callback_timeout)
158+
return callback(*args, **kwargs)
153159

154-
@property
155-
def success_callback(self) -> Optional[Callable[..., Any]]:
160+
def call_success_callback(self, *args, **kwargs) -> Optional[Any]:
156161
if self.success_callback_name is None:
157162
return None
158-
logger.debug(f"Running success callbacks for {self.name}")
159-
return Callback(self.success_callback_name, self.success_callback_timeout)
163+
logger.debug(f"Running success callback for {self.name}")
164+
callback = Callback(self.success_callback_name, self.success_callback_timeout)
165+
return callback(*args, **kwargs)
160166

161-
@property
162-
def stopped_callback(self) -> Optional[Callable[..., Any]]:
167+
def call_stopped_callback(self, *args, **kwargs) -> Optional[Any]:
163168
if self.stopped_callback_name is None:
164169
return None
165170
logger.debug(f"Running stopped callbacks for {self.name}")
166-
return Callback(self.stopped_callback_name, self.stopped_callback_timeout)
171+
callback = Callback(self.stopped_callback_name, self.stopped_callback_timeout)
172+
return callback(*args, **kwargs)
167173

168174
def get_call_string(self):
169175
return _get_call_string(self.func_name, self.args, self.kwargs)

scheduler/redis_models/result.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ def create(
4343
) -> Self:
4444
if worker_name is None:
4545
logger.warning(f"Job {job_name} has no worker name, will save result with 'unknown_worker'")
46+
worker_name = "unknown_worker"
4647
result = cls(
4748
parent=job_name,
4849
ttl=ttl,
4950
type=_type,
5051
return_value=return_value,
5152
exc_string=exc_string,
52-
worker_name=worker_name or "unknown_worker",
53+
worker_name=worker_name,
5354
)
5455
result.save(connection)
5556
return result

scheduler/redis_models/worker.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class WorkerStatus(str, Enum):
1818
SUSPENDED = "suspended"
1919
BUSY = "busy"
2020
IDLE = "idle"
21+
STOPPED = "stopped"
2122

2223

2324
@dataclasses.dataclass(slots=True, kw_only=True)
@@ -96,17 +97,16 @@ def cleanup(cls, connection: ConnectionType, queue_name: Optional[str] = None):
9697
with connection.pipeline() as pipeline:
9798
for worker_key in worker_keys:
9899
pipeline.exists(worker_key)
99-
worker_exist = pipeline.execute()
100-
invalid_workers = list()
101-
for i, worker_name in enumerate(worker_names):
102-
if not worker_exist[i]:
103-
invalid_workers.append(worker_name)
100+
worker_exist: List[int] = pipeline.execute()
101+
invalid_workers: List[str] = [
102+
worker_name for i, worker_name in enumerate(worker_names) if not worker_exist[i]
103+
]
104104
if len(invalid_workers) == 0:
105105
return
106-
for invalid_subset in _split_list(invalid_workers, MAX_KEYS):
107-
pipeline.srem(cls._list_key, *invalid_subset)
106+
for invalid_workers_subset in _split_list(invalid_workers, MAX_KEYS):
107+
pipeline.srem(cls._list_key, *invalid_workers_subset)
108108
if queue_name:
109-
pipeline.srem(cls._children_key_template.format(queue_name), *invalid_subset)
109+
pipeline.srem(cls._children_key_template.format(queue_name), *invalid_workers_subset)
110110
pipeline.execute()
111111

112112

@@ -118,4 +118,4 @@ def _split_list(a_list: List[str], segment_size: int) -> Generator[list[str], An
118118
:returns: The list split into smaller lists
119119
"""
120120
for i in range(0, len(a_list), segment_size):
121-
yield a_list[i: i + segment_size]
121+
yield a_list[i : i + segment_size]

scheduler/tests/test_job_decorator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def setUp(self) -> None:
6060
get_queue("default").connection.flushall()
6161

6262
def test_all_job_methods_registered(self):
63-
self.assertEqual(6, len(JOB_METHODS_LIST))
63+
self.assertEqual(7, len(JOB_METHODS_LIST))
6464

6565
def test_job_decorator_no_params(self):
6666
test_job.delay()

0 commit comments

Comments
 (0)