Skip to content

Commit 19c07f1

Browse files
committed
fix:bug job with incorrect score to active registry
1 parent 1f4b726 commit 19c07f1

File tree

4 files changed

+41
-47
lines changed

4 files changed

+41
-47
lines changed

scheduler/helpers/queues/queue_logic.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,10 +397,10 @@ def enqueue_job(
397397
) -> JobModel:
398398
"""Enqueues a job for delayed execution without checking dependencies.
399399
400-
If Queue is instantiated with is_async=False, job is executed immediately.
400+
If Queue is instantiated with is_async=False, the job is executed immediately.
401401
:param job_model: The job redis model
402402
:param pipeline: The Broker Pipeline
403-
:param at_front: Whether to enqueue the job at the front
403+
:param at_front: Should the job be enqueued at the front
404404
405405
:returns: The enqueued JobModel
406406
"""

scheduler/redis_models/job.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,16 @@ def prepare_for_execution(self, worker_name: str, registry: JobNamesRegistry, co
128128
self.last_heartbeat = utils.utcnow()
129129
self.started_at = self.last_heartbeat
130130
self.status = JobStatus.STARTED
131-
registry.add(connection, self.name, self.last_heartbeat.timestamp())
131+
registry.add(connection, self.name, self.last_heartbeat.timestamp() + self.job_info_ttl)
132132
self.save(connection=connection)
133133

134134
def after_execution(
135-
self,
136-
job_info_ttl: int,
137-
status: JobStatus,
138-
connection: ConnectionType,
139-
prev_registry: Optional[JobNamesRegistry] = None,
140-
new_registry: Optional[JobNamesRegistry] = None,
135+
self,
136+
job_info_ttl: int,
137+
status: JobStatus,
138+
connection: ConnectionType,
139+
prev_registry: Optional[JobNamesRegistry] = None,
140+
new_registry: Optional[JobNamesRegistry] = None,
141141
) -> None:
142142
"""After the job is executed, update the status, heartbeat, and other metadata."""
143143
self.status = status
@@ -190,26 +190,26 @@ def deserialize(cls, data: Dict[str, Any]) -> Self:
190190

191191
@classmethod
192192
def create(
193-
cls,
194-
connection: ConnectionType,
195-
func: FunctionReferenceType,
196-
queue_name: str,
197-
args: Union[List[Any], Optional[Tuple]] = None,
198-
kwargs: Optional[Dict[str, Any]] = None,
199-
result_ttl: Optional[int] = None,
200-
job_info_ttl: Optional[int] = None,
201-
status: Optional[JobStatus] = None,
202-
description: Optional[str] = None,
203-
timeout: Optional[int] = None,
204-
name: Optional[str] = None,
205-
task_type: Optional[str] = None,
206-
scheduled_task_id: Optional[int] = None,
207-
meta: Optional[Dict[str, Any]] = None,
208-
*,
209-
on_success: Optional[Callback] = None,
210-
on_failure: Optional[Callback] = None,
211-
on_stopped: Optional[Callback] = None,
212-
at_front: Optional[bool] = None,
193+
cls,
194+
connection: ConnectionType,
195+
func: FunctionReferenceType,
196+
queue_name: str,
197+
args: Union[List[Any], Optional[Tuple]] = None,
198+
kwargs: Optional[Dict[str, Any]] = None,
199+
result_ttl: Optional[int] = None,
200+
job_info_ttl: Optional[int] = None,
201+
status: Optional[JobStatus] = None,
202+
description: Optional[str] = None,
203+
timeout: Optional[int] = None,
204+
name: Optional[str] = None,
205+
task_type: Optional[str] = None,
206+
scheduled_task_id: Optional[int] = None,
207+
meta: Optional[Dict[str, Any]] = None,
208+
*,
209+
on_success: Optional[Callback] = None,
210+
on_failure: Optional[Callback] = None,
211+
on_stopped: Optional[Callback] = None,
212+
at_front: Optional[bool] = None,
213213
) -> Self:
214214
"""Creates a new job-model for the given function, arguments, and keyword arguments.
215215
:returns: A job-model instance.
@@ -283,7 +283,7 @@ def create(
283283

284284

285285
def _get_call_string(
286-
func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None
286+
func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None
287287
) -> Optional[str]:
288288
"""
289289
Returns a string representation of the call, formatted as a regular

scheduler/redis_models/worker.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,13 @@ def set_current_job_working_time(self, job_execution_time: float, connection: Co
8484
self.set_field("current_job_working_time", job_execution_time, connection=connection)
8585

8686
def heartbeat(self, connection: ConnectionType, timeout: Optional[int] = None) -> None:
87-
timeout = timeout or DEFAULT_WORKER_TTL + 60
88-
connection.expire(self._key, timeout)
89-
now = utcnow()
90-
self.set_field("last_heartbeat", now, connection=connection)
91-
logger.debug(f"Next heartbeat for worker {self._key} should arrive in {timeout} seconds.")
87+
with connection.pipeline() as pipeline:
88+
timeout = timeout or DEFAULT_WORKER_TTL + 60
89+
pipeline.expire(self._key, timeout)
90+
now = utcnow()
91+
self.set_field("last_heartbeat", now, connection=pipeline)
92+
pipeline.execute()
93+
logger.debug(f"Next heartbeat for worker {self._key} should arrive in {timeout} seconds.")
9294

9395
@classmethod
9496
def cleanup(cls, connection: ConnectionType, queue_name: Optional[str] = None):

scheduler/worker/worker.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -687,14 +687,8 @@ def execute_job(self, job: JobModel, queue: Queue) -> None:
687687

688688
def maintain_heartbeats(self, job: JobModel, queue: Queue) -> None:
689689
"""Updates worker and job's last heartbeat field."""
690-
with self.connection.pipeline() as pipeline:
691-
self._model.heartbeat(pipeline, self.job_monitoring_interval + 60)
692-
ttl = self.get_heartbeat_ttl(job)
693-
694-
queue.active_job_registry.add(pipeline, job.name, current_timestamp() + ttl, update_existing_only=False)
695-
results = pipeline.execute()
696-
if results[2] == 1:
697-
job.delete(queue.connection)
690+
self._model.heartbeat(self.connection, self.job_monitoring_interval + 60)
691+
ttl = self.get_heartbeat_ttl(job)
698692

699693
def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
700694
"""This is the entry point of the newly spawned job execution process.
@@ -785,10 +779,8 @@ def perform_job(self, job: JobModel, queue: Queue) -> bool:
785779
logger.debug(f"[Worker {self.name}/{self._pid}]: Performing {job.name} code.")
786780

787781
try:
788-
with self.connection.pipeline() as pipeline:
789-
self.worker_before_execution(job, connection=pipeline)
790-
job.prepare_for_execution(self.name, queue.active_job_registry, connection=pipeline)
791-
pipeline.execute()
782+
self.worker_before_execution(job, connection=queue.connection)
783+
job.prepare_for_execution(self.name, queue.active_job_registry, connection=queue.connection)
792784
timeout = job.timeout or SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT
793785
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(timeout, JobTimeoutException, job_name=job.name):
794786
logger.debug(f"[Worker {self.name}/{self._pid}]: Performing job `{job.name}`...")

0 commit comments

Comments
 (0)