Merged

Changes from all commits (30 commits)
95b1b7d
first prototype, working for BS=1
benchislett Aug 24, 2025
5bd9851
wip for batched
benchislett Aug 25, 2025
9a59696
fix bs > 1
benchislett Aug 25, 2025
a24d715
add back removed code
benchislett Aug 25, 2025
51b6169
minor perf optimization
benchislett Aug 25, 2025
2832e37
improvements
benchislett Aug 26, 2025
bd331b4
remove old prints
benchislett Aug 26, 2025
dfa5ca9
Merge branch 'main' into overlap-model-execution
benchislett Aug 26, 2025
c118525
fix precommit
benchislett Aug 26, 2025
43b4f17
Merge branch 'main' into overlap-model-execution
benchislett Aug 27, 2025
752ccf9
misc cleanup
benchislett Aug 27, 2025
9f28326
refactor prepare_input_ids
benchislett Aug 27, 2025
15d7b31
tiny refactor to reorder some ops
benchislett Aug 27, 2025
b351a56
Merge branch 'main' into overlap-model-execution
benchislett Sep 2, 2025
5df3ae8
refactor async model runner output
benchislett Sep 2, 2025
efcc3ee
tiny cleanup
benchislett Sep 2, 2025
b4611f4
Merge branch 'main' into overlap-model-execution
benchislett Sep 2, 2025
6c025bb
remove torch from multiproc_executor
benchislett Sep 2, 2025
bc99a79
refactor async output in multiproc executor
benchislett Sep 3, 2025
2ffa123
cleanup
benchislett Sep 3, 2025
7ae3166
improve async gpu model runner output structure
benchislett Sep 3, 2025
75c109d
use cuda event to sync copy stream
benchislett Sep 3, 2025
3f9d46b
Merge branch 'main' into overlap-model-execution
benchislett Sep 3, 2025
ff5bc7a
minor refactor for readability
benchislett Sep 4, 2025
6a44032
more minor refactor
benchislett Sep 4, 2025
b411981
Merge branch 'main' into overlap-model-execution
benchislett Sep 4, 2025
0d23f0e
refactor prepare_input_ids for fewer cpu ops
benchislett Sep 4, 2025
54feea9
restructure multiproc output handling to isolate effects on non-async…
benchislett Sep 5, 2025
4bddae2
Merge branch 'main' into overlap-model-execution
benchislett Sep 5, 2025
70f4921
Merge branch 'main' into overlap-model-execution
benchislett Sep 5, 2025
50 changes: 45 additions & 5 deletions vllm/v1/executor/multiproc_executor.py
@@ -3,6 +3,7 @@
 import multiprocessing
 import os
 import pickle
+import queue
 import signal
 import threading
 import time
@@ -33,7 +34,8 @@
                         get_loopback_ip, get_mp_context, get_open_port,
                         set_process_title)
 from vllm.v1.executor.abstract import Executor, FailureCallback
-from vllm.v1.outputs import DraftTokenIds, ModelRunnerOutput
+from vllm.v1.outputs import (AsyncModelRunnerOutput, DraftTokenIds,
+                             ModelRunnerOutput)
 from vllm.worker.worker_base import WorkerWrapperBase

 logger = init_logger(__name__)
@@ -412,6 +414,16 @@ def __init__(
         # Initializes a message queue for sending the model output
         self.worker_response_mq = MessageQueue(1, 1)

+        scheduler_config = vllm_config.scheduler_config
+        self.use_async_scheduling = scheduler_config.async_scheduling
+        if self.use_async_scheduling:
+            self.async_output_queue: queue.Queue = queue.Queue()
+            self.async_output_copy_thread = Thread(
+                target=self.async_output_busy_loop,
+                daemon=True,
+                name="WorkerAsyncOutputCopy")
+            self.async_output_copy_thread.start()
+
         # Initialize device and loads weights
         self.worker.init_device()
         self.worker.load_model()
@@ -593,6 +605,36 @@ class ResponseStatus(Enum):
         SUCCESS = auto()
         FAILURE = auto()

+    def enqueue_output(self, output: Any):
+        """Prepares output from the worker and enqueues it to the
+        worker_response_mq. If the output is an Exception, it is
+        converted to a FAILURE response.
+        """
+        if isinstance(output, AsyncModelRunnerOutput):
+            output = output.get_output()
+
+        if isinstance(output, Exception):
+            result = (WorkerProc.ResponseStatus.FAILURE, str(output))
+        else:
+            result = (WorkerProc.ResponseStatus.SUCCESS, output)
+        self.worker_response_mq.enqueue(result)
+
+    def handle_output(self, output: Any):
+        """Handles output from the worker. If async scheduling is enabled,
+        it is passed to the async_output_busy_loop thread. Otherwise, it is
+        enqueued directly to the worker_response_mq.
+        """
+        if self.use_async_scheduling:
+            self.async_output_queue.put(output)
+        else:
+            self.enqueue_output(output)
+
+    def async_output_busy_loop(self):
+        """Entrypoint for the thread which handles outputs asynchronously."""
+        while True:
+            output = self.async_output_queue.get()
+            self.enqueue_output(output)
+
     def worker_busy_loop(self):
         """Main busy loop for Multiprocessing Workers"""
         while True:
@@ -612,10 +654,8 @@ def worker_busy_loop(self):
                 # exception might not be serializable, so we convert it to
                 # string, only for logging purpose.
                 if output_rank is None or self.rank == output_rank:
-                    self.worker_response_mq.enqueue(
-                        (WorkerProc.ResponseStatus.FAILURE, str(e)))
+                    self.handle_output(e)
                 continue

             if output_rank is None or self.rank == output_rank:
-                self.worker_response_mq.enqueue(
-                    (WorkerProc.ResponseStatus.SUCCESS, output))
+                self.handle_output(output)
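The three methods added above form a simple handoff: handle_output is called from the worker's busy loop and, when async scheduling is enabled, defers the potentially blocking get_output() call to the WorkerAsyncOutputCopy thread, so the loop can start the next model step immediately. Below is a minimal runnable sketch of the same pattern; the AsyncOutput class and the in-memory results list are hypothetical stand-ins for vLLM's AsyncModelRunnerOutput and worker_response_mq, not the actual classes.

```python
import queue
import threading
import time
from dataclasses import dataclass


@dataclass
class AsyncOutput:
    """Stand-in for AsyncModelRunnerOutput: resolving it may block."""
    step: int

    def get_output(self) -> str:
        time.sleep(0.01)  # simulate waiting on a device-to-host copy
        return f"output for step {self.step}"


class Worker:
    def __init__(self, use_async_scheduling: bool = True):
        self.results: list[str] = []  # stand-in for worker_response_mq
        self.use_async_scheduling = use_async_scheduling
        if self.use_async_scheduling:
            self.async_output_queue: queue.Queue = queue.Queue()
            threading.Thread(target=self.async_output_busy_loop,
                             daemon=True,
                             name="WorkerAsyncOutputCopy").start()

    def enqueue_output(self, output) -> None:
        # Resolve async outputs (blocking) before publishing the result.
        if isinstance(output, AsyncOutput):
            output = output.get_output()
        self.results.append(output)

    def handle_output(self, output) -> None:
        # Hand off to the copy thread so the main loop never blocks here.
        if self.use_async_scheduling:
            self.async_output_queue.put(output)
        else:
            self.enqueue_output(output)

    def async_output_busy_loop(self) -> None:
        while True:
            self.enqueue_output(self.async_output_queue.get())


worker = Worker()
for step in range(3):
    worker.handle_output(AsyncOutput(step))  # returns immediately
time.sleep(0.1)
print(worker.results)
```

A single FIFO queue drained by one thread also preserves the order in which results reach the response queue, which matters because the scheduler consumes outputs step by step.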
15 changes: 15 additions & 0 deletions vllm/v1/outputs.py
@@ -1,6 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project

+from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from typing import NamedTuple, Optional

@@ -114,6 +115,20 @@ class ModelRunnerOutput:
     num_nans_in_logits: Optional[dict[str, int]] = None


+# ModelRunnerOutput wrapper for async scheduling.
+class AsyncModelRunnerOutput(ABC):
+
+    @abstractmethod
+    def get_output(self) -> ModelRunnerOutput:
+        """Get the ModelRunnerOutput for this async output.
+
+        This is a blocking call that waits until the results are ready, which
+        might involve copying device tensors to the host.
+        This method should only be called once per AsyncModelRunnerOutput.
+        """
+        pass
+
+
 @dataclass
 class DraftTokenIds:

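The diff above only defines the abstract interface; the concrete GPU-side subclass lives elsewhere in this PR (commit 75c109d, "use cuda event to sync copy stream", points at the mechanism). The following is a hypothetical illustration of how such a subclass might satisfy the contract: start a non-blocking device-to-host copy on a dedicated stream at construction time, and have get_output() block only on the recorded CUDA event. It assumes a CUDA device, and the class name and constructor signature are illustrative, not vLLM's actual code.

```python
import torch

from vllm.v1.outputs import AsyncModelRunnerOutput, ModelRunnerOutput


class GPUAsyncModelRunnerOutput(AsyncModelRunnerOutput):
    """Illustrative only: not the subclass the PR actually adds."""

    def __init__(self, output: ModelRunnerOutput,
                 sampled_token_ids: torch.Tensor,
                 copy_stream: torch.cuda.Stream):
        self._output = output
        # Pinned host buffer so the device-to-host copy can run truly
        # asynchronously (pageable memory would force a sync copy).
        self._sampled_cpu = torch.empty_like(sampled_token_ids,
                                             device="cpu",
                                             pin_memory=True)
        self._copy_event = torch.cuda.Event()
        # Order the copy after all work queued on the main stream, then
        # launch it on the dedicated copy stream so the main stream is
        # free to start the next forward pass.
        copy_stream.wait_stream(torch.cuda.current_stream())
        with torch.cuda.stream(copy_stream):
            self._sampled_cpu.copy_(sampled_token_ids, non_blocking=True)
            self._copy_event.record(copy_stream)

    def get_output(self) -> ModelRunnerOutput:
        # Block only until the copy has completed, not on the whole device.
        self._copy_event.synchronize()
        self._output.sampled_token_ids = self._sampled_cpu.tolist()
        return self._output
```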
5 changes: 5 additions & 0 deletions vllm/v1/worker/gpu_input_batch.py
@@ -250,6 +250,11 @@ def __init__(
 
         self.pooling_params: dict[str, PoolingParams] = {}

+        # Cached reference to the GPU tensor of previously sampled tokens
+        self.prev_sampled_token_ids: Optional[torch.Tensor] = None
+        self.prev_sampled_token_ids_invalid_indices: Optional[set[int]] = None
+        self.prev_req_id_to_index: Optional[dict[str, int]] = None
+
     @property
     def req_ids(self) -> list[str]:
         # None elements should only be present transiently
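These cached fields let the next step consume the previous step's sampled tokens without a device-to-host synchronization: the token IDs stay on the GPU, prev_req_id_to_index maps still-running requests to rows of that tensor, and prev_sampled_token_ids_invalid_indices marks rows whose tokens are not valid for reuse. A hypothetical sketch of the device-side gather this enables (function name, shapes, and arguments are illustrative, not the PR's exact prepare_input_ids):

```python
import torch


def prepare_input_ids(input_ids: torch.Tensor,
                      token_indices: torch.Tensor,
                      prev_sampled_token_ids: torch.Tensor,
                      prev_rows: torch.Tensor) -> None:
    """Scatter last step's sampled tokens into this step's input buffer.

    Everything stays on prev_sampled_token_ids' device, so no
    device-to-host synchronization is needed before the next step.
    """
    # Row r of prev_sampled_token_ids holds the token sampled last step
    # for the request now scheduled at batch position r via prev_rows.
    last_tokens = prev_sampled_token_ids[prev_rows, -1]
    input_ids[token_indices] = last_tokens


# Toy CPU example: request order changed between steps.
prev = torch.tensor([[11], [22], [33]])       # last step's sampled tokens
input_ids = torch.zeros(4, dtype=torch.long)  # this step's flat input buffer
prepare_input_ids(input_ids,
                  token_indices=torch.tensor([0, 1, 2]),
                  prev_sampled_token_ids=prev,
                  prev_rows=torch.tensor([2, 0, 1]))
print(input_ids)  # tensor([33, 11, 22,  0])
```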