Skip to content

Commit b19a33b

Browse files
committed
Add stop event
1 parent 9878734 commit b19a33b

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
 from multiprocessing import Manager
 from queue import Empty as QueueEmpty
 from queue import Queue
+from threading import Event
 from typing import (
     Any,
     Generic,
@@ -126,7 +127,7 @@ async def run(
             ) as executor,
         ):
             requests_iter: Optional[Iterator[Any]] = None
-            futures, queues = await self._start_processes(
+            futures, queues, stop_event = await self._start_processes(
                 manager, executor, scheduling_strategy
             )
             run_info, requests_iter, times_iter = self._run_setup(
@@ -178,7 +179,7 @@ async def run(
                     run_info=run_info,
                 )
 
-            await self._stop_processes(futures, queues.requests)
+            await self._stop_processes(futures, stop_event)
 
     async def _start_processes(
         self,
@@ -188,6 +189,7 @@ async def _start_processes(
     ) -> tuple[
         list[asyncio.Future],
         MPQueues[RequestT, ResponseT],
+        Event,
     ]:
         await self.worker.prepare_multiprocessing()
         queues: MPQueues[RequestT, ResponseT] = MPQueues(
@@ -197,6 +199,7 @@
             times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
             responses=manager.Queue(),
         )
+        stop_event = manager.Event()
 
         num_processes = min(
             scheduling_strategy.processes_limit,
@@ -226,6 +229,7 @@
                 executor,
                 self.worker.process_loop_asynchronous,
                 queues,
+                stop_event,
                 False,  # TODO: Make configurable
                 requests_limit,
                 id_,
@@ -234,7 +238,7 @@
 
         await asyncio.sleep(0.1)  # give time for processes to start
 
-        return futures, queues
+        return futures, queues, stop_event
 
     def _run_setup(
         self,
@@ -369,10 +373,9 @@ def _check_result_ready(
     async def _stop_processes(
         self,
         futures: list[asyncio.Future],
-        requests_queue: Queue[RequestSession[RequestT, ResponseT]],
+        stop_event: Event,
     ):
-        # FIXME: Need new method for stopping workers
-        for _ in futures:
-            requests_queue.put(None)
+        # stop all processes
+        stop_event.set()
 
         await asyncio.gather(*futures)

src/guidellm/scheduler/worker.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
 from dataclasses import dataclass
 from queue import Empty as QueueEmpty
 from queue import Queue
+from threading import Event
 from typing import (
     Any,
     Generic,
@@ -181,6 +182,7 @@ async def resolve_scheduler_request(
     def process_loop_asynchronous(
         self,
         queues: MPQueues[RequestT, ResponseT],
+        stop_event: Event,
         prioritize_sessions: bool,
         max_concurrency: int,
         process_id: int,
@@ -207,7 +209,10 @@ async def _process_runner():
                     if request_session is not None:
                         pending_sessions.append(request_session)
                     lock.release()
-                    continue
+                    if stop_event.is_set():
+                        return  # Exit if stop event is set
+                    else:
+                        continue
 
         async def wait_then_requeue(
             session: RequestSession[RequestT, ResponseT],
@@ -309,13 +314,15 @@ async def prepare_multiprocessing(self):
     def process_loop_asynchronous(
         self,
         queues: MPQueues[GenerationRequest, ResponseSummary],
+        stop_event: Event,
         prioritize_sessions: bool,
         max_concurrency: int,
         process_id: int,
     ):
         asyncio.run(self.backend.validate())
         super().process_loop_asynchronous(
             queues=queues,
+            stop_event=stop_event,
             prioritize_sessions=prioritize_sessions,
             max_concurrency=max_concurrency,
             process_id=process_id,

0 commit comments

Comments
 (0)