5 changes: 4 additions & 1 deletion fastdeploy/__init__.py
@@ -33,8 +33,11 @@
if envs.FD_DEBUG != 1:
import logging

pf_logger.logger.setLevel(logging.INFO)
# Set paddleformers logger to WARNING to suppress INFO logs but still show warnings and errors.
pf_logger.logger.setLevel(logging.WARNING)
import warnings

warnings.filterwarnings("ignore", module="paddleformers")
try:
import use_triton_in_paddle

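Note on the hunk above: the patch drops the paddleformers logger from INFO to WARNING and additionally ignores Python warnings raised from that package. A minimal standalone sketch of the same suppression pattern, using the placeholder package name "some_library" rather than paddleformers (not part of the diff):

    import logging
    import warnings

    # Placeholder third-party package name; substitute the real logger name.
    lib_logger = logging.getLogger("some_library")
    lib_logger.setLevel(logging.WARNING)  # hide INFO/DEBUG, keep WARNING and ERROR

    # Also ignore Python warnings raised from that package's modules.
    warnings.filterwarnings("ignore", module="some_library")

    lib_logger.info("suppressed")      # not emitted
    lib_logger.warning("still shown")  # emitted (via logging's last-resort handler when none is configured)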
129 changes: 74 additions & 55 deletions fastdeploy/config.py
@@ -20,7 +20,7 @@
import os
from dataclasses import field
from enum import Enum
from typing import Any, Dict, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Union

import paddle
import paddle.distributed as dist
@@ -33,9 +33,13 @@
from fastdeploy.platforms import current_platform
from fastdeploy.scheduler import SchedulerConfig
from fastdeploy.transformer_utils.config import get_pooling_config
from fastdeploy.utils import ceil_div, check_unified_ckpt, get_host_ip, get_logger

logger = get_logger("config", "config.log")
from fastdeploy.utils import (
ceil_div,
check_unified_ckpt,
console_logger,
get_host_ip,
llm_logger,
)

TaskOption = Literal["auto", "generate", "embedding", "embed"]

@@ -314,9 +318,9 @@ def reset_config_value(key, value):
if not hasattr(self, key.lower()):
if os.getenv(key, None):
value = eval(os.getenv(key))
logger.info(f"Get parameter `{key}` = {value} from environment.")
llm_logger.info(f"Get parameter `{key}` = {value} from environment.")
else:
logger.info(f"Parameter `{key}` will use default value {value}.")
llm_logger.info(f"Parameter `{key}` will use default value {value}.")
setattr(self, key.lower(), value)

reset_config_value("COMPRESSION_RATIO", 1.0)
@@ -334,10 +338,10 @@ def read_model_config(self):
)
elif "torch_dtype" in self.model_config:
self.model_format = "torch"
logger.info("The model format is Hugging Face")
llm_logger.info("The model format is Hugging Face")
elif "dtype" in self.model_config:
self.model_format = "paddle"
logger.info("The model format is Paddle")
llm_logger.info("The model format is Paddle")
else:
raise ValueError(
"Unknown model format. Please ensure your config.json contains "
@@ -400,7 +404,7 @@ def _get_runner_type(

runner_type = self._get_default_runner_type(architectures)
if runner_type != "generate":
logger.info(
llm_logger.info(
"Resolved `--runner auto` to `--runner %s`. " "Pass the value explicitly to silence this message.",
runner_type,
)
@@ -419,7 +423,7 @@ def _get_convert_type(
convert_type = self._get_default_convert_type(architectures, runner_type)

if convert_type != "none":
logger.info(
llm_logger.info(
"Resolved `--convert auto` to `--convert %s`. " "Pass the value explicitly to silence this message.",
convert_type,
)
@@ -512,10 +516,10 @@ def print(self):
"""
Print all configuration information.
"""
logger.info("Model Configuration Information :")
llm_logger.info("Model Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
llm_logger.info("{:<20}:{:<6}{}".format(k, "", v))
llm_logger.info("=============================================================")


class ParallelConfig:
@@ -559,7 +563,7 @@ def __init__(
setattr(self, key, value)
if isinstance(self.engine_worker_queue_port, str):
self.engine_worker_queue_port = [int(port) for port in self.engine_worker_queue_port.split(",")]
logger.info(f"engine_worker_queue_port: {self.engine_worker_queue_port}")
llm_logger.info(f"engine_worker_queue_port: {self.engine_worker_queue_port}")
elif isinstance(self.engine_worker_queue_port, int):
self.engine_worker_queue_port = [self.engine_worker_queue_port]
# currently, the expert parallel size is equal data parallel size
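The engine_worker_queue_port handling a few lines up accepts either a comma-separated string or a single int and always ends up with a list of ints. A small sketch of that normalization (normalize_ports is a hypothetical helper name, not part of the patch):

    def normalize_ports(port):
        # "8101,8102" -> [8101, 8102]; 8101 -> [8101]; lists pass through untouched.
        if isinstance(port, str):
            return [int(p) for p in port.split(",")]
        if isinstance(port, int):
            return [port]
        return port

    assert normalize_ports("8101,8102") == [8101, 8102]
    assert normalize_ports(8101) == [8101]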
@@ -586,7 +590,7 @@ def __init__(
and self.expert_parallel_size > 1
and self.tensor_parallel_size > 1
)
logger.info(f"use_sequence_parallel_moe: {self.use_sequence_parallel_moe}")
llm_logger.info(f"use_sequence_parallel_moe: {self.use_sequence_parallel_moe}")

def set_communicate_group(self):
# different tp group id
@@ -606,7 +610,7 @@ def set_communicate_group(self):
dist.collective._set_custom_gid(self.data_parallel_size + tp_gid_offset)
self.ep_group = dist.new_group(range(self.expert_parallel_size))
dist.collective._set_custom_gid(None)
logger.info(
llm_logger.info(
f"data_parallel_size: {self.data_parallel_size}, tensor_parallel_size: {self.tensor_parallel_size}, expert_parallel_size: {self.expert_parallel_size}, data_parallel_rank: {self.data_parallel_rank}, tensor_parallel_rank: {self.tensor_parallel_rank}, expert_parallel_rank: {self.expert_parallel_rank}, tp_group: {self.tp_group}."
)

@@ -615,10 +619,10 @@ def print(self):
print all config

"""
logger.info("Parallel Configuration Information :")
llm_logger.info("Parallel Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
llm_logger.info("{:<20}:{:<6}{}".format(k, "", v))
llm_logger.info("=============================================================")


class SpeculativeConfig:
@@ -729,10 +733,10 @@ def print(self):
print all config

"""
logger.info("Speculative Decoding Configuration Information :")
llm_logger.info("Speculative Decoding Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
llm_logger.info("{:<20}:{:<6}{}".format(k, "", v))
llm_logger.info("=============================================================")

def check_legality_parameters(
self,
@@ -752,7 +756,7 @@

if self.method in ["mtp", "hybrid_mtp_ngram"]:
if self.num_speculative_tokens < self.num_model_steps:
logger.warning(
llm_logger.warning(
f"Get num_model_steps > num_speculative_tokens. Reset num_speculative_tokens to {self.num_model_steps}"
)
self.num_speculative_tokens = self.num_model_steps
@@ -864,7 +868,7 @@ def init_with_cudagrpah_size(self, max_capture_size: int = 0) -> None:
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size <= max_capture_size]
dedup_sizes = list(set(self.cudagraph_capture_sizes))
if len(dedup_sizes) < len(self.cudagraph_capture_sizes):
logger.info(
llm_logger.info(
("cudagraph sizes specified by model runner" " %s is overridden by config %s"),
self.cudagraph_capture_sizes,
dedup_sizes,
@@ -1298,7 +1302,7 @@ def postprocess(self, num_total_tokens, number_of_tasks):
block_num = (length + self.block_size - 1 + self.dec_token_num) // self.block_size
self.total_block_num = block_num * number_of_tasks
self.prefill_kvcache_block_num = self.total_block_num
logger.info(f"Doing profile, the total_block_num:{self.total_block_num}")
llm_logger.info(f"Doing profile, the total_block_num:{self.total_block_num}")

def reset(self, num_gpu_blocks):
"""
@@ -1309,23 +1313,26 @@ def reset(self, num_gpu_blocks):
self.prefill_kvcache_block_num = self.total_block_num
else:
self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
logger.info(
llm_logger.info(
f"Reset block num, the total_block_num:{self.total_block_num},"
f" prefill_kvcache_block_num:{self.prefill_kvcache_block_num}"
)
assert (
self.prefill_kvcache_block_num >= self.max_block_num_per_seq
), f"current block number :{self.prefill_kvcache_block_num} should be greater than or equal to current model len needed minimum block number :{self.max_block_num_per_seq}"
if self.prefill_kvcache_block_num < self.max_block_num_per_seq:
available_cache_tokens = self.prefill_kvcache_block_num * self.block_size
console_logger.error(
f"The current KV Cache can only support caching {available_cache_tokens} tokens, which is less than the set max_model_len={self.model_cfg.max_model_len}. Please deploy this model on GPUs with larger memory or reduce your `max_model_len` to {available_cache_tokens} or less."
)
raise RuntimeError("Resource is not sufficient.")

def print(self):
"""
print all config

"""
logger.info("Cache Configuration Information :")
llm_logger.info("Cache Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
llm_logger.info("{:<20}:{:<6}{}".format(k, "", v))
llm_logger.info("=============================================================")


class RouterConfig:
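The reset() hunk above replaces a bare assert with a user-facing console_logger.error plus a RuntimeError, and it reports capacity in tokens (blocks x block_size) so the suggested max_model_len is actionable. A hypothetical standalone version of that check, keeping the same comparison on block counts (not part of the patch):

    def check_kv_cache_capacity(prefill_kvcache_block_num: int,
                                max_block_num_per_seq: int,
                                block_size: int,
                                max_model_len: int) -> None:
        # Unlike an assert, this survives `python -O` and carries an actionable message.
        if prefill_kvcache_block_num < max_block_num_per_seq:
            available_cache_tokens = prefill_kvcache_block_num * block_size
            raise RuntimeError(
                f"The KV cache can only hold {available_cache_tokens} tokens, which is less than "
                f"max_model_len={max_model_len}; deploy on GPUs with more memory or reduce "
                f"max_model_len to {available_cache_tokens} or less."
            )

    # e.g. 800 blocks of 64 tokens cache 51200 tokens, so a model deployed with
    # max_model_len=131072 would be rejected here (assuming max_block_num_per_seq
    # is derived from max_model_len and block_size).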
@@ -1388,19 +1395,19 @@ def _load_from_version_file(self, file_path: str = None):
elif line.startswith("CXX compiler version:"):
self.compiler_version = line.split(":")[1].strip()
except FileNotFoundError:
logger.info(f"Warning: Version file not found at {file_path}")
llm_logger.info(f"Warning: Version file not found at {file_path}")
except Exception as e:
logger.info(f"Warning: Could not read version file - {e!s}")
llm_logger.info(f"Warning: Could not read version file - {e!s}")

def print(self):
"""
print all config

"""
logger.info("Fasedeploy Commit Information :")
llm_logger.info("Fasedeploy Commit Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
llm_logger.info("{:<20}:{:<6}{}".format(k, "", v))
llm_logger.info("=============================================================")


class StructuredOutputsConfig:
@@ -1453,6 +1460,7 @@ def __init__(
use_warmup: bool = False,
limit_mm_per_prompt: Optional[Dict[str, Any]] = None,
mm_processor_kwargs: Optional[Dict[str, Any]] = None,
innode_prefill_ports: Optional[List[int]] = None,
max_num_partial_prefills: int = 1,
max_long_partial_prefills: int = 1,
long_prefill_token_threshold: int = 0,
@@ -1516,10 +1524,13 @@ def __init__(
self.limit_mm_per_prompt = limit_mm_per_prompt
self.mm_processor_kwargs = mm_processor_kwargs
self.use_warmup = use_warmup
self.innode_prefill_ports = innode_prefill_ports
self.max_num_partial_prefills = max_num_partial_prefills
self.max_long_partial_prefills = max_long_partial_prefills
self.long_prefill_token_threshold = long_prefill_token_threshold

self._str_to_list("innode_prefill_ports", int)

if envs.FD_FOR_TORCH_MODEL_FORMAT:
self.model_config.model_format = "torch"

@@ -1597,7 +1608,7 @@ def postprocess(self):
and self.structured_outputs_config.guided_decoding_backend == "auto"
):
if current_platform.is_xpu() or self.speculative_config.method is not None:
logger.warning("Speculative Decoding and XPU currently do not support Guided decoding, set off.")
llm_logger.warning("Speculative Decoding and XPU currently do not support Guided decoding, set off.")
self.structured_outputs_config.guided_decoding_backend = "off"
else:
self.structured_outputs_config.guided_decoding_backend = "xgrammar"
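To make the branch above easier to follow, here is a hypothetical restatement of how the "auto" guided-decoding backend is resolved (resolve_guided_backend is not a function in the patch):

    def resolve_guided_backend(backend: str, is_xpu: bool, speculative_method) -> str:
        if backend != "auto":
            return backend
        if is_xpu or speculative_method is not None:
            # Guided decoding is unsupported on XPU and with speculative decoding, so turn it off.
            return "off"
        return "xgrammar"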
@@ -1607,7 +1618,7 @@
self.cache_config.max_encoder_cache = self.scheduler_config.max_num_batched_tokens
elif self.cache_config.max_encoder_cache != 0:
if self.cache_config.max_encoder_cache < self.scheduler_config.max_num_batched_tokens:
logger.warning(
llm_logger.warning(
f"max_encoder_cache{self.cache_config.max_encoder_cache} is less than "
f"max_num_batched_tokens{self.scheduler_config.max_num_batched_tokens}, "
f"set to max_num_batched_tokens."
@@ -1621,16 +1632,16 @@
self.graph_opt_config.use_cudagraph = self.graph_opt_config.cudagraph_only_prefill
if self.load_config is not None and self.load_config.dynamic_load_weight is True:
self.graph_opt_config.graph_opt_level = 0
logger.info(
llm_logger.info(
"Static Graph does not support to be started together with RL Training, and automatically switch to dynamic graph!"
)
if self.device_config is not None and self.device_config.device_type != "cuda":
self.graph_opt_config.use_cudagraph = False
logger.info(f"CUDAGraph only support on GPU, current device type is {self.device_config.device_type}!")
llm_logger.info(f"CUDAGraph only support on GPU, current device type is {self.device_config.device_type}!")

if self.model_config.enable_mm and self.graph_opt_config.use_cudagraph:
self.cache_config.enable_prefix_caching = False
logger.info("Multi-modal models do not support prefix caching when using CUDAGraph!")
llm_logger.info("Multi-modal models do not support prefix caching when using CUDAGraph!")

if self.scheduler_config.splitwise_role == "mixed":
self.model_config.moe_phase = MoEPhase(phase="prefill")
@@ -1747,11 +1758,11 @@ def print(self):
"""
print all config
"""
logger.info("=================== Configuration Information ===============")
llm_logger.info("=================== Configuration Information ===============")
for k, v in self.__dict__.items():
if k == "generation_config" and v is not None:
for gck, gcv in v.to_dict().items():
logger.info("{:<20}:{:<6}{}".format(gck, "", gcv))
llm_logger.info("{:<20}:{:<6}{}".format(gck, "", gcv))
elif (
k == "cache_config"
or k == "model_config"
@@ -1762,22 +1773,30 @@
if v is not None:
v.print()
else:
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
llm_logger.info("{:<20}:{:<6}{}".format(k, "", v))
llm_logger.info("=============================================================")

def init_cache_info(self):
"""
initialize cache info
"""
# TODO: group the splitiwse params
# There are two methods for splitwise deployment:
# 1. v0 splitwise_scheduler or dp_scheduler
# 2. v1 local_scheduler + router
# TODO: group the splitwise params, remove code of v0
# v0 requires prefill and decode in one node and it uses local scheduler
# v1 supports prefill and decode in multi node and it uses splitwise or dp scheduler
# v2 supports prefill and decode in multi node and it uses router and local scheduler
self.splitwise_version = None
if self.scheduler_config.name in ("splitwise", "dp"):
if self.scheduler_config.name == "local" and (self.router_config is None or self.router_config.router is None):
self.splitwise_version = "v0"
elif self.scheduler_config.name == "local" and self.router_config and self.router_config.router:
elif self.scheduler_config.name in ("splitwise", "dp"):
self.splitwise_version = "v1"
elif self.scheduler_config.name == "local" and self.router_config and self.router_config.router:
self.splitwise_version = "v2"
else:
raise ValueError(
f"Unsupported scheduler mode, scheduler_name: {self.scheduler_config.name}, "
f"router_config: {self.router_config}"
)
llm_logger.info(f"splitwise_version: {self.splitwise_version}")

if isinstance(self.parallel_config.engine_worker_queue_port, (int, str)):
engine_worker_queue_port = self.parallel_config.engine_worker_queue_port
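The new splitwise_version resolution above boils down to a function of the scheduler name and whether a router is configured; a hypothetical restatement (resolve_splitwise_version is not part of the patch):

    def resolve_splitwise_version(scheduler_name: str, has_router: bool) -> str:
        if scheduler_name == "local" and not has_router:
            return "v0"  # prefill and decode on one node, local scheduler
        if scheduler_name in ("splitwise", "dp"):
            return "v1"  # multi-node, splitwise or dp scheduler
        if scheduler_name == "local" and has_router:
            return "v2"  # multi-node, router plus local scheduler
        raise ValueError(f"Unsupported scheduler mode: {scheduler_name}")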
@@ -1807,7 +1826,7 @@ def init_cache_info(self):
"port": connector_port,
"rdma_port": self.cache_config.rdma_comm_ports,
}
logger.info(f"disaggregate_info: {self.disaggregate_info}")
llm_logger.info(f"disaggregate_info: {self.disaggregate_info}")

if self.router_config:
self.register_info = {
Expand All @@ -1820,7 +1839,7 @@ def init_cache_info(self):
"device_ids": self.local_device_ids,
"transfer_protocol": self.cache_config.cache_transfer_protocol.split(","),
}
logger.info(f"register_info: {self.register_info}")
llm_logger.info(f"register_info: {self.register_info}")

def read_from_config(self):
"""
@@ -1831,7 +1850,7 @@ def reset_value(cls, value_name, key):
if hasattr(cls, key):
value = getattr(cls, key)
setattr(cls, value_name, value)
logger.info(f"Reset parameter {value_name} = {value} from configuration.")
llm_logger.info(f"Reset parameter {value_name} = {value} from configuration.")

reset_value(self.cache_config, "block_size", "infer_model_block_size")
reset_value(