From 80f26c1cd200969f730001d4ace8d4fafa3f9d60 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:04:18 +0800 Subject: [PATCH 01/21] fix assert Signed-off-by: youkaichao --- vllm/executor/uniproc_executor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vllm/executor/uniproc_executor.py b/vllm/executor/uniproc_executor.py index 94db232240d5..e041215de660 100644 --- a/vllm/executor/uniproc_executor.py +++ b/vllm/executor/uniproc_executor.py @@ -93,9 +93,10 @@ def _init_executor(self) -> None: ("ExecutorWithExternalLauncher needs deterministic " "execution, so it" "does not support delay_factor in scheduling") - assert not envs.VLLM_USE_V1, \ - ("V1 architecture cannot guarantee deterministic execution, " - "so it is not supported in ExecutorWithExternalLauncher.") + if envs.VLLM_USE_V1: + assert not envs.VLLM_ENABLE_V1_MULTIPROCESSING, \ + ("To get deterministic execution in V1, " + "please set VLLM_ENABLE_V1_MULTIPROCESSING=0") self.driver_worker = WorkerWrapperBase(vllm_config=self.vllm_config, rpc_rank=0) # engines are launched in torchrun-compatible launchers From 884fd62edb28d37e5eb2d3d05e5017c3d4ec45f2 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:20:43 +0800 Subject: [PATCH 02/21] use rpc rank Signed-off-by: youkaichao --- tests/v1/engine/test_engine_core.py | 6 ++++-- vllm/v1/engine/core.py | 2 +- vllm/v1/executor/abstract.py | 6 ++++-- vllm/v1/worker/gpu_worker.py | 5 ++--- vllm/v1/worker/tpu_worker.py | 3 +-- vllm/worker/worker_base.py | 4 ++++ 6 files changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/v1/engine/test_engine_core.py b/tests/v1/engine/test_engine_core.py index d035668098eb..8c2998e58892 100644 --- a/tests/v1/engine/test_engine_core.py +++ b/tests/v1/engine/test_engine_core.py @@ -5,6 +5,7 @@ import time import uuid from concurrent.futures import Future +from typing import List import pytest from transformers import AutoTokenizer @@ -211,8 +212,9 @@ def make_request_with_max_tokens(max_tokens: int) -> EngineCoreRequest: class DummyExecutor(UniProcExecutor): - def initialize(self, kv_cache_config: KVCacheConfig) -> None: - super().initialize(kv_cache_config) + def initialize_from_config( + self, kv_cache_configs: List[KVCacheConfig]) -> None: + super().initialize_from_config(kv_cache_configs) # This executor actually can only run 1 batch at a time self.semaphore = threading.Semaphore(1) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 03825d6ea430..5f53254565de 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -110,7 +110,7 @@ def _initialize_kv_caches(self, num_cpu_blocks = 0 # Initialize kv cache and warmup the execution - self.model_executor.initialize(kv_cache_configs) + self.model_executor.initialize_from_config(kv_cache_configs) elapsed = time.time() - start logger.info(("init engine (profile, create kv cache, " diff --git a/vllm/v1/executor/abstract.py b/vllm/v1/executor/abstract.py index 3663cbd08aec..6956c4d64ffa 100644 --- a/vllm/v1/executor/abstract.py +++ b/vllm/v1/executor/abstract.py @@ -49,12 +49,14 @@ def get_class(vllm_config: VllmConfig) -> Type["Executor"]: f"{distributed_executor_backend}") return executor_class - def initialize(self, kv_cache_configs: List[KVCacheConfig]) -> None: + def initialize_from_config(self, + kv_cache_configs: List[KVCacheConfig]) -> None: """ Initialize the KV caches and begin the model execution loop of the underlying workers. """ - self.collective_rpc("initialize_cache", args=(kv_cache_configs, )) + self.collective_rpc("initialize_from_config", + args=(kv_cache_configs, )) self.collective_rpc("compile_or_warm_up_model") def determine_available_memory(self) -> int: # in bytes diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index 10154a752393..d5f05e26dba0 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -2,7 +2,7 @@ """A GPU worker class.""" import gc import os -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Optional import torch import torch.distributed @@ -185,9 +185,8 @@ def determine_available_memory(self) -> int: def get_kv_cache_spec(self) -> KVCacheSpec: return self.model_runner.get_kv_cache_spec() - def initialize_cache(self, kv_cache_configs: List[KVCacheConfig]) -> None: + def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None: """Allocate GPU KV cache with the specified kv_cache_config.""" - kv_cache_config = kv_cache_configs[self.rank] if self.vllm_config.model_config.enable_sleep_mode: allocator = CuMemAllocator.get_instance() context = allocator.use_memory_pool(tag="kv_cache") diff --git a/vllm/v1/worker/tpu_worker.py b/vllm/v1/worker/tpu_worker.py index f29edd34ede3..726bf4e1ee5b 100644 --- a/vllm/v1/worker/tpu_worker.py +++ b/vllm/v1/worker/tpu_worker.py @@ -170,9 +170,8 @@ def get_model(self) -> nn.Module: def get_kv_cache_spec(self) -> KVCacheSpec: return self.model_runner.get_kv_cache_spec() - def initialize_cache(self, kv_cache_configs: List[KVCacheConfig]) -> None: + def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None: """Allocate GPU KV cache with the specified kv_cache_config.""" - kv_cache_config = kv_cache_configs[self.rank] self.model_runner.initialize_kv_cache(kv_cache_config) def check_health(self) -> None: diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 190429074d56..12caf771bf2c 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -567,6 +567,10 @@ def init_worker(self, all_kwargs: List[Dict[str, Any]]) -> None: self.worker = worker_class(**kwargs) assert self.worker is not None + def initialize_from_config(self, kv_cache_configs: List[Any]) -> None: + kv_cache_config = kv_cache_configs[self.rpc_rank] + self.worker.initialize_from_config(kv_cache_config) + def execute_method(self, method: Union[str, bytes], *args, **kwargs): try: target = self if self.worker is None else self.worker From dfefe4e4fe902e2139541c4c1b6294a8d5ba4435 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:24:53 +0800 Subject: [PATCH 03/21] fix output Signed-off-by: youkaichao --- vllm/v1/worker/gpu_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index d5f05e26dba0..af07d704e74a 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -224,7 +224,7 @@ def execute_model( scheduler_output: "SchedulerOutput", ) -> Optional[ModelRunnerOutput]: output = self.model_runner.execute_model(scheduler_output) - return output if self.rank == 0 else None + return output if self.is_driver_worker else None def profile(self, is_start: bool = True): if self.profiler is None: From f7bdc6deda424c90ef0bb8511a990ada8e7c1323 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:27:06 +0800 Subject: [PATCH 04/21] add config Signed-off-by: youkaichao --- vllm/v1/engine/llm_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 6b7de4deed39..a77d96df47b8 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -44,6 +44,7 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: + self.vllm_config = vllm_config self.model_config = vllm_config.model_config self.cache_config = vllm_config.cache_config From e52e49e3decfea49ee82146a8baab55c855bb2fb Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:30:24 +0800 Subject: [PATCH 05/21] add determine_available_memory Signed-off-by: youkaichao --- vllm/executor/uniproc_executor.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/vllm/executor/uniproc_executor.py b/vllm/executor/uniproc_executor.py index e041215de660..24589444d970 100644 --- a/vllm/executor/uniproc_executor.py +++ b/vllm/executor/uniproc_executor.py @@ -120,6 +120,16 @@ def _init_executor(self) -> None: self.collective_rpc("init_device") self.collective_rpc("load_model") + def determine_available_memory(self) -> int: # in bytes + # same as determine_num_available_blocks below, + # we need to get the min across all ranks. + memory = super().determine_available_memory() + from vllm.distributed.parallel_state import get_world_group + cpu_group = get_world_group().cpu_group + memory_tensor = torch.tensor([memory], device="cpu", dtype=torch.int64) + dist.all_reduce(memory_tensor, group=cpu_group, op=dist.ReduceOp.MIN) + return memory_tensor.item() + def determine_num_available_blocks(self) -> Tuple[int, int]: """ Determine the number of available KV blocks. From 96f557b5a7e7fd1adc55d30d81235f3f923a5028 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:31:43 +0800 Subject: [PATCH 06/21] add more tests Signed-off-by: youkaichao --- tests/distributed/test_torchrun_example.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/distributed/test_torchrun_example.py b/tests/distributed/test_torchrun_example.py index a092a548a59c..5cafc3b54cb8 100644 --- a/tests/distributed/test_torchrun_example.py +++ b/tests/distributed/test_torchrun_example.py @@ -47,6 +47,9 @@ def test_consistent_across_ranks(obj): llm.llm_engine.vllm_config.cache_config.num_cpu_blocks) test_consistent_across_ranks( llm.llm_engine.vllm_config.cache_config.num_gpu_blocks) +test_consistent_across_ranks( + len(llm.llm_engine.model_executor.driver_worker.worker.model_runner.model. + parameters)) # all ranks should have the same outputs for output in outputs: From ebf9a334976cc595167fd62fd457529cdbfecb84 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:43:25 +0800 Subject: [PATCH 07/21] fix more compatibility Signed-off-by: youkaichao --- vllm/executor/uniproc_executor.py | 10 ---------- vllm/v1/engine/llm_engine.py | 4 ++++ vllm/v1/executor/abstract.py | 14 +++++++++++++- vllm/worker/worker_base.py | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/vllm/executor/uniproc_executor.py b/vllm/executor/uniproc_executor.py index 24589444d970..e041215de660 100644 --- a/vllm/executor/uniproc_executor.py +++ b/vllm/executor/uniproc_executor.py @@ -120,16 +120,6 @@ def _init_executor(self) -> None: self.collective_rpc("init_device") self.collective_rpc("load_model") - def determine_available_memory(self) -> int: # in bytes - # same as determine_num_available_blocks below, - # we need to get the min across all ranks. - memory = super().determine_available_memory() - from vllm.distributed.parallel_state import get_world_group - cpu_group = get_world_group().cpu_group - memory_tensor = torch.tensor([memory], device="cpu", dtype=torch.int64) - dist.all_reduce(memory_tensor, group=cpu_group, op=dist.ReduceOp.MIN) - return memory_tensor.item() - def determine_num_available_blocks(self) -> Tuple[int, int]: """ Determine the number of available KV blocks. diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index a77d96df47b8..19440c3b68b5 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -77,6 +77,10 @@ def __init__( log_stats=False, # FIXME: implement ) + if not multiprocess_mode: + # for v0 compatibility + self.model_executor = self.engine_core.engine_core.model_executor # type: ignore + @classmethod def from_engine_args( cls, diff --git a/vllm/v1/executor/abstract.py b/vllm/v1/executor/abstract.py index 6956c4d64ffa..e5a4fcc28508 100644 --- a/vllm/v1/executor/abstract.py +++ b/vllm/v1/executor/abstract.py @@ -3,6 +3,9 @@ from concurrent.futures import Future from typing import List, Type, Union +import torch +import torch.distributed as dist + from vllm.config import VllmConfig from vllm.executor.executor_base import ExecutorBase from vllm.executor.uniproc_executor import ( # noqa @@ -91,4 +94,13 @@ class UniProcExecutor(UniProcExecutorV0, Executor): class ExecutorWithExternalLauncher(ExecutorWithExternalLauncherV0, Executor): - pass + + def determine_available_memory(self) -> int: # in bytes + # same as determine_num_available_blocks below, + # we need to get the min across all ranks. + memory = super().determine_available_memory() + from vllm.distributed.parallel_state import get_world_group + cpu_group = get_world_group().cpu_group + memory_tensor = torch.tensor([memory], device="cpu", dtype=torch.int64) + dist.all_reduce(memory_tensor, group=cpu_group, op=dist.ReduceOp.MIN) + return memory_tensor.item() diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 12caf771bf2c..428ecf420229 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -569,7 +569,7 @@ def init_worker(self, all_kwargs: List[Dict[str, Any]]) -> None: def initialize_from_config(self, kv_cache_configs: List[Any]) -> None: kv_cache_config = kv_cache_configs[self.rpc_rank] - self.worker.initialize_from_config(kv_cache_config) + self.worker.initialize_from_config(kv_cache_config) # type: ignore def execute_method(self, method: Union[str, bytes], *args, **kwargs): try: From 1b5575b84142b2908d4cfdb4650e0e592499f7cc Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:44:50 +0800 Subject: [PATCH 08/21] fix tests Signed-off-by: youkaichao --- tests/distributed/test_torchrun_example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/distributed/test_torchrun_example.py b/tests/distributed/test_torchrun_example.py index 5cafc3b54cb8..bf98ce5b0f03 100644 --- a/tests/distributed/test_torchrun_example.py +++ b/tests/distributed/test_torchrun_example.py @@ -49,7 +49,7 @@ def test_consistent_across_ranks(obj): llm.llm_engine.vllm_config.cache_config.num_gpu_blocks) test_consistent_across_ranks( len(llm.llm_engine.model_executor.driver_worker.worker.model_runner.model. - parameters)) + parameters())) # all ranks should have the same outputs for output in outputs: From 4e2623af3bd8bdff9620613759441c9bfad0a8eb Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:46:40 +0800 Subject: [PATCH 09/21] fix tests Signed-off-by: youkaichao --- tests/distributed/test_torchrun_example.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/distributed/test_torchrun_example.py b/tests/distributed/test_torchrun_example.py index bf98ce5b0f03..b941ed835e2e 100644 --- a/tests/distributed/test_torchrun_example.py +++ b/tests/distributed/test_torchrun_example.py @@ -47,9 +47,9 @@ def test_consistent_across_ranks(obj): llm.llm_engine.vllm_config.cache_config.num_cpu_blocks) test_consistent_across_ranks( llm.llm_engine.vllm_config.cache_config.num_gpu_blocks) -test_consistent_across_ranks( - len(llm.llm_engine.model_executor.driver_worker.worker.model_runner.model. - parameters())) +params = list(llm.llm_engine.model_executor.driver_worker.worker.model_runner. + model.parameters()) +test_consistent_across_ranks(len(params)) # all ranks should have the same outputs for output in outputs: From d3d8230f8292eb8dfa969f35883f9e1c86e16fe6 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:54:12 +0800 Subject: [PATCH 10/21] add tests Signed-off-by: youkaichao --- .buildkite/test-pipeline.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index 66efe3ed3298..54ad24030d21 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -194,6 +194,7 @@ steps: - tests/v1 commands: # split the test to avoid interference + - VLLM_ENABLE_V1_MULTIPROCESSING=0 VLLM_USE_V1=1 torchrun --nproc-per-node=2 distributed/test_torchrun_example.py - VLLM_USE_V1=1 pytest -v -s v1/core - VLLM_USE_V1=1 pytest -v -s v1/engine - VLLM_USE_V1=1 pytest -v -s v1/sample From 2d862c82e5ff81edeb759e0fff275af4cc418088 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:58:09 +0800 Subject: [PATCH 11/21] comment Signed-off-by: youkaichao --- vllm/v1/executor/abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/executor/abstract.py b/vllm/v1/executor/abstract.py index e5a4fcc28508..11002ad0022d 100644 --- a/vllm/v1/executor/abstract.py +++ b/vllm/v1/executor/abstract.py @@ -96,7 +96,7 @@ class UniProcExecutor(UniProcExecutorV0, Executor): class ExecutorWithExternalLauncher(ExecutorWithExternalLauncherV0, Executor): def determine_available_memory(self) -> int: # in bytes - # same as determine_num_available_blocks below, + # same as determine_num_available_blocks in v0, # we need to get the min across all ranks. memory = super().determine_available_memory() from vllm.distributed.parallel_state import get_world_group From fbf8b3b4f99aadea30f4c9510837d7a4566cd6f1 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 21 Feb 2025 10:59:30 +0800 Subject: [PATCH 12/21] fix tpu worker Signed-off-by: youkaichao --- vllm/v1/worker/tpu_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/v1/worker/tpu_worker.py b/vllm/v1/worker/tpu_worker.py index 726bf4e1ee5b..c236f263eddb 100644 --- a/vllm/v1/worker/tpu_worker.py +++ b/vllm/v1/worker/tpu_worker.py @@ -36,6 +36,7 @@ def __init__( distributed_init_method: str, is_driver_worker: bool = False, ): + self.is_driver_worker = is_driver_worker self.vllm_config = vllm_config self.model_config = vllm_config.model_config self.cache_config = vllm_config.cache_config @@ -151,7 +152,7 @@ def execute_model( scheduler_output: "SchedulerOutput", ) -> Optional[ModelRunnerOutput]: output = self.model_runner.execute_model(scheduler_output) - return output if self.rank == 0 else None + return output if self.is_driver_worker else None def load_model(self) -> None: self.model_runner.load_model() From 1eaf5c7c02085909d16004a200e2d6f695bab8cd Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 13:58:16 +0800 Subject: [PATCH 13/21] auto set env var Signed-off-by: youkaichao --- vllm/config.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vllm/config.py b/vllm/config.py index d3139b5fd84e..6bcf34c3cff9 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -1407,6 +1407,11 @@ def __post_init__(self) -> None: self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT self.world_size_across_dp = self.world_size * self.data_parallel_size + if self.distributed_executor_backend == "external_launcher": + import os + os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0" + logger.info("Disabling V1 multiprocessing for external launcher.") + ray_only_devices = ["tpu"] from vllm.platforms import current_platform if (current_platform.device_type in ray_only_devices From 5ea12fd5148f6ba4330085a8ef0d54bd9781dabe Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 13:59:00 +0800 Subject: [PATCH 14/21] update tests Signed-off-by: youkaichao --- .buildkite/test-pipeline.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index f43816bd6beb..3b1dc20248b8 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -196,7 +196,7 @@ steps: - tests/v1 commands: # split the test to avoid interference - - VLLM_ENABLE_V1_MULTIPROCESSING=0 VLLM_USE_V1=1 torchrun --nproc-per-node=2 distributed/test_torchrun_example.py + - VLLM_USE_V1=1 torchrun --nproc-per-node=2 distributed/test_torchrun_example.py - VLLM_USE_V1=1 pytest -v -s v1/core - VLLM_USE_V1=1 pytest -v -s v1/engine - VLLM_USE_V1=1 pytest -v -s v1/sample From e1bee76fbb0abf59d4bbc17a91af55e83eccb37f Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 14:09:32 +0800 Subject: [PATCH 15/21] lazily read env var Signed-off-by: youkaichao --- vllm/v1/engine/llm_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 69de9a80575a..33b1ddc0f6fe 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -4,10 +4,10 @@ from typing_extensions import TypeVar +import vllm.envs as envs from vllm.config import ParallelConfig, VllmConfig from vllm.engine.arg_utils import EngineArgs from vllm.engine.metrics_types import StatLoggerBase -from vllm.envs import VLLM_ENABLE_V1_MULTIPROCESSING from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType from vllm.logger import init_logger from vllm.lora.request import LoRARequest @@ -102,7 +102,7 @@ def from_engine_args( vllm_config = engine_args.create_engine_config(usage_context) executor_class = Executor.get_class(vllm_config) - if VLLM_ENABLE_V1_MULTIPROCESSING: + if envs.VLLM_ENABLE_V1_MULTIPROCESSING: logger.debug("Enabling multiprocessing for LLMEngine.") enable_multiprocessing = True From 1588ed06d9c29f683e97a0431591cd45fe4e0900 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 14:12:06 +0800 Subject: [PATCH 16/21] add comments Signed-off-by: youkaichao --- tests/distributed/test_torchrun_example.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/distributed/test_torchrun_example.py b/tests/distributed/test_torchrun_example.py index b941ed835e2e..1c6c28b4ed35 100644 --- a/tests/distributed/test_torchrun_example.py +++ b/tests/distributed/test_torchrun_example.py @@ -47,6 +47,9 @@ def test_consistent_across_ranks(obj): llm.llm_engine.vllm_config.cache_config.num_cpu_blocks) test_consistent_across_ranks( llm.llm_engine.vllm_config.cache_config.num_gpu_blocks) + +# make sure we can access the model parameters from the calling process +# of the `LLM` instance. params = list(llm.llm_engine.model_executor.driver_worker.worker.model_runner. model.parameters()) test_consistent_across_ranks(len(params)) From 2f4c0051bbc323d44190b21f108a9eaf136144d3 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 15:28:39 +0800 Subject: [PATCH 17/21] move the test to v1 Signed-off-by: youkaichao --- .buildkite/test-pipeline.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index 3b1dc20248b8..a3e4dbaa3094 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -196,7 +196,6 @@ steps: - tests/v1 commands: # split the test to avoid interference - - VLLM_USE_V1=1 torchrun --nproc-per-node=2 distributed/test_torchrun_example.py - VLLM_USE_V1=1 pytest -v -s v1/core - VLLM_USE_V1=1 pytest -v -s v1/engine - VLLM_USE_V1=1 pytest -v -s v1/sample @@ -504,6 +503,7 @@ steps: - entrypoints/llm/test_collective_rpc.py commands: - pytest -v -s entrypoints/llm/test_collective_rpc.py + - VLLM_USE_V1=1 torchrun --nproc-per-node=2 distributed/test_torchrun_example.py - torchrun --nproc-per-node=2 distributed/test_torchrun_example.py - pytest -v -s ./compile/test_basic_correctness.py - pytest -v -s ./compile/test_wrapper.py From 1983cd40e21f2d1cf6c28a4db9114b813ced4f90 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 16:40:35 +0800 Subject: [PATCH 18/21] use wrapper Signed-off-by: youkaichao --- vllm/v1/executor/multiproc_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 14492f273ed3..e82142c75e39 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -218,7 +218,7 @@ def __init__( "distributed_init_method": distributed_init_method, } wrapper.init_worker(all_kwargs) - self.worker = wrapper.worker + self.worker = wrapper pid = os.getpid() _add_prefix(sys.stdout, f"VllmWorker rank={rank}", pid) @@ -239,7 +239,7 @@ def __init__( ready_socket.send_string(WorkerProc.READY_STR) ready_socket.send(payload) - wrapper.init_device() + self.worker.init_device() self.worker.load_model() @staticmethod From b10ad89c74b2e14a8bae3e5b74042aa53dc7d3e5 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 17:05:22 +0800 Subject: [PATCH 19/21] fix Signed-off-by: youkaichao --- vllm/v1/executor/multiproc_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index e82142c75e39..d4582122fa6d 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -216,6 +216,7 @@ def __init__( "local_rank": local_rank, "rank": rank, "distributed_init_method": distributed_init_method, + "is_driver_worker": rank == 0, } wrapper.init_worker(all_kwargs) self.worker = wrapper From cce7f49586f9b318089d5896384b23af881cfcb8 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 18:20:43 +0800 Subject: [PATCH 20/21] add comments Signed-off-by: youkaichao --- vllm/worker/worker_base.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index eeef7d05ad63..445c0d3285bf 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -578,8 +578,11 @@ def init_device(self): def execute_method(self, method: Union[str, bytes], *args, **kwargs): try: - target = self if self.worker is None else self.worker - return run_method(target, method, args, kwargs) + # method resolution order: + # if a method is defined in this class, it will be called directly. + # otherwise, since we define `__getattr__` and redirect attribute + # query to `self.worker`, the method will be called on the worker. + return run_method(self, method, args, kwargs) except Exception as e: # if the driver worker also execute methods, # exceptions in the rest worker may cause deadlock in rpc like ray From 9f4888b6efc9c53c231c6e7da7cbd432b4d14a39 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sun, 23 Feb 2025 19:52:24 +0800 Subject: [PATCH 21/21] fix method resolution order Signed-off-by: youkaichao --- vllm/executor/ray_distributed_executor.py | 2 +- vllm/executor/ray_utils.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm/executor/ray_distributed_executor.py b/vllm/executor/ray_distributed_executor.py index 79ca45d55d96..b866413e3a62 100644 --- a/vllm/executor/ray_distributed_executor.py +++ b/vllm/executor/ray_distributed_executor.py @@ -541,7 +541,7 @@ def _compiled_ray_dag(self, enable_asyncio: bool): # and the TP group executes in SPMD fashion. if self.use_v1: outputs = [ - worker.execute_model. + worker.execute_model_ray. bind( # type: ignore[attr-defined] outputs[i]) for i, worker in enumerate(tp_group) ] diff --git a/vllm/executor/ray_utils.py b/vllm/executor/ray_utils.py index 8ad466a5572e..1734c670bf10 100644 --- a/vllm/executor/ray_utils.py +++ b/vllm/executor/ray_utils.py @@ -112,10 +112,12 @@ def setup_device_if_necessary(self): torch.cuda.set_device(self.worker.device) self.compiled_dag_cuda_device_set = True - def execute_model( + def execute_model_ray( self, scheduler_output: "SchedulerOutput", ) -> "ModelRunnerOutput": + # this method is used to compile ray CG, + # and it needs a special logic of self.setup_device_if_necessary() self.setup_device_if_necessary() assert self.worker is not None, "Worker is not initialized" if isinstance(scheduler_output, tuple):