From 71bd6889d5059955e89773e1cd312aa58157902e Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 1 Jul 2025 17:34:14 -0700 Subject: [PATCH] [DP] Copy environment variables to all Ray DPEngineCoreActors Signed-off-by: Rui Qiao --- vllm/executor/ray_distributed_executor.py | 33 ++--------- vllm/ray/ray_env.py | 71 +++++++++++++++++++++++ vllm/v1/engine/utils.py | 24 +++++--- 3 files changed, 93 insertions(+), 35 deletions(-) create mode 100644 vllm/ray/ray_env.py diff --git a/vllm/executor/ray_distributed_executor.py b/vllm/executor/ray_distributed_executor.py index 84e8ddd8e274..6f11dcd19e9c 100644 --- a/vllm/executor/ray_distributed_executor.py +++ b/vllm/executor/ray_distributed_executor.py @@ -2,7 +2,6 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import asyncio -import json import os from collections import defaultdict from dataclasses import dataclass @@ -20,6 +19,7 @@ from vllm.logger import init_logger from vllm.model_executor.layers.sampler import SamplerOutput from vllm.platforms import current_platform +from vllm.ray.ray_env import get_env_vars_to_copy from vllm.sequence import ExecuteModelRequest from vllm.utils import (_run_task_with_lock, get_distributed_init_method, get_ip, get_open_port, make_async) @@ -58,17 +58,6 @@ class RayDistributedExecutor(DistributedExecutorBase): "VLLM_HOST_IP", "VLLM_HOST_PORT", "LOCAL_RANK", "CUDA_VISIBLE_DEVICES" } - config_home = envs.VLLM_CONFIG_ROOT - # This file contains a list of env vars that should not be copied - # from the driver to the Ray workers. - non_carry_over_env_vars_file = os.path.join( - config_home, "ray_non_carry_over_env_vars.json") - if os.path.exists(non_carry_over_env_vars_file): - with open(non_carry_over_env_vars_file) as f: - non_carry_over_env_vars = set(json.load(f)) - else: - non_carry_over_env_vars = set() - uses_ray: bool = True def _init_executor(self) -> None: @@ -335,13 +324,10 @@ def sort_by_driver_then_worker_ip(item: RayWorkerMetaData): } for (node_id, _) in worker_node_and_gpu_ids] # Environment variables to copy from driver to workers - env_vars_to_copy = [ - v for v in envs.environment_variables - if v not in self.WORKER_SPECIFIC_ENV_VARS - and v not in self.non_carry_over_env_vars - ] - - env_vars_to_copy.extend(current_platform.additional_env_vars) + env_vars_to_copy = get_env_vars_to_copy( + exclude_vars=self.WORKER_SPECIFIC_ENV_VARS, + additional_vars=set(current_platform.additional_env_vars), + destination="workers") # Copy existing env vars to each worker's args for args in all_args_to_update_environment_variables: @@ -350,15 +336,6 @@ def sort_by_driver_then_worker_ip(item: RayWorkerMetaData): if name in os.environ: args[name] = os.environ[name] - logger.info("non_carry_over_env_vars from config: %s", - self.non_carry_over_env_vars) - logger.info( - "Copying the following environment variables to workers: %s", - [v for v in env_vars_to_copy if v in os.environ]) - logger.info( - "If certain env vars should NOT be copied to workers, add them to " - "%s file", self.non_carry_over_env_vars_file) - self._env_vars_for_all_workers = ( all_args_to_update_environment_variables) diff --git a/vllm/ray/ray_env.py b/vllm/ray/ray_env.py new file mode 100644 index 000000000000..716d0bfafae5 --- /dev/null +++ b/vllm/ray/ray_env.py @@ -0,0 +1,71 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import json +import os +from typing import Optional + +import vllm.envs as envs +from vllm.logger import init_logger + +logger = init_logger(__name__) + +CONFIG_HOME = envs.VLLM_CONFIG_ROOT + +# This file contains a list of env vars that should not be copied +# from the driver to the Ray workers. +RAY_NON_CARRY_OVER_ENV_VARS_FILE = os.path.join( + CONFIG_HOME, "ray_non_carry_over_env_vars.json") + +try: + if os.path.exists(RAY_NON_CARRY_OVER_ENV_VARS_FILE): + with open(RAY_NON_CARRY_OVER_ENV_VARS_FILE) as f: + RAY_NON_CARRY_OVER_ENV_VARS = set(json.load(f)) + else: + RAY_NON_CARRY_OVER_ENV_VARS = set() +except json.JSONDecodeError: + logger.warning( + "Failed to parse %s. Using an empty set for non-carry-over env vars.", + RAY_NON_CARRY_OVER_ENV_VARS_FILE) + RAY_NON_CARRY_OVER_ENV_VARS = set() + + +def get_env_vars_to_copy(exclude_vars: Optional[set[str]] = None, + additional_vars: Optional[set[str]] = None, + destination: Optional[str] = None) -> set[str]: + """ + Get the environment variables to copy to downstream Ray actors. + + Example use cases: + - Copy environment variables from RayDistributedExecutor to Ray workers. + - Copy environment variables from RayDPClient to Ray DPEngineCoreActor. + + Args: + exclude_vars: A set of vllm defined environment variables to exclude + from copying. + additional_vars: A set of additional environment variables to copy. + destination: The destination of the environment variables. + Returns: + A set of environment variables to copy. + """ + exclude_vars = exclude_vars or set() + additional_vars = additional_vars or set() + + env_vars_to_copy = { + v + for v in envs.environment_variables + if v not in exclude_vars and v not in RAY_NON_CARRY_OVER_ENV_VARS + } + env_vars_to_copy.update(additional_vars) + + to_destination = " to " + destination if destination is not None else "" + + logger.info("RAY_NON_CARRY_OVER_ENV_VARS from config: %s", + RAY_NON_CARRY_OVER_ENV_VARS) + logger.info("Copying the following environment variables%s: %s", + to_destination, + [v for v in env_vars_to_copy if v in os.environ]) + logger.info( + "If certain env vars should NOT be copied, add them to " + "%s file", RAY_NON_CARRY_OVER_ENV_VARS_FILE) + + return env_vars_to_copy diff --git a/vllm/v1/engine/utils.py b/vllm/v1/engine/utils.py index c4012419411a..ae104bd6eb96 100644 --- a/vllm/v1/engine/utils.py +++ b/vllm/v1/engine/utils.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import contextlib +import os import weakref from collections.abc import Iterator from dataclasses import dataclass @@ -15,6 +16,7 @@ from vllm.config import CacheConfig, ParallelConfig, VllmConfig from vllm.logger import init_logger +from vllm.ray.ray_env import get_env_vars_to_copy from vllm.utils import get_mp_context, get_open_zmq_ipc_path, zmq_socket_ctx from vllm.v1.engine.coordinator import DPCoordinator from vllm.v1.executor.abstract import Executor @@ -164,6 +166,7 @@ def __init__( import copy import ray + from ray.runtime_env import RuntimeEnv from ray.util.scheduling_strategies import ( PlacementGroupSchedulingStrategy) @@ -175,6 +178,12 @@ def __init__( local_engine_count = \ vllm_config.parallel_config.data_parallel_size_local world_size = vllm_config.parallel_config.world_size + env_vars_set = get_env_vars_to_copy(destination="DPEngineCoreActor") + env_vars_dict = { + name: os.environ[name] + for name in env_vars_set if name in os.environ + } + runtime_env = RuntimeEnv(env_vars=env_vars_dict) if ray.is_initialized(): logger.info( @@ -210,13 +219,14 @@ def __init__( scheduling_strategy=PlacementGroupSchedulingStrategy( placement_group=pg, placement_group_bundle_index=world_size, - )).remote(vllm_config=dp_vllm_config, - executor_class=executor_class, - log_stats=log_stats, - local_client=local_client, - addresses=addresses, - dp_rank=index, - local_dp_rank=local_index) + ), + runtime_env=runtime_env).remote(vllm_config=dp_vllm_config, + executor_class=executor_class, + log_stats=log_stats, + local_client=local_client, + addresses=addresses, + dp_rank=index, + local_dp_rank=local_index) if local_client: self.local_engine_actors.append(actor) else: