
Commit 7f06d60

Merge branch 'develop' into new_add_prompt_logprobs_online
2 parents b948a52 + 5c8c2d4 commit 7f06d60


51 files changed: +4856 −625 lines changed

.github/workflows/ci_xpu.yml

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ concurrency:
 
 jobs:
   CI_XPU:
+    timeout-minutes: 60
     runs-on: [self-hosted, XPU-P800-8Card]
     steps:
       - name: Print current runner name

custom_ops/xpu_ops/src/ops/moe_expert_ffn.cc

Lines changed: 1 addition & 1 deletion
@@ -441,7 +441,7 @@ std::vector<paddle::Tensor> MoeExpertFFN(
     const std::string& quant_method,
     const int hadamard_blocksize,
     const int valid_token_num) {
-  if (ffn_in.numel() == 0) {
+  if (ffn_in.numel() == 0 || valid_token_num == 0) {
     paddle::Tensor ffn2_out =
         paddle::empty_like(ffn_in, paddle::DataType::BFLOAT16);
     return {ffn2_out};
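The change extends the early-exit guard so the kernel also returns an empty output when valid_token_num is 0, not only when the input tensor has no elements. A minimal Python sketch of the same guard (an illustrative stand-in, not the C++ kernel; the real FFN computation is omitted):

    import paddle

    def moe_expert_ffn_sketch(ffn_in: paddle.Tensor, valid_token_num: int) -> paddle.Tensor:
        # Early exit mirrors the C++ guard: nothing to compute if the input is
        # empty or if no tokens are routed to this expert group.
        if int(ffn_in.numel()) == 0 or valid_token_num == 0:
            return paddle.empty_like(ffn_in, dtype=paddle.bfloat16)
        raise NotImplementedError("real expert FFN computation omitted in this sketch")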

fastdeploy/cache_manager/ops.py

Lines changed: 93 additions & 63 deletions
@@ -1,77 +1,107 @@
+"""
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+
 import paddle
 
 from fastdeploy.platforms import current_platform
 
-if current_platform.is_cuda():
-    from fastdeploy.model_executor.ops.gpu import (
-        cuda_host_alloc,
-        cuda_host_free,
-        get_data_ptr_ipc,
-        get_output_kv_signal,
-        ipc_sent_key_value_cache_by_remote_ptr,
-        ipc_sent_key_value_cache_by_remote_ptr_block_sync,
-        set_data_ipc,
-        share_external_data,
-        swap_cache_all_layers,
-        unset_data_ipc,
-    )
-
-    memory_allocated = paddle.device.cuda.memory_allocated
-
-    def get_peer_mem_addr(*args, **kwargs):
-        raise RuntimeError("CUDA no need of get_peer_mem_addr!")
-
-elif current_platform.is_xpu():
-    from fastdeploy.model_executor.ops.xpu import (
-        cuda_host_alloc,
-        cuda_host_free,
-        get_output_kv_signal,
-        get_peer_mem_addr,
-        set_data_ipc,
-        share_external_data,
-        swap_cache_all_layers,
-    )
-
-    unset_data_ipc = None
-    memory_allocated = paddle.device.xpu.memory_allocated
-
-    def get_data_ptr_ipc(*args, **kwargs):
-        raise RuntimeError("XPU get_data_ptr_ipc UNIMPLENENTED!")
-
-    def ipc_sent_key_value_cache_by_remote_ptr(*args, **kwargs):
-        raise RuntimeError("XPU ipc_sent_key_value_cache_by_remote_ptr UNIMPLENENTED")
-
-    def ipc_sent_key_value_cache_by_remote_ptr_block_sync(*args, **kwargs):
-        raise RuntimeError("XPU No ipc_sent_key_value_cache_by_remote_ptr UNIMPLENENTED")
-
-else:
-    raise RuntimeError("Prefix cache ops only supported CUDA nor XPU platform ")
-
-
-def set_device(device):
+try:
     if current_platform.is_cuda():
-        paddle.set_device(f"gpu:{device}")
+        from fastdeploy.model_executor.ops.gpu import (
+            cuda_host_alloc,
+            cuda_host_free,
+            get_data_ptr_ipc,
+            get_output_kv_signal,
+            ipc_sent_key_value_cache_by_remote_ptr,
+            ipc_sent_key_value_cache_by_remote_ptr_block_sync,
+            set_data_ipc,
+            share_external_data,
+            swap_cache_all_layers,
+            unset_data_ipc,
+        )
+
+        memory_allocated = paddle.device.cuda.memory_allocated
+
+        def get_peer_mem_addr(*args, **kwargs):
+            raise RuntimeError("CUDA no need of get_peer_mem_addr!")
+
     elif current_platform.is_xpu():
-        paddle.set_device(f"xpu:{device}")
-    else:
-        raise RuntimeError("No supported platform")
+        from fastdeploy.model_executor.ops.xpu import (
+            cuda_host_alloc,
+            cuda_host_free,
+            get_output_kv_signal,
+            get_peer_mem_addr,
+            set_data_ipc,
+            share_external_data,
+            swap_cache_all_layers,
+        )
 
+        unset_data_ipc = None
+        memory_allocated = paddle.device.xpu.memory_allocated
 
-def share_external_data_(cache, cache_name, cache_shape, use_ipc):
-    if current_platform.is_cuda():
-        cache = share_external_data(cache, cache_name, cache_shape)
-    elif current_platform.is_xpu():
-        cache = share_external_data(cache, cache_name, cache_shape, use_ipc)
-    else:
-        raise RuntimeError("No supported platform")
-    return cache
+        def get_data_ptr_ipc(*args, **kwargs):
+            raise RuntimeError("XPU get_data_ptr_ipc UNIMPLENENTED!")
 
+        def ipc_sent_key_value_cache_by_remote_ptr(*args, **kwargs):
+            raise RuntimeError("XPU ipc_sent_key_value_cache_by_remote_ptr UNIMPLENENTED")
+
+        def ipc_sent_key_value_cache_by_remote_ptr_block_sync(*args, **kwargs):
+            raise RuntimeError("XPU No ipc_sent_key_value_cache_by_remote_ptr UNIMPLENENTED")
 
-def get_all_visible_devices():
-    if current_platform.is_xpu():
-        return "XPU_VISIBLE_DEVICES=0,1,2,3,4,5,6,7"
     else:
-        return "CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7"
+        raise RuntimeError("Prefix cache ops only supported CUDA nor XPU platform ")
+
+    def set_device(device):
+        if current_platform.is_cuda():
+            paddle.set_device(f"gpu:{device}")
+        elif current_platform.is_xpu():
+            paddle.set_device(f"xpu:{device}")
+        else:
+            raise RuntimeError("No supported platform")
+
+    def share_external_data_(cache, cache_name, cache_shape, use_ipc):
+        if current_platform.is_cuda():
+            cache = share_external_data(cache, cache_name, cache_shape)
+        elif current_platform.is_xpu():
+            cache = share_external_data(cache, cache_name, cache_shape, use_ipc)
+        else:
+            raise RuntimeError("No supported platform")
+        return cache
+
+    def get_all_visible_devices():
+        if current_platform.is_xpu():
+            return "XPU_VISIBLE_DEVICES=0,1,2,3,4,5,6,7"
+        else:
+            return "CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7"
+
+except:
+    cuda_host_alloc = None
+    cuda_host_free = None
+    set_data_ipc = None
+    share_external_data_ = None
+    swap_cache_all_layers = None
+    unset_data_ipc = None
+    set_device = None
+    memory_allocated = None
+    get_output_kv_signal = None
+    get_data_ptr_ipc = None
+    ipc_sent_key_value_cache_by_remote_ptr = None
+    ipc_sent_key_value_cache_by_remote_ptr_block_sync = None
+    get_peer_mem_addr = None
+    get_all_visible_devices = None
 
 
 __all__ = [
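The whole platform-dispatch block is now wrapped in try/except, so importing fastdeploy.cache_manager.ops degrades gracefully (all op symbols become None) instead of raising when the platform-specific ops are unavailable. A minimal, self-contained sketch of that pattern, with a hypothetical ops module standing in for the real extension:

    # Hypothetical module name; stands in for fastdeploy.model_executor.ops.gpu / .xpu.
    try:
        from my_platform_ops import host_alloc, host_free  # assumed platform-specific ops
    except Exception:  # extension not built for this platform: degrade to None
        host_alloc = None
        host_free = None

    def allocate_pinned(nbytes: int):
        # Callers must handle the degraded case explicitly.
        if host_alloc is None:
            raise RuntimeError("pinned host allocation is unavailable on this platform")
        return host_alloc(nbytes)

Note that the committed code uses a bare except:, which also swallows KeyboardInterrupt and SystemExit; the sketch narrows it to Exception.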

fastdeploy/config.py

Lines changed: 48 additions & 18 deletions
@@ -180,12 +180,12 @@ def __init__(
     ):
         self.model = ""
         self.is_quantized = False
+        self.is_moe_quantized = False
         self.max_model_len = 0
         self.dtype = "bfloat16"
         self.enable_logprob = False
         self.max_logprobs = 20
         self.logprobs_mode = "raw_logprobs"
-        self.enable_redundant_experts = False
         self.redundant_experts_num = 0
         self.seed = 0
         self.quantization = None
@@ -1159,20 +1159,54 @@ class EPLBConfig:
 
     def __init__(
         self,
+        args,
     ):
-        self.enable_redundant_experts = envs.FD_ENABLE_REDUNDANT_EXPERTS
-        self.redundant_experts_num = envs.FD_REDUNDANT_EXPERTS_NUM
-        self.redundant_expert_ip_shm_size = envs.FD_REDUNDANT_EXPERT_IP_SHM_SIZE
-        self.redundant_expert_meta_dir = envs.FD_REDUNDANT_EXPERT_META_DIR
-        self.redundant_expert_api_user = envs.FD_REDUNDANT_EXPERT_API_USER
-        self.redundant_expert_api_password = envs.FD_REDUNDANT_EXPERT_API_PASSWORD
-        self.redundant_expert_eplb_strategy = envs.FD_REDUNDANT_EXPERT_EPLB_STRATEGY
-        self.redundant_expert_dump_workload_interval = envs.FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL
-        self.redundant_expert_async_load_model_shmem_size_gb = envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB
-        self.redundant_expert_enable_schedule_cordon = envs.FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON
-        self.model_use_safetensors = envs.FD_MODEL_USE_SAFETENSORS
-        self.model_use_offline_quant = envs.FD_MODEL_USE_OFFLINE_QUANT
-        self.moe_quant_type = envs.FD_MOE_QUANT_TYPE
+        if args is None:
+            args = {}
+
+        # enable eplb
+        self.enable_eplb: bool = False
+        # redundant experts num
+        self.redundant_experts_num: int = 0
+        # expert ip shm size
+        self.redundant_expert_ip_shm_size: int = 1024
+        # expert meta dir
+        self.redundant_expert_meta_dir: str = "/tmp/redundant_expert_meta"
+        # expert api user and password
+        self.redundant_expert_api_user: str = ""
+        self.redundant_expert_api_password: str = ""
+        # expert eplb strategy
+        self.redundant_expert_eplb_strategy: str = ""
+        # expert dump workload interval
+        self.redundant_expert_dump_workload_interval: int = 10
+        # expert async load model shmem size gb
+        self.redundant_expert_async_load_model_shmem_size_gb: int = 0
+        # expert enable schedule cordon
+        self.redundant_expert_enable_schedule_cordon: bool = True
+        # model use safetensors
+        self.model_use_safetensors: bool = True
+        # model use offline quant
+        self.model_use_offline_quant: bool = True
+        # moe quant type
+        self.moe_quant_type: str = "w4a8"
+        for key, value in args.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
+
+    def to_json_string(self):
+        """
+        Convert eplb_config to json string.
+        """
+        return json.dumps({key: value for key, value in self.__dict__.items() if value is not None})
+
+    def print(self):
+        """
+        Print all configuration information.
+        """
+        logger.info("EPLB Configuration Information :")
+        for k, v in self.__dict__.items():
+            logger.info("{:<20}:{:<6}{}".format(k, "", v))
+        logger.info("=============================================================")
 
 
 class CacheConfig:
@@ -1601,10 +1635,6 @@ def postprocess(self):
         else:
             self.scheduler_config.max_num_batched_tokens = self.model_config.max_model_len
 
-        self.scheduler_config.max_chunk_len = (
-            self.scheduler_config.max_num_batched_tokens + self.scheduler_config.max_extra_num_batched_tokens
-        )
-
         if self.long_prefill_token_threshold == 0:
            self.long_prefill_token_threshold = int(self.model_config.max_model_len * 0.04)
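EPLBConfig now takes a plain dict and overrides only the attributes it already defines, instead of reading FD_* environment variables. A standalone sketch of that defaults-plus-overrides-plus-serialize pattern (simplified to a subset of the fields from the diff; not the FastDeploy class itself):

    import json

    class EPLBConfigSketch:
        """Simplified stand-in for the defaults-plus-overrides pattern in EPLBConfig."""

        def __init__(self, args=None):
            if args is None:
                args = {}
            # Defaults mirror a subset of the fields added in the diff.
            self.enable_eplb: bool = False
            self.redundant_experts_num: int = 0
            self.moe_quant_type: str = "w4a8"
            # Known keys override defaults; unknown keys are silently ignored.
            for key, value in args.items():
                if hasattr(self, key):
                    setattr(self, key, value)

        def to_json_string(self) -> str:
            return json.dumps({k: v for k, v in self.__dict__.items() if v is not None})

    cfg = EPLBConfigSketch({"enable_eplb": True, "redundant_experts_num": 2, "unknown_key": 1})
    print(cfg.to_json_string())
    # {"enable_eplb": true, "redundant_experts_num": 2, "moe_quant_type": "w4a8"}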

fastdeploy/engine/args_utils.py

Lines changed: 36 additions & 4 deletions
@@ -467,6 +467,16 @@ class EngineArgs:
     Url for router server, such as `0.0.0.0:30000`.
     """
 
+    enable_eplb: bool = False
+    """
+    Flag to enable eplb
+    """
+
+    eplb_config: Optional[Dict[str, Any]] = None
+    """
+    Configuration for eplb.
+    """
+
     def __post_init__(self):
         """
         Post-initialization processing to set default tokenizer if not provided.
@@ -523,7 +533,7 @@ def __post_init__(self):
                 f"= {expected_ports}, but got {len(self.rdma_comm_ports)}."
             )
 
-        if not current_platform.is_cuda() and not current_platform.is_xpu():
+        if not (current_platform.is_cuda() or current_platform.is_xpu() or current_platform.is_maca()):
            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
        if self.guided_decoding_backend != "off":
            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
@@ -850,6 +860,18 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
             default=EngineArgs.enable_expert_parallel,
             help="Enable expert parallelism.",
         )
+        parallel_group.add_argument(
+            "--enable-eplb",
+            action="store_true",
+            default=EngineArgs.enable_eplb,
+            help="Enable eplb.",
+        )
+        parallel_group.add_argument(
+            "--eplb-config",
+            type=json.loads,
+            default=EngineArgs.eplb_config,
+            help="Config of eplb.",
+        )
 
         # Load group
         load_group = parser.add_argument_group("Load Configuration")
@@ -1126,7 +1148,7 @@ def create_speculative_config(self) -> SpeculativeConfig:
 
     def create_scheduler_config(self) -> SchedulerConfig:
         """
-        Create and retuan a SchedulerConfig object based on the current settings.
+        Create and return a SchedulerConfig object based on the current settings.
         """
         prefix = "scheduler_"
         prefix_len = len(prefix)
@@ -1173,13 +1195,22 @@ def create_early_stop_config(self) -> EarlyStopConfig:
                 early_stop_args[k] = v
         return EarlyStopConfig(early_stop_args)
 
+    def create_eplb_config(self) -> EPLBConfig:
+        """
+        Create and retuan an EPLBConfig object based on the current settings.
+        """
+        eplb_args = asdict(self)
+        if self.eplb_config is not None:
+            for k, v in self.eplb_config.items():
+                eplb_args[k] = v
+        eplb_args["enable_eplb"] = self.enable_eplb
+        return EPLBConfig(eplb_args)
+
     def create_engine_config(self, port_availability_check=True) -> FDConfig:
         """
         Create and return a Config object based on the current settings.
         """
         all_dict = asdict(self)
-        eplb_cfg = EPLBConfig()
-        all_dict["enable_redundant_experts"] = eplb_cfg.enable_redundant_experts
         model_cfg = ModelConfig(all_dict)
 
         # XPU currently disable prefix cache for VL model
@@ -1221,6 +1252,7 @@ def create_engine_config(self, port_availability_check=True) -> FDConfig:
         scheduler_cfg = self.create_scheduler_config()
         graph_opt_cfg = self.create_graph_optimization_config()
         plas_attention_config = self.create_plas_attention_config()
+        eplb_cfg = self.create_eplb_config()
         router_config = RouterConfig(all_dict)
 
         early_stop_cfg = self.create_early_stop_config()
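The two new flags follow standard argparse semantics: --enable-eplb is a boolean switch and --eplb-config is parsed with type=json.loads. A small standalone sketch of that behavior, with FastDeploy's FlexibleArgumentParser simplified to plain argparse for illustration:

    import argparse
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument("--enable-eplb", action="store_true", default=False, help="Enable eplb.")
    parser.add_argument("--eplb-config", type=json.loads, default=None, help="Config of eplb.")

    # e.g. --eplb-config '{"redundant_experts_num": 2, "moe_quant_type": "w4a8"}'
    args = parser.parse_args(["--enable-eplb", "--eplb-config", '{"redundant_experts_num": 2}'])
    print(args.enable_eplb)   # True
    print(args.eplb_config)   # {'redundant_experts_num': 2}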

fastdeploy/engine/async_llm.py

Lines changed: 1 addition & 0 deletions
@@ -833,6 +833,7 @@ def _start_worker_service(self):
             f" --override-pooler-config {self.cfg.model_config.override_pooler_config}"
             f" --logprobs_mode {self.cfg.model_config.logprobs_mode}"
             f" --max_logprobs {self.cfg.model_config.max_logprobs}"
+            f" --eplb_config '{self.cfg.eplb_config.to_json_string()}'"
         )
 
         worker_store_true_flag = {
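The worker launch command therefore carries the EPLB settings as a single-quoted JSON blob. A sketch of the serialize/quote/parse round trip (the shlex and json steps here are illustrative; this is not FastDeploy's worker-side parsing code):

    import json
    import shlex

    # Engine side: serialize the config (mirrors eplb_config.to_json_string() in the diff).
    eplb_json = json.dumps({"enable_eplb": True, "redundant_experts_num": 2})
    cmd_fragment = f" --eplb_config '{eplb_json}'"

    # Worker side: shell-style tokenization recovers the JSON string, json.loads the dict.
    tokens = shlex.split(cmd_fragment)       # ['--eplb_config', '{"enable_eplb": true, ...}']
    recovered = json.loads(tokens[1])
    print(recovered["redundant_experts_num"])  # 2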
