Skip to content

Commit d80cd57

Browse files
authored
Fix new scheduled tasks jumping the queue (#17962)
Currently, when a new scheduled task is added and its scheduled time has already passed, we set it to ACTIVE. This is problematic, because it means it will jump the queue ahead of all other SCHEDULED tasks; furthermore, if the Synapse process gets restarted, it will jump ahead of any ACTIVE tasks which have been started but are taking a while to run. Instead, we leave it set to SCHEDULED, but kick off a call to `_launch_scheduled_tasks`, which will decide if we actually have capacity to start a new task, and start the newly-added task if so.
1 parent 59ad4b1 commit d80cd57

File tree

5 files changed

+64
-55
lines changed

5 files changed

+64
-55
lines changed

changelog.d/17962.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix new scheduled tasks jumping the queue.

synapse/replication/tcp/commands.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ def to_line(self) -> str:
495495

496496

497497
class NewActiveTaskCommand(_SimpleCommand):
498-
"""Sent to inform instance handling background tasks that a new active task is available to run.
498+
"""Sent to inform instance handling background tasks that a new task is ready to run.
499499
500500
Format::
501501

synapse/replication/tcp/handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,7 @@ def on_NEW_ACTIVE_TASK(
727727
) -> None:
728728
"""Called when get a new NEW_ACTIVE_TASK command."""
729729
if self._task_scheduler:
730-
self._task_scheduler.launch_task_by_id(cmd.data)
730+
self._task_scheduler.on_new_task(cmd.data)
731731

732732
def new_connection(self, connection: IReplicationConnection) -> None:
733733
"""Called when we have a new connection."""

synapse/util/task_scheduler.py

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,10 @@ async def schedule_task(
174174
The id of the scheduled task
175175
"""
176176
status = TaskStatus.SCHEDULED
177+
start_now = False
177178
if timestamp is None or timestamp < self._clock.time_msec():
178179
timestamp = self._clock.time_msec()
179-
status = TaskStatus.ACTIVE
180+
start_now = True
180181

181182
task = ScheduledTask(
182183
random_string(16),
@@ -190,9 +191,11 @@ async def schedule_task(
190191
)
191192
await self._store.insert_scheduled_task(task)
192193

193-
if status == TaskStatus.ACTIVE:
194+
# If the task is ready to run immediately, run the scheduling algorithm now
195+
# rather than waiting
196+
if start_now:
194197
if self._run_background_tasks:
195-
await self._launch_task(task)
198+
self._launch_scheduled_tasks()
196199
else:
197200
self._hs.get_replication_command_handler().send_new_active_task(task.id)
198201

@@ -300,23 +303,13 @@ async def delete_task(self, id: str) -> None:
300303
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
301304
await self._store.delete_scheduled_task(id)
302305

303-
def launch_task_by_id(self, id: str) -> None:
304-
"""Try launching the task with the given ID."""
305-
# Don't bother trying to launch new tasks if we're already at capacity.
306-
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
307-
return
308-
309-
run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
310-
311-
async def _launch_task_by_id(self, id: str) -> None:
312-
"""Helper async function for `launch_task_by_id`."""
313-
task = await self.get_task(id)
314-
if task:
315-
await self._launch_task(task)
306+
def on_new_task(self, task_id: str) -> None:
307+
"""Handle a notification that a new ready-to-run task has been added to the queue"""
308+
# Just run the scheduler
309+
self._launch_scheduled_tasks()
316310

317-
@wrap_as_background_process("launch_scheduled_tasks")
318-
async def _launch_scheduled_tasks(self) -> None:
319-
"""Retrieve and launch scheduled tasks that should be running at that time."""
311+
def _launch_scheduled_tasks(self) -> None:
312+
"""Retrieve and launch scheduled tasks that should be running at this time."""
320313
# Don't bother trying to launch new tasks if we're already at capacity.
321314
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
322315
return
@@ -326,20 +319,26 @@ async def _launch_scheduled_tasks(self) -> None:
326319

327320
self._launching_new_tasks = True
328321

329-
try:
330-
for task in await self.get_tasks(
331-
statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
332-
):
333-
await self._launch_task(task)
334-
for task in await self.get_tasks(
335-
statuses=[TaskStatus.SCHEDULED],
336-
max_timestamp=self._clock.time_msec(),
337-
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
338-
):
339-
await self._launch_task(task)
340-
341-
finally:
342-
self._launching_new_tasks = False
322+
async def inner() -> None:
323+
try:
324+
for task in await self.get_tasks(
325+
statuses=[TaskStatus.ACTIVE],
326+
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
327+
):
328+
# _launch_task will ignore tasks that we're already running, and
329+
# will also do nothing if we're already at the maximum capacity.
330+
await self._launch_task(task)
331+
for task in await self.get_tasks(
332+
statuses=[TaskStatus.SCHEDULED],
333+
max_timestamp=self._clock.time_msec(),
334+
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
335+
):
336+
await self._launch_task(task)
337+
338+
finally:
339+
self._launching_new_tasks = False
340+
341+
run_as_background_process("launch_scheduled_tasks", inner)
343342

344343
@wrap_as_background_process("clean_scheduled_tasks")
345344
async def _clean_scheduled_tasks(self) -> None:

tests/util/test_task_scheduler.py

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
# [This file includes modifications made by New Vector Limited]
1919
#
2020
#
21-
22-
from typing import Optional, Tuple
21+
from typing import List, Optional, Tuple
2322

2423
from twisted.internet.task import deferLater
2524
from twisted.test.proto_helpers import MemoryReactor
@@ -104,33 +103,43 @@ def test_schedule_lot_of_tasks(self) -> None:
104103
)
105104
)
106105

107-
# This is to give the time to the active tasks to finish
106+
def get_tasks_of_status(status: TaskStatus) -> List[ScheduledTask]:
107+
tasks = (
108+
self.get_success(self.task_scheduler.get_task(task_id))
109+
for task_id in task_ids
110+
)
111+
return [t for t in tasks if t is not None and t.status == status]
112+
113+
# At this point, there should be MAX_CONCURRENT_RUNNING_TASKS active tasks and
114+
# one scheduled task.
115+
self.assertEquals(
116+
len(get_tasks_of_status(TaskStatus.ACTIVE)),
117+
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
118+
)
119+
self.assertEquals(
120+
len(get_tasks_of_status(TaskStatus.SCHEDULED)),
121+
1,
122+
)
123+
124+
# Give the time to the active tasks to finish
108125
self.reactor.advance(1)
109126

110-
# Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
127+
# Check that MAX_CONCURRENT_RUNNING_TASKS tasks have run and that one
111128
# is still scheduled.
112-
tasks = [
113-
self.get_success(self.task_scheduler.get_task(task_id))
114-
for task_id in task_ids
115-
]
116-
117129
self.assertEquals(
118-
len(
119-
[t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
120-
),
130+
len(get_tasks_of_status(TaskStatus.COMPLETE)),
121131
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
122132
)
123-
124-
scheduled_tasks = [
125-
t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
126-
]
133+
scheduled_tasks = get_tasks_of_status(TaskStatus.SCHEDULED)
127134
self.assertEquals(len(scheduled_tasks), 1)
128135

129-
# We need to wait for the next run of the scheduler loop
130-
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
131-
self.reactor.advance(1)
136+
# The scheduled task should start 0.1s after the first of the active tasks
137+
# finishes
138+
self.reactor.advance(0.1)
139+
self.assertEquals(len(get_tasks_of_status(TaskStatus.ACTIVE)), 1)
132140

133-
# Check that the last task has been properly executed after the next scheduler loop run
141+
# ... and should finally complete after another second
142+
self.reactor.advance(1)
134143
prev_scheduled_task = self.get_success(
135144
self.task_scheduler.get_task(scheduled_tasks[0].id)
136145
)

0 commit comments

Comments
 (0)