     maybe_register_config_serialize_by_value)
 from vllm.utils import get_exception_traceback, zmq_socket_ctx
 from vllm.v1.core.scheduler import Scheduler
-from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
-                            EngineCoreProfile, EngineCoreRequest,
-                            EngineCoreRequestType, EngineCoreRequestUnion)
+from vllm.v1.engine import (EngineCoreOutputs, EngineCoreProfile,
+                            EngineCoreRequest, EngineCoreRequestType,
+                            EngineCoreRequestUnion)
 from vllm.v1.engine.mm_input_mapper import MMInputMapperServer
 from vllm.v1.executor.abstract import Executor
 from vllm.v1.request import Request, RequestStatus
 
 logger = init_logger(__name__)
 
-POLLING_TIMEOUT_MS = 5000
-POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
-LOGGING_TIME_S = 5
+POLLING_TIMEOUT_S = 2.5
 
 
 class EngineCore:
@@ -40,10 +38,8 @@ def __init__(
         self,
         vllm_config: VllmConfig,
         executor_class: Type[Executor],
-        log_stats: bool = False,
     ):
         assert vllm_config.model_config.runner_type != "pooling"
-        self.log_stats = log_stats
 
         logger.info("Initializing an LLM engine (v%s) with config: %s",
                     VLLM_VERSION, vllm_config)
@@ -62,8 +58,6 @@ def __init__(
                                    vllm_config.cache_config,
                                    vllm_config.lora_config)
 
-        self._last_logging_time = time.time()
-
         self.mm_input_mapper_server = MMInputMapperServer(
             vllm_config.model_config)
 
@@ -114,11 +108,12 @@ def abort_requests(self, request_ids: List[str]):
         self.scheduler.finish_requests(request_ids,
                                        RequestStatus.FINISHED_ABORTED)
 
-    def step(self) -> List[EngineCoreOutput]:
+    def step(self) -> EngineCoreOutputs:
         """Schedule, execute, and make output."""
 
         if not self.scheduler.has_unfinished_requests():
-            return []
+            return EngineCoreOutputs(
+                outputs=[], scheduler_stats=self.scheduler.make_stats())
 
         scheduler_output = self.scheduler.schedule()
         output = self.model_executor.execute_model(scheduler_output)
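The hunk above changes the step() contract: it now always returns an EngineCoreOutputs container, so a scheduler-stats snapshot is emitted even when there is nothing to schedule. Below is a minimal sketch of the shape this diff implies; the real classes live in vllm.v1.engine and the scheduler, and the SchedulerStats fields shown here are illustrative assumptions, not the actual definitions.

from dataclasses import dataclass, field


@dataclass
class SchedulerStats:
    # Illustrative fields only; the real snapshot is whatever
    # Scheduler.make_stats() returns.
    num_running_reqs: int = 0
    num_waiting_reqs: int = 0


@dataclass
class EngineCoreOutputs:
    # Per-request outputs produced by this step (empty on an idle step).
    outputs: list = field(default_factory=list)
    # Stats snapshot attached to every step, including idle ones.
    scheduler_stats: SchedulerStats = field(default_factory=SchedulerStats)


# An idle step now produces something like:
idle = EngineCoreOutputs(outputs=[], scheduler_stats=SchedulerStats())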
@@ -145,15 +140,17 @@ def __init__(
         executor_class: Type[Executor],
         log_stats: bool = False,
     ):
-        super().__init__(vllm_config, executor_class, log_stats)
+        super().__init__(vllm_config, executor_class)
+
+        self.log_stats = log_stats
 
         # Background Threads and Queues for IO. These enable us to
         # overlap ZMQ socket IO with GPU since they release the GIL,
         # and to overlap some serialization/deserialization with the
         # model forward pass.
         # Threads handle Socket <-> Queues and core_busy_loop uses Queue.
         self.input_queue: queue.Queue[EngineCoreRequestUnion] = queue.Queue()
-        self.output_queue: queue.Queue[List[EngineCoreOutput]] = queue.Queue()
+        self.output_queue: queue.Queue[EngineCoreOutputs] = queue.Queue()
         threading.Thread(target=self.process_input_socket,
                          args=(input_path, ),
                          daemon=True).start()
@@ -217,8 +214,10 @@ def run_busy_loop(self):
                         self._handle_client_request(req)
                         break
                     except queue.Empty:
-                        self._log_stats()
                         logger.debug("EngineCore busy loop waiting.")
+                        # Break out the loop so we can log_stats in step().
+                        if self.log_stats:
+                            break
                     except BaseException:
                         raise
 
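The queue.Empty branch is the other half of that change: instead of calling _log_stats() on a timer, the loop simply breaks out of the inner wait when log_stats is enabled, so the outer loop reaches step() and publishes an (empty) EngineCoreOutputs carrying fresh stats. A toy, self-contained sketch of that polling pattern follows; the names, prints, and timings are illustrative, not vLLM code.

import queue
import threading
import time

POLLING_TIMEOUT_S = 2.5  # same timeout the diff introduces


def busy_loop(input_queue: queue.Queue, log_stats: bool = True) -> None:
    # Block on the input queue with a timeout; on queue.Empty fall through
    # so the caller's step()/stats path runs even while idle.
    while True:
        try:
            req = input_queue.get(timeout=POLLING_TIMEOUT_S)
            if req is None:  # demo-only sentinel to stop the loop
                return
            print("handling request:", req)
        except queue.Empty:
            if log_stats:
                print("idle tick: would run step() and emit stats")


if __name__ == "__main__":
    q: queue.Queue = queue.Queue()

    def produce() -> None:
        time.sleep(3)  # longer than the poll timeout, forcing one idle tick
        q.put("req-1")
        q.put(None)

    threading.Thread(target=produce, daemon=True).start()
    busy_loop(q)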
@@ -230,28 +229,9 @@ def run_busy_loop(self):
             # 3) Step the engine core.
             outputs = self.step()
 
-            # 4) Put EngineCoreOutputs into the output queue.
+            # 5) Put EngineCoreOutputs into the output queue.
             self.output_queue.put_nowait(outputs)
 
-            self._log_stats()
-
-    def _log_stats(self):
-        """Log basic stats every LOGGING_TIME_S"""
-
-        if not self.log_stats:
-            return
-
-        now = time.time()
-
-        if now - self._last_logging_time > LOGGING_TIME_S:
-            logger.info(
-                "RUNNING: %s | WAITING: %s",
-                len(self.scheduler.running),
-                len(self.scheduler.waiting),
-            )
-
-            self._last_logging_time = now
-
     def _handle_client_request(self, request: EngineCoreRequestUnion) -> None:
         """Handle EngineCoreRequest or EngineCoreABORT from Client."""
 
@@ -301,7 +281,6 @@ def process_output_socket(self, output_path: str):
 
         with zmq_socket_ctx(output_path, zmq.constants.PUSH) as socket:
             while True:
-                engine_core_outputs = self.output_queue.get()
-                outputs = EngineCoreOutputs(outputs=engine_core_outputs)
+                outputs = self.output_queue.get()
                 encoder.encode_into(outputs, buffer)
                 socket.send_multipart((buffer, ), copy=False)
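With step() returning a ready-made EngineCoreOutputs, the output thread no longer wraps a list before serializing; it just drains the queue, encodes, and pushes. A rough, self-contained sketch of that consumer pattern using msgspec and pyzmq directly is below; EngineCoreOutputs here is a simplified stand-in for the real class, and whether the socket binds or connects depends on the peer.

import queue

import msgspec
import zmq


class EngineCoreOutputs(msgspec.Struct):
    # Simplified stand-in for vllm.v1.engine.EngineCoreOutputs.
    outputs: list = msgspec.field(default_factory=list)
    scheduler_stats: dict = msgspec.field(default_factory=dict)


def process_output_socket(output_queue: queue.Queue, output_path: str) -> None:
    encoder = msgspec.msgpack.Encoder()
    buffer = bytearray()
    ctx = zmq.Context.instance()
    with ctx.socket(zmq.PUSH) as socket:
        socket.bind(output_path)  # e.g. "ipc:///tmp/engine_outputs"
        while True:
            outputs = output_queue.get()          # already an EngineCoreOutputs
            encoder.encode_into(outputs, buffer)  # reuse one bytearray per send
            socket.send_multipart((buffer, ), copy=False)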