Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 5 additions & 28 deletions vllm/executor/ray_distributed_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import json
import os
from collections import defaultdict
from dataclasses import dataclass
Expand All @@ -20,6 +19,7 @@
from vllm.logger import init_logger
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.platforms import current_platform
from vllm.ray.ray_env import get_env_vars_to_copy
from vllm.sequence import ExecuteModelRequest
from vllm.utils import (_run_task_with_lock, get_distributed_init_method,
get_ip, get_open_port, make_async)
Expand Down Expand Up @@ -58,17 +58,6 @@ class RayDistributedExecutor(DistributedExecutorBase):
"VLLM_HOST_IP", "VLLM_HOST_PORT", "LOCAL_RANK", "CUDA_VISIBLE_DEVICES"
}

config_home = envs.VLLM_CONFIG_ROOT
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeaah this logic seems like a hack to me in the first place. Do you know in what context this was used? Who owns the content of the json file? What are the default values in there?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #14099 (comment) . This allows customization of env vars.

# This file contains a list of env vars that should not be copied
# from the driver to the Ray workers.
non_carry_over_env_vars_file = os.path.join(
config_home, "ray_non_carry_over_env_vars.json")
if os.path.exists(non_carry_over_env_vars_file):
with open(non_carry_over_env_vars_file) as f:
non_carry_over_env_vars = set(json.load(f))
else:
non_carry_over_env_vars = set()

uses_ray: bool = True

def _init_executor(self) -> None:
Expand Down Expand Up @@ -335,13 +324,10 @@ def sort_by_driver_then_worker_ip(item: RayWorkerMetaData):
} for (node_id, _) in worker_node_and_gpu_ids]

# Environment variables to copy from driver to workers
env_vars_to_copy = [
v for v in envs.environment_variables
if v not in self.WORKER_SPECIFIC_ENV_VARS
and v not in self.non_carry_over_env_vars
]

env_vars_to_copy.extend(current_platform.additional_env_vars)
env_vars_to_copy = get_env_vars_to_copy(
exclude_vars=self.WORKER_SPECIFIC_ENV_VARS,
additional_vars=set(current_platform.additional_env_vars),
destination="workers")

# Copy existing env vars to each worker's args
for args in all_args_to_update_environment_variables:
Expand All @@ -350,15 +336,6 @@ def sort_by_driver_then_worker_ip(item: RayWorkerMetaData):
if name in os.environ:
args[name] = os.environ[name]

logger.info("non_carry_over_env_vars from config: %s",
self.non_carry_over_env_vars)
logger.info(
"Copying the following environment variables to workers: %s",
[v for v in env_vars_to_copy if v in os.environ])
logger.info(
"If certain env vars should NOT be copied to workers, add them to "
"%s file", self.non_carry_over_env_vars_file)

self._env_vars_for_all_workers = (
all_args_to_update_environment_variables)

Expand Down
71 changes: 71 additions & 0 deletions vllm/ray/ray_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import json
import os
from typing import Optional

import vllm.envs as envs
from vllm.logger import init_logger

logger = init_logger(__name__)

CONFIG_HOME = envs.VLLM_CONFIG_ROOT

# This file contains a list of env vars that should not be copied
# from the driver to the Ray workers.
RAY_NON_CARRY_OVER_ENV_VARS_FILE = os.path.join(
CONFIG_HOME, "ray_non_carry_over_env_vars.json")
Comment on lines +16 to +17
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this there in the first place? is there really a need to customize it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah @youkaichao suggested this to allow customization: #14099 (comment)


try:
if os.path.exists(RAY_NON_CARRY_OVER_ENV_VARS_FILE):
with open(RAY_NON_CARRY_OVER_ENV_VARS_FILE) as f:
RAY_NON_CARRY_OVER_ENV_VARS = set(json.load(f))
else:
RAY_NON_CARRY_OVER_ENV_VARS = set()
except json.JSONDecodeError:
logger.warning(
"Failed to parse %s. Using an empty set for non-carry-over env vars.",
RAY_NON_CARRY_OVER_ENV_VARS_FILE)
RAY_NON_CARRY_OVER_ENV_VARS = set()


def get_env_vars_to_copy(exclude_vars: Optional[set[str]] = None,
additional_vars: Optional[set[str]] = None,
destination: Optional[str] = None) -> set[str]:
"""
Get the environment variables to copy to downstream Ray actors.
Example use cases:
- Copy environment variables from RayDistributedExecutor to Ray workers.
- Copy environment variables from RayDPClient to Ray DPEngineCoreActor.
Args:
exclude_vars: A set of vllm defined environment variables to exclude
from copying.
additional_vars: A set of additional environment variables to copy.
destination: The destination of the environment variables.
Returns:
A set of environment variables to copy.
"""
exclude_vars = exclude_vars or set()
additional_vars = additional_vars or set()

env_vars_to_copy = {
v
for v in envs.environment_variables
if v not in exclude_vars and v not in RAY_NON_CARRY_OVER_ENV_VARS
}
env_vars_to_copy.update(additional_vars)

to_destination = " to " + destination if destination is not None else ""

logger.info("RAY_NON_CARRY_OVER_ENV_VARS from config: %s",
RAY_NON_CARRY_OVER_ENV_VARS)
logger.info("Copying the following environment variables%s: %s",
to_destination,
[v for v in env_vars_to_copy if v in os.environ])
logger.info(
"If certain env vars should NOT be copied, add them to "
"%s file", RAY_NON_CARRY_OVER_ENV_VARS_FILE)

return env_vars_to_copy
24 changes: 17 additions & 7 deletions vllm/v1/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import contextlib
import os
import weakref
from collections.abc import Iterator
from dataclasses import dataclass
Expand All @@ -15,6 +16,7 @@

from vllm.config import CacheConfig, ParallelConfig, VllmConfig
from vllm.logger import init_logger
from vllm.ray.ray_env import get_env_vars_to_copy
from vllm.utils import get_mp_context, get_open_zmq_ipc_path, zmq_socket_ctx
from vllm.v1.engine.coordinator import DPCoordinator
from vllm.v1.executor.abstract import Executor
Expand Down Expand Up @@ -164,6 +166,7 @@ def __init__(
import copy

import ray
from ray.runtime_env import RuntimeEnv
from ray.util.scheduling_strategies import (
PlacementGroupSchedulingStrategy)

Expand All @@ -175,6 +178,12 @@ def __init__(
local_engine_count = \
vllm_config.parallel_config.data_parallel_size_local
world_size = vllm_config.parallel_config.world_size
env_vars_set = get_env_vars_to_copy(destination="DPEngineCoreActor")
env_vars_dict = {
name: os.environ[name]
for name in env_vars_set if name in os.environ
}
runtime_env = RuntimeEnv(env_vars=env_vars_dict)

if ray.is_initialized():
logger.info(
Expand Down Expand Up @@ -210,13 +219,14 @@ def __init__(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=world_size,
)).remote(vllm_config=dp_vllm_config,
executor_class=executor_class,
log_stats=log_stats,
local_client=local_client,
addresses=addresses,
dp_rank=index,
local_dp_rank=local_index)
),
runtime_env=runtime_env).remote(vllm_config=dp_vllm_config,
executor_class=executor_class,
log_stats=log_stats,
local_client=local_client,
addresses=addresses,
dp_rank=index,
local_dp_rank=local_index)
if local_client:
self.local_engine_actors.append(actor)
else:
Expand Down