Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
9ca44ce
[V1] AsyncLLM data parallel WIP
njhill Feb 26, 2025
3f51611
Handle pausing loop
njhill Feb 27, 2025
d8c591e
More single-node updates
njhill Feb 27, 2025
65e225d
some cleanup
njhill Feb 27, 2025
5ce57b6
fix up utility methods
njhill Feb 27, 2025
a3f1102
revert config check
njhill Feb 27, 2025
a66fb01
fixes
njhill Feb 27, 2025
67672c2
cleanup
njhill Feb 27, 2025
cf52fbf
fixes
njhill Feb 27, 2025
a4ec81b
reconcile with LLMEngine DP in decoupled engine case
njhill Feb 27, 2025
292aa00
minor simplification
njhill Feb 27, 2025
4b62ffd
rework
njhill Feb 28, 2025
407c72e
class refactor
njhill Mar 1, 2025
31bf7ea
fix
njhill Mar 1, 2025
fde51ce
adjust core engine init
njhill Mar 1, 2025
d5a3e68
Merge remote-tracking branch 'refs/remotes/origin/main' into multi-engine
njhill Mar 3, 2025
6d89a1b
fix new typing
njhill Mar 3, 2025
448abd9
fix :facepalm:
njhill Mar 3, 2025
a1e513e
bind socket first
njhill Mar 3, 2025
50cf64c
do you have to let it linger
njhill Mar 3, 2025
f365998
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 3, 2025
b2571f0
add comments
njhill Mar 4, 2025
32c6f24
aggregate stats
njhill Mar 4, 2025
9c30cd7
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 4, 2025
672d07e
Fix test
njhill Mar 4, 2025
dea382b
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 5, 2025
d24a626
fix and minor cleanup
njhill Mar 5, 2025
cd03c80
Add CI test
njhill Mar 6, 2025
f1004b7
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 6, 2025
d3298fa
Some simplification and fixes
njhill Mar 6, 2025
74dde48
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 6, 2025
5fe1b75
address @markmc's stats suggestion
njhill Mar 6, 2025
648659f
address @tms's arg comment
njhill Mar 6, 2025
119d1ec
fix utility method breakage
njhill Mar 6, 2025
55328ee
rename AsyncMPClient output_processor to output_handler
njhill Mar 6, 2025
4f5330e
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 6, 2025
48770ec
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 7, 2025
d229f4d
Fix
njhill Mar 7, 2025
2f91cc4
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 15, 2025
518047a
Remove redundant logic related to removed stats aggregation
njhill Mar 13, 2025
cb2b099
Fixes
njhill Mar 15, 2025
ff1137a
Merge remote-tracking branch 'refs/remotes/origin/main' into multi-engine
njhill Mar 16, 2025
61f4fcb
fix issue from main merge
njhill Mar 16, 2025
44874c2
remove leftover unused field
njhill Mar 17, 2025
66fc582
Fix offline DP compatibility
njhill Mar 17, 2025
7764466
Add timeout to data_parallel.py
njhill Mar 17, 2025
51e8bf0
Merge remote-tracking branch 'refs/remotes/origin/main' into multi-engine
njhill Mar 17, 2025
f692c12
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 19, 2025
47b5e1c
Enable less-frequent all-reduce optimization
njhill Mar 20, 2025
f226139
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 20, 2025
af47920
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 20, 2025
693c521
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 20, 2025
6e131e3
clean distributed shutdown
njhill Mar 20, 2025
d9ac856
address misc loose-ends
njhill Mar 20, 2025
3abbdef
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 21, 2025
b18417e
further tweaks
njhill Mar 21, 2025
56b2b78
Merge remote-tracking branch 'refs/remotes/origin/main' into multi-engine
njhill Mar 25, 2025
05ab310
Additional debug
njhill Mar 25, 2025
5295c34
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 27, 2025
4f897b8
Address review comments on tests
njhill Mar 27, 2025
62f32ed
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 27, 2025
771ccf1
Fix env var fallback
njhill Mar 27, 2025
05a0e83
Fix test supports_v1 check
njhill Mar 27, 2025
bc41b13
Fix yapf :facepalm:
njhill Mar 27, 2025
ccecb42
Merge remote-tracking branch 'origin/main' into multi-engine
njhill Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions vllm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
from vllm.transformers_utils.s3_utils import S3Model
from vllm.transformers_utils.utils import is_s3
from vllm.utils import (GiB_bytes, LayerBlockType, cuda_device_count_stateless,
get_cpu_memory, random_uuid, resolve_obj_by_qualname)
get_cpu_memory, get_open_port, random_uuid,
resolve_obj_by_qualname)

if TYPE_CHECKING:
from ray.util.placement_group import PlacementGroup
Expand Down Expand Up @@ -1423,10 +1424,15 @@ def __post_init__(self) -> None:
self.world_size = self.pipeline_parallel_size * \
self.tensor_parallel_size

self.data_parallel_size = envs.VLLM_DP_SIZE
self.data_parallel_rank = envs.VLLM_DP_RANK
self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT
if self.data_parallel_size > 1:
self.data_parallel_master_port = get_open_port()
# TODO: add multi-node support.
else:
self.data_parallel_size = envs.VLLM_DP_SIZE
self.data_parallel_rank = envs.VLLM_DP_RANK
self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

self.world_size_across_dp = self.world_size * self.data_parallel_size

if self.distributed_executor_backend == "external_launcher":
Expand Down
7 changes: 7 additions & 0 deletions vllm/engine/arg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class EngineArgs:
# number of P/D disaggregation (or other disaggregation) workers
pipeline_parallel_size: int = 1
tensor_parallel_size: int = 1
data_parallel_size: int = 1
max_parallel_loading_workers: Optional[int] = None
block_size: Optional[int] = None
enable_prefix_caching: Optional[bool] = None
Expand Down Expand Up @@ -430,6 +431,11 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
type=int,
default=EngineArgs.tensor_parallel_size,
help='Number of tensor parallel replicas.')
parser.add_argument('--data-parallel-size',
'-dp',
type=int,
default=EngineArgs.data_parallel_size,
help='Number of data parallel replicas.')
parser.add_argument(
'--max-parallel-loading-workers',
type=int,
Expand Down Expand Up @@ -1170,6 +1176,7 @@ def create_engine_config(self,
parallel_config = ParallelConfig(
pipeline_parallel_size=self.pipeline_parallel_size,
tensor_parallel_size=self.tensor_parallel_size,
data_parallel_size=self.data_parallel_size,
max_parallel_loading_workers=self.max_parallel_loading_workers,
disable_custom_all_reduce=self.disable_custom_all_reduce,
tokenizer_pool_config=TokenizerPoolConfig.create_config(
Expand Down
6 changes: 3 additions & 3 deletions vllm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2130,11 +2130,11 @@ def make_zmq_socket(
if type == zmq.constants.PULL:
socket.setsockopt(zmq.constants.RCVHWM, 0)
socket.setsockopt(zmq.constants.RCVBUF, buf_size)
socket.connect(path)
socket.bind(path)
elif type == zmq.constants.PUSH:
socket.setsockopt(zmq.constants.SNDHWM, 0)
socket.setsockopt(zmq.constants.SNDBUF, buf_size)
socket.bind(path)
socket.connect(path)
else:
raise ValueError(f"Unknown Socket Type: {type}")

Expand All @@ -2147,7 +2147,7 @@ def zmq_socket_ctx(
type: Any) -> Iterator[zmq.Socket]: # type: ignore[name-defined]
"""Context manager for a ZMQ socket"""

ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined]
ctx = zmq.Context() # type: ignore[attr-defined]
try:
yield make_zmq_socket(ctx, path, type)

Expand Down
9 changes: 7 additions & 2 deletions vllm/v1/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def update_from_output(

new_running: List[Request] = []
outputs: List[EngineCoreOutput] = []
finished_requests: List[str] = []

# NOTE(woosuk): As len(self.running) can be up to 1K or more, the below
# loop can be a performance bottleneck. We should do our best to avoid
Expand Down Expand Up @@ -566,15 +567,18 @@ def update_from_output(
# Transmit partial if chunked prefill & prompt logprobs is enabled
if new_token_ids or prompt_logprobs_tensors is not None:
# Add EngineCoreOutput for this Request.
finish_reason = request.get_finished_reason()
outputs.append(
EngineCoreOutput(
request_id=req_id,
new_token_ids=new_token_ids,
finish_reason=request.get_finished_reason(),
finish_reason=finish_reason,
new_logprobs=new_logprobs,
new_prompt_logprobs_tensors=prompt_logprobs_tensors,
stop_reason=request.stop_reason,
events=request.take_events()))
if finish_reason:
finished_requests.append(req_id)

self.scheduled_req_ids.remove(request.request_id)
if not stopped:
Expand All @@ -583,6 +587,7 @@ def update_from_output(
self.running = new_running
return EngineCoreOutputs(
outputs=outputs,
finished_requests=finished_requests,
scheduler_stats=self.make_stats(),
)

Expand Down Expand Up @@ -653,7 +658,7 @@ def get_num_unfinished_requests(self) -> int:
return len(self.waiting) + len(self.running)

def has_unfinished_requests(self) -> bool:
return self.get_num_unfinished_requests() > 0
return len(self.running) > 0 or len(self.waiting) > 0

def get_num_unscheduled_requests(self) -> int:
"""Number of requests that are not being processed by the executor."""
Expand Down
6 changes: 5 additions & 1 deletion vllm/v1/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class EngineCoreOutputs(
timestamp: float = 0.0

utility_output: Optional[UtilityOutput] = None
finished_requests: List[str] = []
# In the data-parallel (DP) case, used to signal that the engine is paused.
global_finished: bool = False

def __post_init__(self):
if self.timestamp == 0.0:
Expand All @@ -146,4 +149,5 @@ class EngineCoreRequestType(enum.Enum):
"""
ADD = b'\x00'
ABORT = b'\x01'
UTILITY = b'\x02'
START_DP = b'\x02'
UTILITY = b'\x03'
Loading