1
- from typing import List , Optional , Type
1
+ import weakref
2
+ from abc import ABC , abstractmethod
3
+ from typing import List , Type
2
4
3
5
import msgspec
4
6
import zmq
18
20
logger = init_logger (__name__ )
19
21
20
22
21
- class EngineCoreClient :
23
+ class EngineCoreClient ( ABC ) :
22
24
"""
23
25
EngineCoreClient: subclasses handle different methods for pushing
24
26
and pulling from the EngineCore for asyncio / multiprocessing.
@@ -52,8 +54,9 @@ def make_client(
52
54
53
55
return InprocClient (vllm_config , executor_class , log_stats )
54
56
57
@abstractmethod
def shutdown(self):
    """Release whatever resources the client holds.

    Declared abstract: the base implementation does nothing and returns
    ``None``; classes that want to be instantiable must override it.
    """
57
60
58
61
def get_output(self) -> List[EngineCoreOutput]:
    """Return outputs produced by the engine core.

    This version is a placeholder with no implementation.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
@@ -107,9 +110,6 @@ def abort_requests(self, request_ids: List[str]) -> None:
107
110
def shutdown(self):
    """Shut down by delegating to the wrapped ``self.engine_core``."""
    core = self.engine_core
    core.shutdown()
109
112
110
- def __del__ (self ):
111
- self .shutdown ()
112
-
113
113
def profile(self, is_start: bool = True) -> None:
    """Forward a profiling request to ``self.engine_core``.

    Args:
        is_start: Passed through unchanged to ``engine_core.profile``.
            Presumably ``True`` starts and ``False`` stops profiling --
            TODO confirm against the engine core implementation.
    """
    core = self.engine_core
    core.profile(is_start)
115
115
@@ -139,10 +139,14 @@ def __init__(
139
139
self .decoder = msgspec .msgpack .Decoder (EngineCoreOutputs )
140
140
141
141
# ZMQ setup.
142
- if asyncio_mode :
143
- self .ctx = zmq .asyncio .Context ()
144
- else :
145
- self .ctx = zmq .Context () # type: ignore[attr-defined]
142
+ self .ctx = (
143
+ zmq .asyncio .Context () # type: ignore[attr-defined]
144
+ if asyncio_mode else zmq .Context ()) # type: ignore[attr-defined]
145
+
146
+ # Note(rob): the finalizer callback must not be a bound method of
147
+ # self: it would hold a strong reference to self, so the gc could
+ # never collect the object. Pass only the zmq context instead.
148
+ self ._finalizer = weakref .finalize (self , lambda x : x .destroy (linger = 0 ),
149
+ self .ctx )
146
150
147
151
# Paths and sockets for IPC.
148
152
output_path = get_open_zmq_ipc_path ()
@@ -153,7 +157,6 @@ def __init__(
153
157
zmq .constants .PUSH )
154
158
155
159
# Start EngineCore in background process.
156
- self .proc_handle : Optional [BackgroundProcHandle ]
157
160
self .proc_handle = BackgroundProcHandle (
158
161
input_path = input_path ,
159
162
output_path = output_path ,
@@ -166,12 +169,11 @@ def __init__(
166
169
})
167
170
168
171
def shutdown(self):
    """Clean up background resources.

    Shuts down the background process handle, if one was created, and
    then invokes the finalizer stored in ``self._finalizer``, which
    destroys the ZMQ context.

    Both attributes are looked up defensively so that calling this on a
    client whose ``__init__`` failed part-way does not raise
    ``AttributeError``.

    Raises:
        Exception: Whatever ``proc_handle.shutdown()`` raises is
            propagated, but only after the finalizer has run.
    """
    try:
        proc_handle = getattr(self, "proc_handle", None)
        if proc_handle is not None:
            proc_handle.shutdown()
    finally:
        # Always tear down the ZMQ context, even if stopping the
        # background process failed; otherwise the context would only be
        # destroyed later, when the object is garbage collected.
        finalizer = getattr(self, "_finalizer", None)
        if finalizer is not None:
            finalizer()
175
177
176
178
177
179
class SyncMPClient (MPClient ):
0 commit comments