33import multiprocessing
44import os
55import pickle
6+ import queue
67import signal
78import threading
89import time
3334 get_loopback_ip , get_mp_context , get_open_port ,
3435 set_process_title )
3536from vllm .v1 .executor .abstract import Executor , FailureCallback
36- from vllm .v1 .outputs import DraftTokenIds , ModelRunnerOutput
37+ from vllm .v1 .outputs import (AsyncModelRunnerOutput , DraftTokenIds ,
38+ ModelRunnerOutput )
3739from vllm .worker .worker_base import WorkerWrapperBase
3840
3941logger = init_logger (__name__ )
@@ -414,6 +416,16 @@ def __init__(
414416 # Initializes a message queue for sending the model output
415417 self .worker_response_mq = MessageQueue (1 , 1 )
416418
419+ scheduler_config = vllm_config .scheduler_config
420+ self .use_async_scheduling = scheduler_config .async_scheduling
421+ if self .use_async_scheduling :
422+ self .async_output_queue : queue .Queue = queue .Queue ()
423+ self .async_output_copy_thread = Thread (
424+ target = self .async_output_busy_loop ,
425+ daemon = True ,
426+ name = "WorkerAsyncOutputCopy" )
427+ self .async_output_copy_thread .start ()
428+
417429 # Initialize device and loads weights
418430 self .worker .init_device ()
419431 self .worker .load_model ()
@@ -595,6 +607,36 @@ class ResponseStatus(Enum):
595607 SUCCESS = auto ()
596608 FAILURE = auto ()
597609
def enqueue_output(self, output: Any):
    """Wrap a worker result in a status tuple and publish it.

    The result is enqueued on ``self.worker_response_mq`` as a
    ``(ResponseStatus, payload)`` tuple.

    Args:
        output: The value produced by the worker. It may be:
            * an ``AsyncModelRunnerOutput``, which is first resolved via
              its ``get_output()`` method;
            * an ``Exception``, which is reported as a FAILURE response;
            * anything else, which is reported as a SUCCESS response.
    """
    if isinstance(output, AsyncModelRunnerOutput):
        # Resolving the async output can itself fail. When this method
        # runs on the async output thread, an uncaught exception would
        # terminate that thread and no response would ever be enqueued
        # for this step, so turn the error into a FAILURE response.
        try:
            output = output.get_output()
        except Exception as e:
            logger.exception(
                "Failed to resolve async model runner output.")
            output = e

    if isinstance(output, Exception):
        # The exception might not be serializable, so only its string
        # form is sent.
        result = (WorkerProc.ResponseStatus.FAILURE, str(output))
    else:
        result = (WorkerProc.ResponseStatus.SUCCESS, output)
    self.worker_response_mq.enqueue(result)
def handle_output(self, output: Any):
    """Route a worker output to the response queue.

    With async scheduling enabled, the output is put on
    ``self.async_output_queue`` for the async_output_busy_loop thread to
    process. Otherwise it is passed straight to ``enqueue_output``.
    """
    if not self.use_async_scheduling:
        # Synchronous path: publish from the calling thread.
        self.enqueue_output(output)
        return

    # Asynchronous path: hand off to the output thread.
    self.async_output_queue.put(output)
def async_output_busy_loop(self):
    """Thread entrypoint: forward queued outputs to ``enqueue_output``.

    Loops forever, blocking on ``self.async_output_queue.get()`` and
    passing each retrieved item to ``self.enqueue_output``.
    """
    while True:
        self.enqueue_output(self.async_output_queue.get())
598640 def worker_busy_loop (self ):
599641 """Main busy loop for Multiprocessing Workers"""
600642 while True :
@@ -614,10 +656,8 @@ def worker_busy_loop(self):
614656 # exception might not be serializable, so we convert it to
615657 # string, only for logging purpose.
616658 if output_rank is None or self .rank == output_rank :
617- self .worker_response_mq .enqueue (
618- (WorkerProc .ResponseStatus .FAILURE , str (e )))
659+ self .handle_output (e )
619660 continue
620661
621662 if output_rank is None or self .rank == output_rank :
622- self .worker_response_mq .enqueue (
623- (WorkerProc .ResponseStatus .SUCCESS , output ))
663+ self .handle_output (output )
0 commit comments