Skip to content

Commit 5886aa4

Browse files
[V1] [6/N] API Server: Better Shutdown (#11586)
1 parent 8d9b672 commit 5886aa4

File tree

3 files changed

+40
-45
lines changed

3 files changed

+40
-45
lines changed

vllm/entrypoints/openai/api_server.py

Lines changed: 12 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@
6868
from vllm.logger import init_logger
6969
from vllm.usage.usage_lib import UsageContext
7070
from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
71-
is_valid_ipv6_address, kill_process_tree, set_ulimit)
71+
is_valid_ipv6_address, set_ulimit)
7272
from vllm.version import __version__ as VLLM_VERSION
7373

7474
TIMEOUT_KEEP_ALIVE = 5 # seconds
@@ -133,32 +133,21 @@ async def build_async_engine_client_from_engine_args(
133133
Returns the Client or None if the creation failed.
134134
"""
135135

136-
# Fall back
137-
# TODO: fill out feature matrix.
136+
# AsyncLLMEngine.
138137
if (MQLLMEngineClient.is_unsupported_config(engine_args)
139138
or envs.VLLM_USE_V1 or disable_frontend_multiprocessing):
140-
engine_config = engine_args.create_engine_config(
141-
UsageContext.OPENAI_API_SERVER)
142-
uses_ray = getattr(AsyncLLMEngine._get_executor_cls(engine_config),
143-
"uses_ray", False)
144-
145-
build_engine = partial(AsyncLLMEngine.from_engine_args,
146-
engine_args=engine_args,
147-
engine_config=engine_config,
148-
usage_context=UsageContext.OPENAI_API_SERVER)
149-
if uses_ray:
150-
# Must run in main thread with ray for its signal handlers to work
151-
engine_client = build_engine()
152-
else:
153-
engine_client = await asyncio.get_running_loop().run_in_executor(
154-
None, build_engine)
155139

156-
yield engine_client
157-
if hasattr(engine_client, "shutdown"):
158-
engine_client.shutdown()
159-
return
140+
engine_client: Optional[EngineClient] = None
141+
try:
142+
engine_client = AsyncLLMEngine.from_engine_args(
143+
engine_args=engine_args,
144+
usage_context=UsageContext.OPENAI_API_SERVER)
145+
yield engine_client
146+
finally:
147+
if engine_client and hasattr(engine_client, "shutdown"):
148+
engine_client.shutdown()
160149

161-
# Otherwise, use the multiprocessing AsyncLLMEngine.
150+
# MQLLMEngine.
162151
else:
163152
if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
164153
# Make TemporaryDirectory for prometheus multiprocessing
@@ -737,15 +726,6 @@ def signal_handler(*_) -> None:
737726

738727
signal.signal(signal.SIGTERM, signal_handler)
739728

740-
# The child processes will send SIGQUIT to this process when
741-
# any error happens. This process then cleans up the whole tree.
742-
# TODO(rob): move this into AsyncLLM.__init__ once we remove
743-
# the context manager below.
744-
def sigquit_handler(signum, frame):
745-
kill_process_tree(os.getpid())
746-
747-
signal.signal(signal.SIGQUIT, sigquit_handler)
748-
749729
async with build_async_engine_client(args) as engine_client:
750730
app = build_app(args)
751731

vllm/v1/engine/async_llm.py

Lines changed: 22 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import os
3+
import signal
24
from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union
35

46
from vllm.config import ModelConfig, VllmConfig
@@ -16,6 +18,7 @@
1618
from vllm.transformers_utils.tokenizer import AnyTokenizer
1719
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
1820
from vllm.usage.usage_lib import UsageContext
21+
from vllm.utils import kill_process_tree
1922
from vllm.v1.engine.core_client import EngineCoreClient
2023
from vllm.v1.engine.detokenizer import Detokenizer
2124
from vllm.v1.engine.processor import Processor
@@ -38,6 +41,22 @@ def __init__(
3841
log_requests: bool = True,
3942
start_engine_loop: bool = True,
4043
) -> None:
44+
45+
# The child processes will send SIGQUIT when unrecoverable
46+
# errors happen. We kill the process tree here so that the
47+
# stack trace is very evident.
48+
# TODO: rather than killing the main process, we should
49+
# figure out how to raise an AsyncEngineDeadError and
50+
# handle at the API server level so we can return a better
51+
# error code to the clients calling vLLM.
52+
def sigquit_handler(signum, frame):
53+
logger.fatal(
54+
"AsyncLLM got SIGQUIT from worker processes, shutting "
55+
"down. See stack trace above for root cause issue.")
56+
kill_process_tree(os.getpid())
57+
58+
signal.signal(signal.SIGQUIT, sigquit_handler)
59+
4160
assert start_engine_loop
4261

4362
self.log_requests = log_requests
@@ -276,9 +295,9 @@ async def _run_output_handler(self):
276295
# 4) Abort any requests that finished due to stop strings.
277296
await self.engine_core.abort_requests_async(reqs_to_abort)
278297

279-
except BaseException as e:
280-
logger.error(e)
281-
raise e
298+
except Exception as e:
299+
logger.exception("EngineCore output handler hit an error: %s", e)
300+
kill_process_tree(os.getpid())
282301

283302
async def abort(self, request_id: str) -> None:
284303
"""Abort RequestId in self, detokenizer, and engine core."""

vllm/v1/engine/core_client.py

Lines changed: 6 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66

77
from vllm.config import VllmConfig
88
from vllm.logger import init_logger
9-
from vllm.utils import get_open_zmq_ipc_path
9+
from vllm.utils import get_open_zmq_ipc_path, make_zmq_socket
1010
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
1111
EngineCoreProfile, EngineCoreRequest,
1212
EngineCoreRequestType, EngineCoreRequestUnion)
@@ -144,17 +144,13 @@ def __init__(
144144
else:
145145
self.ctx = zmq.Context() # type: ignore[attr-defined]
146146

147-
# Path for IPC.
147+
# Paths and sockets for IPC.
148148
output_path = get_open_zmq_ipc_path()
149149
input_path = get_open_zmq_ipc_path()
150-
151-
# Get output (EngineCoreOutput) from EngineCore.
152-
self.output_socket = self.ctx.socket(zmq.constants.PULL)
153-
self.output_socket.connect(output_path)
154-
155-
# Send input (EngineCoreRequest) to EngineCore.
156-
self.input_socket = self.ctx.socket(zmq.constants.PUSH)
157-
self.input_socket.bind(input_path)
150+
self.output_socket = make_zmq_socket(self.ctx, output_path,
151+
zmq.constants.PULL)
152+
self.input_socket = make_zmq_socket(self.ctx, input_path,
153+
zmq.constants.PUSH)
158154

159155
# Start EngineCore in background process.
160156
self.proc_handle: Optional[BackgroundProcHandle]

0 commit comments

Comments
 (0)