11 changes: 7 additions & 4 deletions tests/basic_correctness/test_basic_correctness.py
@@ -20,7 +20,7 @@

MODELS = [
"google/gemma-2-2b-it",
"meta-llama/Llama-3.2-1B",
# "meta-llama/Llama-3.2-1B",
]

TARGET_TEST_SUITE = os.environ.get("TARGET_TEST_SUITE", "L4")
@@ -37,10 +37,11 @@ def test_vllm_gc_ed():


@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS", "FLASHINFER"])
# @pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS", "FLASHINFER"])
@pytest.mark.parametrize("backend", ["FLASH_ATTN_VLLM_V1"])
@pytest.mark.parametrize("dtype", ["half"])
@pytest.mark.parametrize("max_tokens", [5])
@pytest.mark.parametrize("enforce_eager", [False, True])
@pytest.mark.parametrize("enforce_eager", [True])
def test_models(
hf_runner,
model: str,
@@ -73,7 +74,9 @@ def test_models(
max_model_len=8192,
dtype=dtype,
enforce_eager=enforce_eager,
gpu_memory_utilization=0.7) as vllm_model:
distributed_executor_backend="ray",
gpu_memory_utilization=0.7,
tensor_parallel_size=4) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)

check_outputs_equal(
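For context, here is a hedged standalone sketch of the configuration this test now exercises. It assumes the public `vllm.LLM` front end forwards these keyword arguments to `EngineArgs` and that setting `VLLM_USE_V1` enables the V1 engine; it is an illustration of the test setup, not part of the diff.

```python
# Hypothetical standalone reproduction of the updated test configuration.
# Assumes vllm.LLM forwards these kwargs to EngineArgs and that VLLM_USE_V1
# selects the V1 engine; adjust to the actual entry points in your checkout.
import os

os.environ["VLLM_USE_V1"] = "1"

from vllm import LLM, SamplingParams

llm = LLM(
    model="google/gemma-2-2b-it",
    dtype="half",
    max_model_len=8192,
    enforce_eager=True,
    gpu_memory_utilization=0.7,
    tensor_parallel_size=4,
    distributed_executor_backend="ray",  # the new code path under test
)
outputs = llm.generate(["Hello, my name is"],
                       SamplingParams(temperature=0.0, max_tokens=5))
```
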
6 changes: 3 additions & 3 deletions vllm/distributed/parallel_state.py
@@ -241,9 +241,9 @@ def __init__(
from vllm.distributed.device_communicators.shm_broadcast import (
MessageQueue)
self.mq_broadcaster: Optional[MessageQueue] = None
if use_message_queue_broadcaster and self.world_size > 1:
self.mq_broadcaster = MessageQueue.create_from_process_group(
self.cpu_group, 1 << 22, 6)
# if use_message_queue_broadcaster and self.world_size > 1:

Review comment (Collaborator, PR author): I think this is not needed anymore for V1?

# self.mq_broadcaster = MessageQueue.create_from_process_group(
# self.cpu_group, 1 << 22, 6)

@property
def first_rank(self):
3 changes: 2 additions & 1 deletion vllm/model_executor/layers/logits_processor.py
@@ -7,6 +7,7 @@

from vllm.distributed import (tensor_model_parallel_all_gather,
tensor_model_parallel_gather)
from vllm.envs import VLLM_USE_V1
from vllm.model_executor.layers.vocab_parallel_embedding import (
VocabParallelEmbedding)
from vllm.model_executor.sampling_metadata import SamplingMetadata
@@ -42,7 +43,7 @@ def __init__(self,
# Soft cap the logits. Used in Gemma 2.
self.soft_cap = soft_cap
# Whether to use gather or all-gather to gather the logits.
self.use_gather = not current_platform.is_tpu()
self.use_gather = not current_platform.is_tpu() and not VLLM_USE_V1

def forward(
self,
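To make the effect of the new flag concrete, here is a hedged sketch of how `use_gather` could be consumed downstream. The two gather helpers are the ones imported in this file; the wrapper function and its control flow are illustrative, not the actual `forward()` body.

```python
# Illustrative only: the gather vs. all-gather choice that `use_gather`
# controls. The helper names come from the imports in this file; the wrapper
# itself is hypothetical.
import torch

from vllm.distributed import (tensor_model_parallel_all_gather,
                              tensor_model_parallel_gather)


def _collect_logits(logits: torch.Tensor, use_gather: bool) -> torch.Tensor:
    if use_gather:
        # V0 path: only the driver rank needs the full vocab dimension,
        # so a plain gather to rank 0 avoids redundant copies on other ranks.
        return tensor_model_parallel_gather(logits)
    # TPU and V1 path: every rank keeps a full copy of the logits,
    # which is what the all-gather provides.
    return tensor_model_parallel_all_gather(logits)
```
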
10 changes: 7 additions & 3 deletions vllm/v1/engine/llm_engine.py
@@ -4,6 +4,7 @@
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.metrics_types import StatLoggerBase
from vllm.envs import VLLM_ENABLE_V1_MULTIPROCESSING
from vllm.executor.ray_utils import initialize_ray_cluster
from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
@@ -18,6 +19,7 @@
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.processor import Processor
from vllm.v1.executor.gpu_executor import GPUExecutor
from vllm.v1.executor.ray_gpu_executor import RayGPUExecutor

logger = init_logger(__name__)

@@ -99,7 +101,11 @@ def from_engine_args(

@classmethod
def _get_executor_cls(cls, vllm_config: VllmConfig):
return GPUExecutor
if vllm_config.parallel_config.distributed_executor_backend == "ray":

Review comment (Collaborator): We also need to have this in AsyncLLM?
Reply (Collaborator, PR author): I remember hearing that AsyncLLM will be removed or something. Can you sync with them before supporting it?

initialize_ray_cluster(vllm_config.parallel_config)
return RayGPUExecutor
else:
return GPUExecutor

def stop_remote_worker_execution_loop(self) -> None:
raise NotImplementedError("TP not implemented yet.")
@@ -158,8 +164,6 @@ def step(self) -> List[RequestOutput]:

return request_outputs

# TODO(rob): Can we get rid of these?

def get_model_config(self):
return self.model_config

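A hedged sketch of how the new branch gets exercised end to end. The `EngineArgs` fields mirror what the updated test passes, and `from_engine_args` appears in the hunk above; treat the exact wiring as illustrative rather than canonical.

```python
# Hypothetical driver code for the new executor selection. With
# distributed_executor_backend="ray", _get_executor_cls should initialize the
# Ray cluster and return RayGPUExecutor; otherwise it falls back to GPUExecutor.
from vllm.engine.arg_utils import EngineArgs
from vllm.v1.engine.llm_engine import LLMEngine

engine_args = EngineArgs(
    model="google/gemma-2-2b-it",
    tensor_parallel_size=4,
    distributed_executor_backend="ray",
    enforce_eager=True,
)
engine = LLMEngine.from_engine_args(engine_args)
```
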
96 changes: 96 additions & 0 deletions vllm/v1/executor/distributed_gpu_executor.py
@@ -0,0 +1,96 @@
from abc import abstractmethod
from typing import Any, Optional, Tuple

from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.v1.core.scheduler import SchedulerOutput
from vllm.v1.outputs import ModelRunnerOutput

logger = init_logger(__name__)


class DistributedGPUExecutor:
"""Abstract superclass of multi-GPU executor implementations."""

def __init__(self, vllm_config: VllmConfig):
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
self.lora_config = vllm_config.lora_config
self.load_config = vllm_config.load_config
self.parallel_config = vllm_config.parallel_config
self.scheduler_config = vllm_config.scheduler_config
self.device_config = vllm_config.device_config
self.speculative_config = vllm_config.speculative_config
self.prompt_adapter_config = vllm_config.prompt_adapter_config
self.observability_config = vllm_config.observability_config

def determine_num_available_blocks(self) -> Tuple[int, int]:
"""Determine the number of available KV blocks.

This invokes `determine_num_available_blocks` on each worker and takes
the min of the results, guaranteeing that the selected cache sizes are
compatible with all workers.

Returns:
- tuple[num_gpu_blocks, num_cpu_blocks]
"""
# Get the maximum number of blocks that can be allocated on GPU and CPU.
num_blocks = self._run_workers("determine_num_available_blocks")

# Since we use a shared centralized controller, we take the minimum
# number of blocks across all workers to make sure all the memory
# operators can be applied to all workers.
num_gpu_blocks = min(b[0] for b in num_blocks)
return num_gpu_blocks, 0

def initialize_cache(self, num_gpu_blocks: int) -> None:
"""Initialize the KV cache in all workers.
"""
# NOTE: This is logged in the executor because there can be >1 worker
# with other executors. We could log in the engine level, but work
# remains to abstract away the device for non-GPU configurations.
logger.info("# GPU blocks: %d", num_gpu_blocks)
self._run_workers("initialize_cache", num_gpu_blocks)
self._run_workers("compile_or_warm_up_model")

@abstractmethod
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput:
raise NotImplementedError

def save_sharded_state(
self,
path: str,
pattern: Optional[str] = None,
max_size: Optional[int] = None,
) -> None:
self._run_workers("save_sharded_state",
path=path,
pattern=pattern,
max_size=max_size)

@abstractmethod
def _run_workers(
self,
method: str,
*args,
async_run_tensor_parallel_workers_only: bool = False,
max_concurrent_workers: Optional[int] = None,
**kwargs,
) -> Any:
"""Runs the given method on all workers.

Args:
async_run_tensor_parallel_workers_only: If True the method will be
run only in the remote TP workers, not the driver worker.
It will also be run asynchronously and return a list of futures
rather than blocking on the results.
"""
raise NotImplementedError

@abstractmethod
def check_health(self) -> None:
raise NotImplementedError
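
To show how the abstract hooks are meant to fit together, here is a minimal hypothetical subclass that fans calls out to in-process worker objects. The real RayGPUExecutor added in this PR does the same over Ray actors; this stub only illustrates the contract, it is not part of the diff.

```python
# Hypothetical minimal subclass of DistributedGPUExecutor, illustrating the
# _run_workers / execute_model contract. Not part of the PR.
from typing import Any, List, Optional

from vllm.config import VllmConfig
from vllm.v1.core.scheduler import SchedulerOutput
from vllm.v1.executor.distributed_gpu_executor import DistributedGPUExecutor
from vllm.v1.outputs import ModelRunnerOutput


class InProcessGPUExecutor(DistributedGPUExecutor):
    """Runs every worker in the current process, one after another."""

    def __init__(self, vllm_config: VllmConfig, workers: List[Any]):
        super().__init__(vllm_config)
        self.workers = workers  # objects exposing the worker RPC methods

    def _run_workers(self,
                     method: str,
                     *args,
                     async_run_tensor_parallel_workers_only: bool = False,
                     max_concurrent_workers: Optional[int] = None,
                     **kwargs) -> Any:
        # Synchronous fan-out; a real executor dispatches these in parallel.
        return [getattr(w, method)(*args, **kwargs) for w in self.workers]

    def execute_model(
            self, scheduler_output: SchedulerOutput) -> ModelRunnerOutput:
        # Every worker steps; the driver's (rank 0) output is returned.
        outputs = self._run_workers("execute_model", scheduler_output)
        return outputs[0]

    def check_health(self) -> None:
        # In-process workers are healthy as long as this process is running.
        return
```
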
25 changes: 24 additions & 1 deletion vllm/v1/executor/gpu_executor.py
@@ -1,11 +1,12 @@
import os
from typing import Optional, Tuple
from typing import Any, Callable, Dict, Optional, Tuple, Type

from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_worker import Worker
from vllm.worker.worker_base import WorkerBase

logger = init_logger(__name__)

@@ -75,3 +76,25 @@ def check_health(self) -> None:
# GPUExecutor will always be healthy as long as
# it's running.
return

def _get_worker_module_and_class(
self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]:
worker_module_name = "vllm.v1.worker.gpu_worker"
worker_class_name = "Worker"
return worker_module_name, worker_class_name

def _get_worker_kwargs(
self,
local_rank: int = 0,
rank: int = 0,
distributed_init_method: Optional[str] = None) -> Dict[str, Any]:
"""Return worker init args for a given rank."""
if distributed_init_method is None:
distributed_init_method = get_distributed_init_method(
get_ip(), get_open_port())
return dict(
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=rank,
distributed_init_method=distributed_init_method,
)
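
The two new hooks exist so a distributed executor can construct workers remotely from plain data: a module name, a class name, and init kwargs. The helper below is a hypothetical illustration of that consumption pattern and is not part of the PR.

```python
# Hypothetical consumer of _get_worker_module_and_class / _get_worker_kwargs:
# rebuild a worker from its module/class names plus init kwargs, e.g. inside
# a remote Ray actor.
import importlib
from typing import Any


def create_worker(worker_module_name: str, worker_class_name: str,
                  **worker_kwargs: Any):
    module = importlib.import_module(worker_module_name)
    worker_cls = getattr(module, worker_class_name)
    return worker_cls(**worker_kwargs)
```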