2 changes: 1 addition & 1 deletion requirements/docs.txt
@@ -15,4 +15,4 @@ ruff
 cachetools
 msgspec
 pydantic
-torch
+torch
5 changes: 2 additions & 3 deletions vllm/v1/core/sched/output.py
@@ -9,8 +9,6 @@
 from vllm._bc_linter import bc_linter_include

 if TYPE_CHECKING:
-    import numpy as np
-    import numpy.typing as npt
     import torch

     from vllm.distributed.kv_transfer.kv_connector.v1.base import (
@@ -20,6 +18,7 @@
     from vllm.pooling_params import PoolingParams
     from vllm.sampling_params import SamplingParams
     from vllm.v1.request import Request
+    from vllm.v1.structured_output import GrammarBitmaskPlaceholder


 @bc_linter_include
@@ -160,7 +159,7 @@
     # for filling the next token bitmask
     structured_output_request_ids: dict[str, int]
     # the bitmask for the whole batch
-    grammar_bitmask: Optional[npt.NDArray[np.int32]]
+    grammar_bitmask: GrammarBitmaskPlaceholder

 [Check notice (GitHub Actions / bc_lint) on line 162 in vllm/v1/core/sched/output.py: Function SchedulerOutput: grammar_bitmask changed from Optional[npt.NDArray[np.int32]] to GrammarBitmaskPlaceholder]

     # KV Cache Connector metadata.
     kv_connector_metadata: Optional[KVConnectorMetadata] = None
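
The `GrammarBitmaskPlaceholder` type itself is outside this diff. As a rough sketch of the pattern, assuming the placeholder wraps a `concurrent.futures.Future` that the GPU model runner resolves before applying the bitmask (only the class name comes from the diff; the fields and methods below are assumptions):

```python
# Hypothetical sketch; the real GrammarBitmaskPlaceholder lives in
# vllm/v1/structured_output and may be implemented differently.
from concurrent.futures import Future
from typing import Optional

import numpy as np
import numpy.typing as npt


class GrammarBitmaskPlaceholder:
    """Handle to an in-flight grammar-bitmask computation."""

    def __init__(self,
                 future: "Future[Optional[npt.NDArray[np.int32]]]") -> None:
        self._future = future

    def result(self) -> Optional[npt.NDArray[np.int32]]:
        # Blocks only if the background computation is still running.
        # The scheduler never calls this; the model runner does, right
        # before the bitmask is needed for sampling.
        return self._future.result()
```

The indirection lets `get_grammar_bitmask` return immediately, overlapping bitmask filling with the rest of the scheduling step.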
34 changes: 24 additions & 10 deletions vllm/v1/core/sched/scheduler.py
@@ -356,7 +356,8 @@ def schedule(self) -> SchedulerOutput:
             # for FSM compilation.
             if request.status == RequestStatus.WAITING_FOR_FSM:
                 structured_output_req = request.structured_output_request
-                if structured_output_req and structured_output_req.grammar:
+                if structured_output_req and \
+                        structured_output_req.is_grammar_ready:
                     request.status = RequestStatus.WAITING
                 else:
                     self.waiting.pop_request()
@@ -851,7 +852,9 @@ def get_grammar_bitmask(
         if not structured_output_request_ids:
             bitmask = None
         else:
-            bitmask = self.structured_output_manager.grammar_bitmask(
+            # Submit the async grammar bitmask computation and return a placeholder.
+            # The actual result will be retrieved later in gpu_model_runner.
+            bitmask = self.structured_output_manager.submit_grammar_bitmask(
                 self.requests,
                 structured_output_request_ids,
                 scheduled_spec_decode_tokens,
@@ -870,6 +873,7 @@ def update_from_output(
         pooler_outputs = model_runner_output.pooler_output
         num_nans_in_logits = model_runner_output.num_nans_in_logits
         kv_connector_output = model_runner_output.kv_connector_output
+        structured_list = []

         outputs: dict[int, list[EngineCoreOutput]] = defaultdict(list)
         spec_decoding_stats: Optional[SpecDecodingStats] = None
@@ -945,11 +949,7 @@
             if new_token_ids and self.structured_output_manager.should_advance(
                     request):
-                # NOTE: structured_output_request
-                # should not be None if use_structured_output, we have
-                # checked above, so safe to ignore type warning
-                request.structured_output_request.grammar.accept_tokens(  # type: ignore[union-attr]
-                    req_id, new_token_ids)
+                structured_list.append((req_id, new_token_ids))

             if num_nans_in_logits is not None and req_id in num_nans_in_logits:
                 request.num_nans_in_logits = num_nans_in_logits[req_id]
@@ -985,6 +985,9 @@
             # This is a rare case and unlikely to impact performance.
             self.waiting.remove_requests(stopped_preempted_reqs)

+        self.structured_output_manager.submit_batch_accept_tokens(
+            structured_list)
+
         # KV Connector: update state for finished KV Transfers.
         if model_runner_output.kv_connector_output:
             self._update_from_kv_xfer_finished(
@@ -1070,6 +1073,8 @@ def update_draft_token_ids(
         self,
         draft_token_ids: DraftTokenIds,
     ) -> None:
+        spec_structured_dict = {}
+
         for req_id, spec_token_ids in zip(
                 draft_token_ids.req_ids,
                 draft_token_ids.draft_token_ids,
@@ -1084,12 +1089,21 @@
                 # NOTE(woosuk): request.spec_token_ids should be updated.
                 request.spec_token_ids.clear()
             elif self.structured_output_manager.should_advance(request):
-                metadata = request.structured_output_request
-                request.spec_token_ids = metadata.grammar.validate_tokens(  # type: ignore[union-attr]
-                    spec_token_ids)
+                spec_structured_dict[req_id] = spec_token_ids
             else:
                 request.spec_token_ids = spec_token_ids

+        # Batch validate tokens for structured output requests.
+        spec_structured_dict = (
+            self.structured_output_manager.submit_batch_validate_tokens(
+                spec_structured_dict))
+
+        # Update requests with the validated tokens.
+        for req_id in spec_structured_dict:
+            request = self.requests.get(req_id)
+            if request is not None:
+                request.spec_token_ids = spec_structured_dict[req_id]
+
     def get_request_counts(self) -> tuple[int, int]:
         """Returns (num_running_reqs, num_waiting_reqs)."""
         return len(self.running), len(self.waiting)
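
The manager-side `submit_*` methods are defined outside this diff. A minimal sketch of how the two batch token APIs could look, assuming a single-worker executor serializes all grammar-state mutations; the `accept_tokens`/`validate_tokens` signatures are taken from the code deleted above, everything else is an assumption:

```python
# Hypothetical sketch of the batch APIs used by the scheduler above;
# the actual StructuredOutputManager may be organized differently.
from concurrent.futures import ThreadPoolExecutor


class StructuredOutputManagerSketch:

    def __init__(self, grammars: dict) -> None:
        # req_id -> compiled grammar (FSM) object.
        self.grammars = grammars
        # A single worker serializes all grammar-state mutations, so the
        # FSMs need no extra locking.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def submit_batch_accept_tokens(
            self, structured_list: list[tuple[str, list[int]]]) -> None:
        # Fire-and-forget: advancing the FSMs is moved off the
        # scheduler's critical path; update_from_output does not wait.
        def accept_all() -> None:
            for req_id, token_ids in structured_list:
                self.grammars[req_id].accept_tokens(req_id, token_ids)

        self._executor.submit(accept_all)

    def submit_batch_validate_tokens(
            self, spec_dict: dict[str, list[int]]) -> dict[str, list[int]]:
        # Draft tokens must be validated before they are used, so this
        # call blocks until the whole batch has been checked.
        def validate_all() -> dict[str, list[int]]:
            return {
                req_id: self.grammars[req_id].validate_tokens(token_ids)
                for req_id, token_ids in spec_dict.items()
            }

        return self._executor.submit(validate_all).result()
```

Note the asymmetry: accepting tokens can be fire-and-forget because only FSM state depends on it, while validation must return the trimmed draft tokens before the next step. With a single worker, the blocking validate call also drains any earlier accept batches, which keeps the FSMs in order.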
12 changes: 6 additions & 6 deletions vllm/v1/engine/core.py
@@ -352,7 +352,7 @@ def step_with_batch_queue(
         return engine_core_outputs, model_executed

     def shutdown(self):
-        self.structured_output_manager.clear_backend()
+        self.structured_output_manager.shutdown()
         if self.model_executor:
             self.model_executor.shutdown()
         if self.scheduler:
@@ -442,11 +442,11 @@ def preprocess_add_request(
             self.request_block_hasher)
         if req.use_structured_output:
             # Note on thread safety: no race condition.
-            # `grammar_init` is only invoked in input processing thread. For
-            # `structured_output_manager`, each request is independent and
-            # grammar compilation is async. Scheduler always checks grammar
-            # compilation status before scheduling request.
-            self.structured_output_manager.grammar_init(req)
+            # `submit_grammar_init` is only invoked in the input processing
+            # thread. For `structured_output_manager`, each request is
+            # independent and grammar compilation is async. The scheduler
+            # checks grammar compilation status before scheduling a request.
+            self.structured_output_manager.submit_grammar_init(req)
         return req, request.current_wave
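
For the init path, only `submit_grammar_init` and the `is_grammar_ready` check appear in this diff; a hedged sketch of how the two could fit together, with the future-based readiness field being an assumption:

```python
# Hypothetical sketch of async grammar init; names other than
# submit_grammar_init and is_grammar_ready are assumptions.
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Optional


def compile_grammar(structured_req: "StructuredOutputRequestSketch") -> Any:
    # Stand-in for real FSM compilation (e.g. via an xgrammar backend).
    return object()


class StructuredOutputRequestSketch:

    def __init__(self) -> None:
        self.grammar_future: Optional[Future] = None

    @property
    def is_grammar_ready(self) -> bool:
        # Non-blocking check: this is what lets schedule() flip a request
        # from WAITING_FOR_FSM to WAITING without stalling on compilation.
        return self.grammar_future is not None and self.grammar_future.done()


class ManagerInitSketch:

    def __init__(self) -> None:
        self._executor = ThreadPoolExecutor(max_workers=1)

    def submit_grammar_init(self, request: Any) -> None:
        # Compile off-thread. Per the thread-safety note above, this is
        # only ever called from the input processing thread.
        structured_req = request.structured_output_request
        structured_req.grammar_future = self._executor.submit(
            compile_grammar, structured_req)
```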

