@@ -57,14 +57,13 @@ class TaskScheduler:
5757 the code launching the task.
5858 You can also specify the `result` (and/or an `error`) when returning from the function.
5959
60- The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting
61- to launch now, the launch will still not happen before the next loop run.
62-
63- Tasks will be run on the worker specified with `run_background_tasks_on` config,
64- or the main one by default.
60+ The reconciliation loop runs every minute, so this is not a precise scheduler.
6561 There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
6662 full. In this regard, please take great care that scheduled tasks can actually finished.
6763 For now there is no mechanism to stop a running task if it is stuck.
64+
65+ Tasks will be run on the worker specified with `run_background_tasks_on` config,
66+ or the main one by default.
6867 """
6968
7069 # Precision of the scheduler, evaluation of tasks to run will only happen
@@ -85,7 +84,7 @@ def __init__(self, hs: "HomeServer"):
8584 self ._actions : Dict [
8685 str ,
8786 Callable [
88- [ScheduledTask , bool ],
87+ [ScheduledTask ],
8988 Awaitable [Tuple [TaskStatus , Optional [JsonMapping ], Optional [str ]]],
9089 ],
9190 ] = {}
@@ -98,11 +97,13 @@ def __init__(self, hs: "HomeServer"):
9897 "handle_scheduled_tasks" ,
9998 self ._handle_scheduled_tasks ,
10099 )
100+ else :
101+ self .replication_client = hs .get_replication_command_handler ()
101102
102103 def register_action (
103104 self ,
104105 function : Callable [
105- [ScheduledTask , bool ],
106+ [ScheduledTask ],
106107 Awaitable [Tuple [TaskStatus , Optional [JsonMapping ], Optional [str ]]],
107108 ],
108109 action_name : str ,
@@ -115,10 +116,9 @@ def register_action(
115116 calling `schedule_task` but rather in an `__init__` method.
116117
117118 Args:
118- function: The function to be executed for this action. The parameters
119- passed to the function when launched are the `ScheduledTask` being run,
120- and a `first_launch` boolean to signal if it's a resumed task or the first
121- launch of it. The function should return a tuple of new `status`, `result`
119+ function: The function to be executed for this action. The parameter
120+ passed to the function when launched is the `ScheduledTask` being run.
121+ The function should return a tuple of new `status`, `result`
122122 and `error` as specified in `ScheduledTask`.
123123 action_name: The name of the action to be associated with the function
124124 """
@@ -171,6 +171,12 @@ async def schedule_task(
171171 )
172172 await self ._store .insert_scheduled_task (task )
173173
174+ if status == TaskStatus .ACTIVE :
175+ if self ._run_background_tasks :
176+ await self ._launch_task (task )
177+ else :
178+ self .replication_client .send_new_active_task (task .id )
179+
174180 return task .id
175181
176182 async def update_task (
@@ -265,21 +271,13 @@ async def delete_task(self, id: str) -> None:
265271 Args:
266272 id: id of the task to delete
267273 """
268- if self .task_is_running (id ):
269- raise Exception (f"Task { id } is currently running and can't be deleted" )
274+ task = await self .get_task (id )
275+ if task is None :
276+ raise Exception (f"Task { id } does not exist" )
277+ if task .status == TaskStatus .ACTIVE :
278+ raise Exception (f"Task { id } is currently ACTIVE and can't be deleted" )
270279 await self ._store .delete_scheduled_task (id )
271280
272- def task_is_running (self , id : str ) -> bool :
273- """Check if a task is currently running.
274-
275- Can only be called from the worker handling the task scheduling.
276-
277- Args:
278- id: id of the task to check
279- """
280- assert self ._run_background_tasks
281- return id in self ._running_tasks
282-
283281 async def _handle_scheduled_tasks (self ) -> None :
284282 """Main loop taking care of launching tasks and cleaning up old ones."""
285283 await self ._launch_scheduled_tasks ()
@@ -288,29 +286,11 @@ async def _handle_scheduled_tasks(self) -> None:
288286 async def _launch_scheduled_tasks (self ) -> None :
289287 """Retrieve and launch scheduled tasks that should be running at that time."""
290288 for task in await self .get_tasks (statuses = [TaskStatus .ACTIVE ]):
291- if not self .task_is_running (task .id ):
292- if (
293- len (self ._running_tasks )
294- < TaskScheduler .MAX_CONCURRENT_RUNNING_TASKS
295- ):
296- await self ._launch_task (task , first_launch = False )
297- else :
298- if (
299- self ._clock .time_msec ()
300- > task .timestamp + TaskScheduler .LAST_UPDATE_BEFORE_WARNING_MS
301- ):
302- logger .warn (
303- f"Task { task .id } (action { task .action } ) has seen no update for more than 24h and may be stuck"
304- )
289+ await self ._launch_task (task )
305290 for task in await self .get_tasks (
306291 statuses = [TaskStatus .SCHEDULED ], max_timestamp = self ._clock .time_msec ()
307292 ):
308- if (
309- not self .task_is_running (task .id )
310- and len (self ._running_tasks )
311- < TaskScheduler .MAX_CONCURRENT_RUNNING_TASKS
312- ):
313- await self ._launch_task (task , first_launch = True )
293+ await self ._launch_task (task )
314294
315295 running_tasks_gauge .set (len (self ._running_tasks ))
316296
@@ -320,27 +300,27 @@ async def _clean_scheduled_tasks(self) -> None:
320300 statuses = [TaskStatus .FAILED , TaskStatus .COMPLETE ]
321301 ):
322302 # FAILED and COMPLETE tasks should never be running
323- assert not self .task_is_running ( task . id )
303+ assert task . id not in self ._running_tasks
324304 if (
325305 self ._clock .time_msec ()
326306 > task .timestamp + TaskScheduler .KEEP_TASKS_FOR_MS
327307 ):
328308 await self ._store .delete_scheduled_task (task .id )
329309
330- async def _launch_task (self , task : ScheduledTask , first_launch : bool ) -> None :
310+ async def _launch_task (self , task : ScheduledTask ) -> None :
331311 """Launch a scheduled task now.
332312
333313 Args:
334314 task: the task to launch
335- first_launch: `True` if it's the first time is launched, `False` otherwise
336315 """
337- assert task . action in self ._actions
316+ assert self ._run_background_tasks
338317
318+ assert task .action in self ._actions
339319 function = self ._actions [task .action ]
340320
341321 async def wrapper () -> None :
342322 try :
343- (status , result , error ) = await function (task , first_launch )
323+ (status , result , error ) = await function (task )
344324 except Exception :
345325 f = Failure ()
346326 logger .error (
@@ -360,6 +340,20 @@ async def wrapper() -> None:
360340 )
361341 self ._running_tasks .remove (task .id )
362342
343+ if len (self ._running_tasks ) >= TaskScheduler .MAX_CONCURRENT_RUNNING_TASKS :
344+ return
345+
346+ if (
347+ self ._clock .time_msec ()
348+ > task .timestamp + TaskScheduler .LAST_UPDATE_BEFORE_WARNING_MS
349+ ):
350+ logger .warn (
351+ f"Task { task .id } (action { task .action } ) has seen no update for more than 24h and may be stuck"
352+ )
353+
354+ if task .id in self ._running_tasks :
355+ return
356+
363357 self ._running_tasks .add (task .id )
364358 await self .update_task (task .id , status = TaskStatus .ACTIVE )
365359 description = f"{ task .id } -{ task .action } "
0 commit comments