
Conversation

@Jiang-Jia-Jun
Collaborator

@Jiang-Jia-Jun Jiang-Jia-Jun commented Nov 18, 2025

Sync newly inserted requests from the Engine to the worker asynchronously, reducing the CPU-interruption overhead on the Worker process to zero.

Motivation

Profiling on the 0.3B model shows that the combined CPU cost of the num_tasks and get_tasks calls is close to 1ms; this PR eliminates that overhead.
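
For context, a minimal sketch of the pattern described above, written as plain Python with hypothetical names (WorkerTaskSync, poll, synced_tasks); only num_tasks()/get_tasks() come from the diff, and the actual change lives in fastdeploy/worker/worker_process.py:

import threading
import time


class WorkerTaskSync:
    """Sketch: move the blocking num_tasks()/get_tasks() IPC calls off the worker's hot loop."""

    def __init__(self, task_queue):
        self.task_queue = task_queue  # IPC queue shared with the Engine process
        self.synced_tasks = None      # local buffer filled by the background thread
        threading.Thread(target=self._sync_loop, daemon=True).start()

    def _sync_loop(self):
        # The background thread pays the IPC cost, so the event loop only reads a local field.
        while True:
            if self.synced_tasks is None and self.task_queue.num_tasks() > 0:
                self.synced_tasks = self.task_queue.get_tasks()
            time.sleep(0.0001)  # avoid spinning at 100% CPU

    def poll(self):
        # Called from the worker's event loop: a cheap local read instead of an IPC round trip.
        tasks, self.synced_tasks = self.synced_tasks, None
        return tasks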

Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings November 18, 2025 05:42
@paddle-bot

paddle-bot bot commented Nov 18, 2025

Thanks for your contribution!

@CLAassistant

CLAassistant commented Nov 18, 2025

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ Jiang-Jia-Jun
❌ jiangjiajun
You have signed the CLA already but the status is still pending? Let us recheck it.

Contributor

Copilot AI left a comment


Pull Request Overview

This PR aims to fetch newly inserted requests from the Engine process asynchronously, reducing the CPU-interruption overhead on the Worker process. On the 0.3B model, the optimization targets eliminating nearly 1ms of CPU call overhead.

The main changes include:

  • Added an environment variable to toggle the asynchronous task communication mechanism
  • Implemented new task check and fetch methods that support both synchronous and asynchronous modes
  • Adjusted the logic for setting the read-finished flag in multi-node scenarios

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File Description
fastdeploy/worker/worker_process.py Adds the _exist_tasks_from_engine() and _get_tasks_from_engine() methods to support asynchronous task fetching; updates the task check and fetch logic in the event loop; adjusts how the sync flag is set in multi-node scenarios
fastdeploy/envs.py Adds the DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM environment variable to enable/disable asynchronous task communication (a hedged sketch of such a flag follows this table)
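
For readers unfamiliar with the flag, a hedged sketch of what reading such an environment variable could look like; the actual definition in fastdeploy/envs.py is not shown in this diff and may use a different mechanism:

import os

# Assumption: setting DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM=1 in the environment disables the
# asynchronous engine->worker task communication and falls back to direct queue polling.
DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM = os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", "0") == "1"

Under that reading, the async path is on by default and can be switched off per process by exporting the variable before launch.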

Key issues:

  1. Critical bug: the code references the uninitialized instance variables self.local_synced_tasks and self.all_local_tp_synced, which will raise a runtime AttributeError in async mode
  2. Critical bug: line 472 incorrectly calls self.task_queue._get_tasks_from_engine(); it should be self._get_tasks_from_engine()
  3. PR title format error: the title uses the [Optimize] tag, but the correct tag is [Optimization]

These critical issues should be fixed before merging; a consolidated sketch of the fixes follows.
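
A consolidated sketch of the two code fixes the review asks for, assuming a simplified class structure (the class name here is illustrative only; the real class and surrounding logic live in worker_process.py):

class WorkerProc:
    def __init__(self, task_queue):
        self.task_queue = task_queue
        # Fix 1: initialize the shared state so async mode never raises AttributeError.
        self.local_synced_tasks = None
        self.all_local_tp_synced = False

    def _get_tasks_from_engine(self):
        # Simplified stand-in for the helper added in this PR.
        new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced
        self.local_synced_tasks = None
        self.all_local_tp_synced = False
        return new_tasks, read_finished

    def event_loop_step(self):
        # Fix 2: call the worker's own helper, not self.task_queue._get_tasks_from_engine().
        tasks, read_finish = self._get_tasks_from_engine()
        return tasks, read_finish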

self.parallel_config.engine_worker_queue_port,
)
logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
logger.info(f"current task queue data: {self.local_synced_tasks}")

Copilot AI Nov 18, 2025


[nitpick] Logging inconsistency: When async mode is enabled, this log message will output the raw object self.local_synced_tasks which could be None or a tuple/list. This differs from the original behavior which logged the number of tasks. Consider:

if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
    logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
else:
    logger.info(f"current task queue data: {self.local_synced_tasks}")

Or provide more descriptive logging:

task_info = self.task_queue.num_tasks() if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM else len(self.local_synced_tasks) if self.local_synced_tasks else 0
logger.info(f"current task queue data: {task_info}")
Suggested change
logger.info(f"current task queue data: {self.local_synced_tasks}")
logger.info(f"current task queue data: {len(self.local_synced_tasks) if self.local_synced_tasks else 0}")

Comment on lines 162 to 181
def _exist_tasks_from_engine(self):
"""
Check if there exists new tasks sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.num_tasks() > 0
else:
return self.local_synced_tasks is not None

def _get_tasks_from_engine(self):
"""
Get new tasks that sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.get_tasks()
else:
new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced
self.local_synced_tasks = None
self.all_local_tp_synced = False
return new_tasks, read_finished

Copilot AI Nov 18, 2025


The instance variables self.local_synced_tasks and self.all_local_tp_synced are referenced in this method but are never initialized in the __init__ method or elsewhere in the class. When DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM is False, this will cause an AttributeError at runtime.

These variables should be initialized in the __init__ method, for example:

self.local_synced_tasks = None
self.all_local_tp_synced = False

Contributor

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 10 comments.

self.insert_step = True

tasks, read_finish = self.task_queue.get_tasks()
tasks, read_finish = self.task_queue._get_tasks_from_engine()

Copilot AI Nov 18, 2025


Method name inconsistency: The method is defined as _get_requests_from_engine (line 177), but it's called as self.task_queue._get_tasks_from_engine() here. This will cause an AttributeError at runtime. Should be self._get_requests_from_engine() instead.

Suggested change
tasks, read_finish = self.task_queue._get_tasks_from_engine()
tasks, read_finish = self._get_requests_from_engine()

Comment on lines +194 to +205
"""
try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(
e
)
)


Copilot AI Nov 18, 2025


Error handling issue: The exception handler logs the error but doesn't take any recovery action or signal to the main thread that the background thread has failed. This means:

  1. The worker process will silently stop receiving new requests without any indication
  2. No attempt is made to restart the thread or notify the system
  3. The exception breaks the infinite loop, but the daemon thread continues to exist

Consider adding recovery logic, such as restarting the thread after a delay, or setting a flag that the main loop can check to detect the failure and handle it appropriately.

Suggested change
"""
try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(
e
)
)
If an exception occurs, the thread will log the error, wait, and restart the loop.
"""
while True:
try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(
e
)
)
time.sleep(1) # Wait before retrying to avoid tight loop

Comment on lines +163 to +201
# synced requests from engine
self.local_synced_requests = None
# flag to determin if all tp process synced
self.all_local_tp_synced = False

def _exist_requests_from_engine(self):
"""
Check if there exists new requests sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.num_tasks() > 0
else:
return self.local_synced_requests is not None

def _get_requests_from_engine(self):
"""
Get new requests that sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.get_tasks()
else:
new_requests, read_finished = self.local_synced_requests, self.all_local_tp_synced
self.local_synced_requests = None
self.all_local_tp_synced = False
return new_requests, read_finished

def _sync_requests_from_engine_loop(self):
"""
A thread that keeps sync all the new requests from engine process to worker process.
This function must be called in `event_loop_normal` since the `task_queue` is available
in that function.
"""
try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(

Copilot AI Nov 18, 2025


Best practice violation: The naming inconsistency between "tasks" and "requests" throughout the code makes it difficult to maintain. The new code introduces methods named _exist_requests_from_engine and _get_requests_from_engine, but they're called with different names (_exist_tasks_from_engine, _get_tasks_from_engine). Additionally, variables use both local_synced_requests and local_synced_tasks.

For consistency with the existing codebase (which uses task_queue, num_tasks(), get_tasks()), consider using "tasks" terminology throughout instead of mixing "tasks" and "requests".

Suggested change
# synced requests from engine
self.local_synced_requests = None
# flag to determin if all tp process synced
self.all_local_tp_synced = False
def _exist_requests_from_engine(self):
"""
Check if there exists new requests sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.num_tasks() > 0
else:
return self.local_synced_requests is not None
def _get_requests_from_engine(self):
"""
Get new requests that sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.get_tasks()
else:
new_requests, read_finished = self.local_synced_requests, self.all_local_tp_synced
self.local_synced_requests = None
self.all_local_tp_synced = False
return new_requests, read_finished
def _sync_requests_from_engine_loop(self):
"""
A thread that keeps sync all the new requests from engine process to worker process.
This function must be called in `event_loop_normal` since the `task_queue` is available
in that function.
"""
try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(
# synced tasks from engine
self.local_synced_tasks = None
# flag to determin if all tp process synced
self.all_local_tp_synced = False
def _exist_tasks_from_engine(self):
"""
Check if there exists new tasks sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.num_tasks() > 0
else:
return self.local_synced_tasks is not None
def _get_tasks_from_engine(self):
"""
Get new tasks that sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.get_tasks()
else:
new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced
self.local_synced_tasks = None
self.all_local_tp_synced = False
return new_tasks, read_finished
def _sync_tasks_from_engine_loop(self):
"""
A thread that keeps sync all the new tasks from engine process to worker process.
This function must be called in `event_loop_normal` since the `task_queue` is available
in that function.
"""
try:
while True:
if self.local_synced_tasks is None and self.task_queue.num_tasks() > 0:
self.local_synced_tasks, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new task! error={}".format(


# synced requests from engine
self.local_synced_requests = None
# flag to determin if all tp process synced

Copilot AI Nov 18, 2025


Spelling error: "determin" should be "determine".

Suggested change
# flag to determin if all tp process synced
# flag to determine if all tp process synced

self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
except Exception as e:
logger.error(
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(

Copilot AI Nov 18, 2025


Spelling errors in error message: "unexcepted" should be "unexpected" and "happend" should be "happened".

Suggested change
"There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(
"There's unexpected issue happened to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format(

# The first worker detects whether there are tasks in the task queue
if tp_rank == 0:
if self.task_queue.num_tasks() > 0:
if self._exist_tasks_from_engine():

Copilot AI Nov 18, 2025


Method name inconsistency: The method is defined as _exist_requests_from_engine (line 168), but it's called as _exist_tasks_from_engine() here. This will cause an AttributeError at runtime.

Suggested change
if self._exist_tasks_from_engine():
if self._exist_requests_from_engine():

self.parallel_config.engine_worker_queue_port,
)
logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
logger.info(f"current task queue data: {self.local_synced_tasks}")

Copilot AI Nov 18, 2025


Variable name inconsistency: The variable is defined as self.local_synced_requests (line 164), but it's referenced as self.local_synced_tasks here. This will cause an AttributeError at runtime.

Suggested change
logger.info(f"current task queue data: {self.local_synced_tasks}")
logger.info(f"current task queue data: {self.local_synced_requests}")

Comment on lines +189 to +198
def _sync_requests_from_engine_loop(self):
"""
A thread that keeps sync all the new requests from engine process to worker process.
This function must be called in `event_loop_normal` since the `task_queue` is available
in that function.
"""
try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()

Copilot AI Nov 18, 2025


Concurrency issue: The background thread (_sync_requests_from_engine_loop) and the main event loop both access self.local_synced_requests and self.all_local_tp_synced without any synchronization mechanism (e.g., locks or atomic operations). This can lead to race conditions where:

  1. The main thread reads self.local_synced_requests while the background thread is writing to it
  2. The main thread sets self.local_synced_requests = None while the background thread is checking it

Consider using threading.Lock() or threading.Condition() to protect access to these shared variables.
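
A hedged sketch of one way such a lock could protect the handoff, using a hypothetical RequestBuffer helper; the real change would need to cover every call site that touches these fields:

import threading

class RequestBuffer:
    """Sketch: guard the buffer handed from the background sync thread to the event loop."""

    def __init__(self):
        self._lock = threading.Lock()
        self.local_synced_requests = None
        self.all_local_tp_synced = False

    def publish(self, requests, all_tp_synced):
        # Called by the background sync thread.
        with self._lock:
            self.local_synced_requests = requests
            self.all_local_tp_synced = all_tp_synced

    def take(self):
        # Called by the main event loop; read-and-clear happens atomically under the lock.
        with self._lock:
            requests, read_finished = self.local_synced_requests, self.all_local_tp_synced
            self.local_synced_requests = None
            self.all_local_tp_synced = False
            return requests, read_finished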

try:
while True:
if self.local_synced_requests is None and self.task_queue.num_tasks() > 0:
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()

Copilot AI Nov 18, 2025


Performance issue: The infinite while loop runs without any delay when self.local_synced_requests is not None, which will cause the thread to spin at 100% CPU usage. Consider adding a small sleep (e.g., time.sleep(0.0001)) in the loop to prevent excessive CPU consumption, especially when waiting for new tasks.

Suggested change
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks()
time.sleep(0.0001) # Prevent 100% CPU usage when waiting for new tasks

Comment on lines +168 to +187
def _exist_requests_from_engine(self):
"""
Check if there exists new requests sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.num_tasks() > 0
else:
return self.local_synced_requests is not None

def _get_requests_from_engine(self):
"""
Get new requests that sent from engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.get_tasks()
else:
new_requests, read_finished = self.local_synced_requests, self.all_local_tp_synced
self.local_synced_requests = None
self.all_local_tp_synced = False
return new_requests, read_finished

Copilot AI Nov 18, 2025


Documentation issue: The docstrings for _exist_requests_from_engine and _get_requests_from_engine should include:

  1. Return types and values
  2. Explanation of the behavior difference between async and sync modes (controlled by DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM)
  3. Thread safety considerations

For example:

def _exist_requests_from_engine(self):
    \"\"\"
    Check if there exists new requests sent from engine process.
    
    Returns:
        bool: True if new requests are available, False otherwise.
        
    Note:
        In async mode (default), checks if local_synced_requests is not None.
        In sync mode (DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM=1), directly polls the task queue.
    \"\"\"
