Skip to content

Commit 24c2d2b

Browse files
committed
fix:timeout
1 parent 6359f3d commit 24c2d2b

File tree

2 files changed

+44
-49
lines changed

2 files changed

+44
-49
lines changed

scheduler/redis_models/job.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,16 @@ def prepare_for_execution(self, worker_name: str, registry: JobNamesRegistry, co
128128
self.last_heartbeat = utils.utcnow()
129129
self.started_at = self.last_heartbeat
130130
self.status = JobStatus.STARTED
131-
registry.add(connection, self.name, self.last_heartbeat.timestamp() + self.job_info_ttl)
131+
registry.add(connection, self.name, self.last_heartbeat.timestamp() + self.timeout)
132132
self.save(connection=connection)
133133

134134
def after_execution(
135-
self,
136-
job_info_ttl: int,
137-
status: JobStatus,
138-
connection: ConnectionType,
139-
prev_registry: Optional[JobNamesRegistry] = None,
140-
new_registry: Optional[JobNamesRegistry] = None,
135+
self,
136+
job_info_ttl: int,
137+
status: JobStatus,
138+
connection: ConnectionType,
139+
prev_registry: Optional[JobNamesRegistry] = None,
140+
new_registry: Optional[JobNamesRegistry] = None,
141141
) -> None:
142142
"""After the job is executed, update the status, heartbeat, and other metadata."""
143143
self.status = status
@@ -190,26 +190,26 @@ def deserialize(cls, data: Dict[str, Any]) -> Self:
190190

191191
@classmethod
192192
def create(
193-
cls,
194-
connection: ConnectionType,
195-
func: FunctionReferenceType,
196-
queue_name: str,
197-
args: Union[List[Any], Optional[Tuple]] = None,
198-
kwargs: Optional[Dict[str, Any]] = None,
199-
result_ttl: Optional[int] = None,
200-
job_info_ttl: Optional[int] = None,
201-
status: Optional[JobStatus] = None,
202-
description: Optional[str] = None,
203-
timeout: Optional[int] = None,
204-
name: Optional[str] = None,
205-
task_type: Optional[str] = None,
206-
scheduled_task_id: Optional[int] = None,
207-
meta: Optional[Dict[str, Any]] = None,
208-
*,
209-
on_success: Optional[Callback] = None,
210-
on_failure: Optional[Callback] = None,
211-
on_stopped: Optional[Callback] = None,
212-
at_front: Optional[bool] = None,
193+
cls,
194+
connection: ConnectionType,
195+
func: FunctionReferenceType,
196+
queue_name: str,
197+
args: Union[List[Any], Optional[Tuple]] = None,
198+
kwargs: Optional[Dict[str, Any]] = None,
199+
result_ttl: Optional[int] = None,
200+
job_info_ttl: Optional[int] = None,
201+
status: Optional[JobStatus] = None,
202+
description: Optional[str] = None,
203+
timeout: Optional[int] = None,
204+
name: Optional[str] = None,
205+
task_type: Optional[str] = None,
206+
scheduled_task_id: Optional[int] = None,
207+
meta: Optional[Dict[str, Any]] = None,
208+
*,
209+
on_success: Optional[Callback] = None,
210+
on_failure: Optional[Callback] = None,
211+
on_stopped: Optional[Callback] = None,
212+
at_front: Optional[bool] = None,
213213
) -> Self:
214214
"""Creates a new job-model for the given function, arguments, and keyword arguments.
215215
:returns: A job-model instance.
@@ -283,7 +283,7 @@ def create(
283283

284284

285285
def _get_call_string(
286-
func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None
286+
func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None
287287
) -> Optional[str]:
288288
"""
289289
Returns a string representation of the call, formatted as a regular

scheduler/worker/worker.py

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
from scheduler.helpers.queues import Queue, perform_job
3737
from scheduler.helpers.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException
38-
from scheduler.helpers.utils import utcnow, current_timestamp
38+
from scheduler.helpers.utils import utcnow
3939

4040
try:
4141
from setproctitle import setproctitle as setprocname
@@ -96,16 +96,16 @@ def from_model(cls, model: WorkerModel) -> Self:
9696
return res
9797

9898
def __init__(
99-
self,
100-
queues: Iterable[Union[str, Queue]],
101-
name: str,
102-
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
103-
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
104-
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
105-
fork_job_execution: bool = True,
106-
with_scheduler: bool = True,
107-
burst: bool = False,
108-
model: Optional[WorkerModel] = None,
99+
self,
100+
queues: Iterable[Union[str, Queue]],
101+
name: str,
102+
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
103+
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
104+
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
105+
fork_job_execution: bool = True,
106+
with_scheduler: bool = True,
107+
burst: bool = False,
108+
model: Optional[WorkerModel] = None,
109109
) -> None:
110110
self.fork_job_execution = fork_job_execution
111111
self.job_monitoring_interval: int = job_monitoring_interval
@@ -352,7 +352,7 @@ def run_maintenance_tasks(self) -> None:
352352
self._model.save(connection=self.connection)
353353

354354
def dequeue_job_and_maintain_ttl(
355-
self, timeout: Optional[int], max_idle_time: Optional[int] = None
355+
self, timeout: Optional[int], max_idle_time: Optional[int] = None
356356
) -> Tuple[Optional[JobModel], Optional[Queue]]:
357357
"""Dequeues a job while maintaining the TTL.
358358
:param timeout: The timeout for the dequeue operation.
@@ -523,7 +523,7 @@ def reorder_queues(self, reference_queue: Queue) -> None:
523523
return
524524
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
525525
pos = self._ordered_queues.index(reference_queue)
526-
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
526+
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
527527
return
528528
if self._dequeue_strategy == DequeueStrategy.RANDOM:
529529
shuffle(self._ordered_queues)
@@ -607,7 +607,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
607607
while True:
608608
try:
609609
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
610-
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
610+
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
611611
):
612612
retpid, ret_val = self.wait_for_job_execution_process()
613613
break
@@ -625,7 +625,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
625625
self.wait_for_job_execution_process()
626626
break
627627

628-
self.maintain_heartbeats(job, queue)
628+
self._model.heartbeat(self.connection, self.job_monitoring_interval + 60)
629629

630630
except OSError as e:
631631
# In case we encountered an OSError due to EINTR (which is
@@ -685,11 +685,6 @@ def execute_job(self, job: JobModel, queue: Queue) -> None:
685685
self.perform_job(job, queue)
686686
self._model.set_field("state", WorkerStatus.IDLE, connection=self.connection)
687687

688-
def maintain_heartbeats(self, job: JobModel, queue: Queue) -> None:
689-
"""Updates worker and job's last heartbeat field."""
690-
self._model.heartbeat(self.connection, self.job_monitoring_interval + 60)
691-
ttl = self.get_heartbeat_ttl(job)
692-
693688
def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
694689
"""This is the entry point of the newly spawned job execution process.
695690
After fork()'ing, assure we are generating random sequences that are different from the worker.
@@ -840,7 +835,7 @@ class RoundRobinWorker(Worker):
840835

841836
def reorder_queues(self, reference_queue: Queue) -> None:
842837
pos = self._ordered_queues.index(reference_queue)
843-
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
838+
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
844839

845840

846841
class RandomWorker(Worker):

0 commit comments

Comments
 (0)