From 4efef725de034fc6fbe57c79f1ed7f506cb13bed Mon Sep 17 00:00:00 2001 From: rickyx Date: Wed, 4 Dec 2024 23:07:46 +0000 Subject: [PATCH 1/6] initial Signed-off-by: rickyx --- tests/v1/test_stats.py | 281 +++++++++++++++++++++++++++ vllm/v1/stats/__init__.py | 0 vllm/v1/stats/common.py | 389 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 670 insertions(+) create mode 100644 tests/v1/test_stats.py create mode 100644 vllm/v1/stats/__init__.py create mode 100644 vllm/v1/stats/common.py diff --git a/tests/v1/test_stats.py b/tests/v1/test_stats.py new file mode 100644 index 000000000000..841848bb1cf3 --- /dev/null +++ b/tests/v1/test_stats.py @@ -0,0 +1,281 @@ +from typing import Optional + +import pytest + +from vllm.sampling_params import SamplingParams +from vllm.v1.engine import EngineCoreRequest +from vllm.v1.stats.common import RequestStats, RequestStatsUpdate + + +def test_lifecycle_updates(): + request_id = "test_request" + stats = RequestStats(request_id=request_id) + + # Test the below scenario: + arrived_ts = 0 + input_processed_ts = 1 + queued_ts = 2 + running_ts = 3 + running_2_ts = 4 + decoded_ts = 5 + detokenized_ts = 6 + decoded_2_ts = 7 + preempted_ts = 8 + resumed_ts = 9 + decoded_3_ts = 10 + finished_ts = 11 + + # Test ARRIVED + arrived_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.ARRIVED, + monotonic_ts_s=arrived_ts, + ) + stats.update_from(arrived_update) + assert stats.arrival_ts_s == arrived_ts + assert stats.last_updated_ts_s == arrived_ts + + # Test INPUT_PROCESSED + sampling_params = SamplingParams(n=1) + engine_request = EngineCoreRequest( + prompt_token_ids=[1, 2, 3, 4, 5, 6], + sampling_params=sampling_params, + request_id=request_id, + prompt="test_prompt", + mm_inputs=None, + mm_placeholders=None, + eos_token_id=None, + arrival_time=arrived_ts, + lora_request=None, + ) + input_processed_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.INPUT_PROCESSED, + monotonic_ts_s=input_processed_ts, + engine_request=engine_request, + ) + stats.update_from(input_processed_update) + assert stats.input_processor_end_ts_s == input_processed_ts + assert stats.engine_request == engine_request + assert stats.last_updated_ts_s == input_processed_ts + assert stats.num_prompt_tokens == 6 + assert stats.sampling_params == sampling_params + + assert stats.first_token_ts_s is None + assert stats.first_scheduled_ts_s is None + + # Test QUEUED + queued_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.QUEUED, + monotonic_ts_s=queued_ts, + ) + stats.update_from(queued_update) + assert stats.waiting_ts_s == queued_ts + assert stats.last_updated_ts_s == queued_ts + + # Test RUNNING + running_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.RUNNING, + monotonic_ts_s=running_ts, + was_running=False, + num_computed_tokens=3, + num_cached_tokens=1, + ) + stats.update_from(running_update) + assert stats.first_scheduled_ts_s == running_ts + assert stats.num_computed_tokens == 3 + assert stats.num_cached_tokens == 1 + assert stats.queue_duration_s == running_ts - arrived_ts + + # Test RUNNING again shouldn't update first_scheduled_ts_s + running_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.RUNNING, + monotonic_ts_s=running_2_ts, + was_running=True, + num_computed_tokens=6, + num_cached_tokens=0, + ) + stats.update_from(running_update) + assert stats.first_scheduled_ts_s == running_ts + assert 
stats.num_computed_tokens == 6 + # num_cached_tokens is not updated + assert stats.num_cached_tokens == 1 + assert stats.last_updated_ts_s == running_2_ts + # running_ts_s_lst should only contain the first running/resumed running + # update + assert stats.running_ts_s_lst == [ + running_ts, + ] + + # Test DECODED + decoded_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DECODED, + monotonic_ts_s=decoded_ts, + num_new_tokens=1, + token_perf_ts_ns=decoded_ts * 1e9, + ) + stats.update_from(decoded_update) + assert stats.last_updated_ts_s == decoded_ts + # Since arrival + assert stats.first_token_latency_s == decoded_ts - arrived_ts + assert stats.num_output_tokens == 1 + # Since first scheduled + assert stats.prefill_latency_s == 2 + + # Test DETOKENIZED + detokenized_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DETOKENIZED, + monotonic_ts_s=detokenized_ts, + ) + stats.update_from(detokenized_update) + assert stats.last_updated_ts_s == detokenized_ts + + # Test another DECODE should yield correct inter token latency + decoded_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DECODED, + monotonic_ts_s=decoded_2_ts, + num_new_tokens=1, + token_perf_ts_ns=decoded_2_ts * 1e9, + ) + stats.update_from(decoded_update) + assert stats.output_token_latency_s_lst == [ + decoded_2_ts - decoded_ts, + ] + assert stats.num_output_tokens == 2 + + # Test PREEMPTED + preempted_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.PREEMPTED, + monotonic_ts_s=preempted_ts, + ) + stats.update_from(preempted_update) + assert stats.last_updated_ts_s == preempted_ts + assert stats.preempted_ts_s_lst == [preempted_ts] + # States should be reset + assert stats.num_computed_tokens == 0 + assert stats.num_cached_tokens == 0 + assert stats.num_output_tokens == 0 + assert stats.output_token_latency_s_lst == [] + + # Test resumed + resumed_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.RUNNING, + monotonic_ts_s=resumed_ts, + was_running=False, + num_computed_tokens=6, + num_cached_tokens=2, + ) + stats.update_from(resumed_update) + # First scheduled ts should NOT be updated + assert stats.first_scheduled_ts_s == running_ts + assert stats.num_computed_tokens == 6 + assert stats.num_cached_tokens == 2 + assert stats.running_ts_s_lst == [ + running_ts, + resumed_ts, + ] + assert stats.last_updated_ts_s == resumed_ts + + # Test another DECODED should yield correct first token latency. 
+ decoded_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DECODED, + monotonic_ts_s=decoded_3_ts, + num_new_tokens=1, + token_perf_ts_ns=decoded_3_ts * 1e9, + ) + stats.update_from(decoded_update) + assert stats.first_token_ts_s == decoded_3_ts + assert stats.num_output_tokens == 1 + + # Test FINISHED + finished_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DETOKENIZED, + monotonic_ts_s=finished_ts, + finish_reason="test_reason", + ) + stats.update_from(finished_update) + assert stats.last_updated_ts_s == finished_ts + assert stats.e2e_latency_s == finished_ts - arrived_ts + assert stats.inference_latency_s == finished_ts - running_ts + assert stats.decode_latency_s == finished_ts - decoded_3_ts + assert stats.is_finished + assert stats.finish_reason == "test_reason" + + +@pytest.mark.parametrize("finish_reason", + ["test-decode", "test-detokenize", None]) +def test_finish_reason(finish_reason: Optional[str]): + """ + Test that a request could be finished when decoded and detokenized + at different times. + """ + request_id = "test_request" + r = RequestStats(request_id=request_id) + + # Test FINISHED + updates = [ + RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.ARRIVED, + monotonic_ts_s=0, + ), + RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.INPUT_PROCESSED, + monotonic_ts_s=1, + ), + RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.QUEUED, + monotonic_ts_s=2, + ), + RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.RUNNING, + was_running=False, + monotonic_ts_s=3, + num_computed_tokens=3, + ), + ] + + if finish_reason is not None: + if finish_reason == "test-decode": + updates.append( + RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DECODED, + monotonic_ts_s=4, + finish_reason=finish_reason, + token_perf_ts_ns=4 * 1e9, + num_new_tokens=1, + )) + elif finish_reason == "test-detokenize": + updates.append( + RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DETOKENIZED, + monotonic_ts_s=4, + finish_reason=finish_reason, + )) + + for update in updates: + r.update_from(update) + + if finish_reason is not None: + assert r.finish_reason == finish_reason + assert r.is_finished + assert r.e2e_latency_s == 4 + else: + assert r.finish_reason is None + assert not r.is_finished + assert r.e2e_latency_s is None diff --git a/vllm/v1/stats/__init__.py b/vllm/v1/stats/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/v1/stats/common.py b/vllm/v1/stats/common.py new file mode 100644 index 000000000000..f85d7c0455c9 --- /dev/null +++ b/vllm/v1/stats/common.py @@ -0,0 +1,389 @@ +import time +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from enum import IntEnum +from typing import List, Optional + +import msgspec +from msgspec import field as msgspec_field + +from vllm.sampling_params import SamplingParams +from vllm.v1.engine import EngineCoreRequest + + +def ns_to_s(ns: int) -> float: + return ns / 1e9 + + +class RequestStatsUpdate(msgspec.Struct, + array_like=True, + omit_defaults=True, + gc=False): + """ + An update to the request stats. 
+ + NOTE: + - We should try to keep the size of this struct minimal by avoiding + keeping references to additional objects if not necessary, especially + when the referenced object could have been GCed already if not for + this reference (examples include per decoded token RequestOutput, + EngineCoreOutput, etc.). + """ + + class Type(IntEnum): + # Request arrived at the engine frontend. + ARRIVED = 0 + # Input processed by the input processor. + INPUT_PROCESSED = 1 + # Queued on the engine core. + QUEUED = 2 + # Scheduled running by the scheduler. + RUNNING = 3 + # Preempted by the scheduler. + PREEMPTED = 4 + # Token decoded by the engine. + DECODED = 5 + # Token detokenized by the detokenizer. + DETOKENIZED = 6 + + request_id: str + + type: Type + + # Timestamp when the update is recorded. This is used to record time + # intervals between events. + monotonic_ts_s: float = msgspec_field( + default_factory=lambda: time.monotonic()) + + ############################################################ + # Metadata associated with the update. + ############################################################ + # For input_processed. + engine_request: Optional[EngineCoreRequest] = None + + # For running. + # If the request was already running. + was_running: Optional[bool] = None + # Number of tokens computed. + num_computed_tokens: Optional[int] = None + # Number of cached tokens. + num_cached_tokens: Optional[int] = None + + # For decoded. + # The perfcounter timestamp for each output token. + token_perf_ts_ns: Optional[int] = None + # The number of new output tokens. + num_new_tokens: Optional[int] = None + + # For both detokenized and decoded. + # Finished reason. + finish_reason: Optional[str] = None + + +@dataclass +class RequestStats: + """Stats associated with a request. + + A request would go through the following lifecycles upon arriving + the llm engine: + - Arrival: when the request is first added to the llm engine. + - Inputs processed: when the input processor is completed. + - Waiting: added to the waiting queue of the scheduler in the EngineCore. + - Scheduled: when the request is scheduled by the scheduler. + - [Preempted]: a request could be temporarily unscheduled by the scheduler + under contention of resources. This will go back to the + waiting queue of the scheduler, and the request will be + scheduled again. + - Finished: a request is finished (aborted or stopped) + """ + + ############################################################ + # Metadata + ############################################################ + request_id: str + # The original request object from the engine core. + engine_request: Optional[EngineCoreRequest] = None + + ############################################################ + # Metrics and Stats + ############################################################ + # Timestamp when the request was last updated. + last_updated_ts_s: Optional[float] = None + + # Timestamp when the request arrived at the llm engine. + arrival_ts_s: Optional[float] = None + + # Number of tokens cached. When part of the request prefix is cached, + # this will be set. + num_cached_tokens: int = 0 + + # Number of tokens computed. + num_computed_tokens: int = 0 + + # The timestamp when the request was first added to the scheduler, waiting + # in the queue. + waiting_ts_s: Optional[float] = None + + # When the input processor is completed. + input_processor_end_ts_s: Optional[float] = None + + # A sorted list of timestamps when the request was scheduled to run. 
+ running_ts_s_lst: List[float] = dataclass_field(default_factory=list) + + # A sorted list of perf counter timestamps for each output token. + output_token_perf_counter_ns_lst: List[int] = dataclass_field( + default_factory=list) + + # First token's timestamp. + first_token_ts_s: Optional[float] = None + + # TODO(rickyx): we need model runner to surface these. + model_forward_duration_s: float = 0.0 + # Includes model forward, block/sync across workers, cpu-gpu sync time + # and sampling time. + model_execute_duration_s: float = 0.0 + + # A sorted list of timestamps when the request was preempted at the + # scheduler. + preempted_ts_s_lst: List[float] = dataclass_field(default_factory=list) + + # Timestamp when the request was finished at the engine core. + finished_ts_s: Optional[float] = None + + # Finish reason. + finish_reason: Optional[str] = None + + ############################################################ + # Derived properties. + ############################################################ + @property + def num_prompt_tokens(self) -> Optional[int]: + return (len(self.engine_request.prompt_token_ids) + if self.engine_request else None) + + @property + def first_scheduled_ts_s(self) -> Optional[float]: + return self.running_ts_s_lst[0] if self.running_ts_s_lst else None + + @property + def e2e_latency_s(self) -> Optional[float]: + if self.finished_ts_s is None or self.arrival_ts_s is None: + return None + assert self.finished_ts_s >= self.arrival_ts_s + return self.finished_ts_s - self.arrival_ts_s + + @property + def queue_duration_s(self) -> Optional[float]: + if self.first_scheduled_ts_s is None or self.arrival_ts_s is None: + return None + assert self.first_scheduled_ts_s >= self.arrival_ts_s + return self.first_scheduled_ts_s - self.arrival_ts_s + + @property + def inference_latency_s(self) -> Optional[float]: + if self.e2e_latency_s is None or self.queue_duration_s is None: + return None + assert self.e2e_latency_s >= self.queue_duration_s + return self.e2e_latency_s - self.queue_duration_s + + @property + def first_token_latency_s(self) -> Optional[float]: + if self.first_token_ts_s is None or self.arrival_ts_s is None: + return None + assert self.first_token_ts_s >= self.arrival_ts_s + return self.first_token_ts_s - self.arrival_ts_s + + @property + def prefill_latency_s(self) -> Optional[float]: + if self.first_token_ts_s is None or self.first_scheduled_ts_s is None: + return None + assert self.first_token_ts_s >= self.first_scheduled_ts_s + return self.first_token_ts_s - self.first_scheduled_ts_s + + @property + def decode_latency_s(self) -> Optional[float]: + if self.e2e_latency_s is None or self.first_token_latency_s is None: + return None + assert self.e2e_latency_s >= self.first_token_latency_s + return self.e2e_latency_s - self.first_token_latency_s + + @property + def output_token_latency_s_lst(self) -> List[float]: + if len(self.output_token_perf_counter_ns_lst) == 0: + return [] + latency_s_lst = [] + for i in range(1, len(self.output_token_perf_counter_ns_lst)): + assert (self.output_token_perf_counter_ns_lst[i] >= + self.output_token_perf_counter_ns_lst[i - 1]) + latency_s = ns_to_s(self.output_token_perf_counter_ns_lst[i] - + self.output_token_perf_counter_ns_lst[i - 1]) + latency_s_lst.append(latency_s) + return latency_s_lst + + @property + def num_output_tokens(self) -> int: + return len(self.output_token_perf_counter_ns_lst) + + @property + def is_finished(self) -> bool: + return self.finished_ts_s is not None + + @property + def sampling_params(self) -> 
Optional[SamplingParams]: + return (self.engine_request.sampling_params + if self.engine_request else None) + + def update_from(self, update: "RequestStatsUpdate"): + ts = update.monotonic_ts_s + self.last_updated_ts_s = ts + if update.type == RequestStatsUpdate.Type.ARRIVED: + self.arrival_ts_s = ts + elif update.type == RequestStatsUpdate.Type.INPUT_PROCESSED: + self.input_processor_end_ts_s = ts + self.engine_request = update.engine_request + elif update.type == RequestStatsUpdate.Type.QUEUED: + self.waiting_ts_s = ts + elif update.type == RequestStatsUpdate.Type.RUNNING: + assert (update.was_running is not None + and update.num_computed_tokens is not None) + self._record_running( + update.num_computed_tokens, + update.was_running, + ts, + update.num_cached_tokens, + ) + elif update.type == RequestStatsUpdate.Type.PREEMPTED: + self._reset_for_preemption(ts) + elif update.type == RequestStatsUpdate.Type.DECODED: + assert update.token_perf_ts_ns is not None + self._record_engine_output( + ts, + update.token_perf_ts_ns, + update.num_new_tokens, + update.finish_reason, + ) + elif update.type == RequestStatsUpdate.Type.DETOKENIZED: + self._record_request_output(update.finish_reason, ts) + else: + raise ValueError(f"Unknown update type: {update.type}") + + def _record_running( + self, + num_computed_tokens: int, + was_running: bool, + ts_s: float, + num_cached_tokens: Optional[int] = None, + ): + if not was_running: + # Was preempted or newly run. + self.running_ts_s_lst.append(ts_s) + self.num_cached_tokens = num_cached_tokens + + self.num_computed_tokens = num_computed_tokens + + def _record_engine_output( + self, + ts_s: float, + perf_ts_ns: int, + num_new_tokens: int, + finish_reason: Optional[str], + ): + # Update if first output token is generated. + if len(self.output_token_perf_counter_ns_lst) == 0: + self.first_token_ts_s = ts_s + assert self.first_scheduled_ts_s is not None + + self.output_token_perf_counter_ns_lst.extend([perf_ts_ns] * + num_new_tokens) + + # Update if the request is finished. + if finish_reason is not None: + self.finished_ts_s = ts_s + self.finish_reason = finish_reason + + def _record_request_output(self, finish_reason: Optional[str], + ts_s: float): + if finish_reason is not None and self.finished_ts_s is None: + self.finished_ts_s = ts_s + self.finish_reason = finish_reason + + def _reset_for_preemption(self, ts_s: float): + self.preempted_ts_s_lst.append(ts_s) + self.num_computed_tokens = 0 + self.num_cached_tokens = 0 + self.output_token_perf_counter_ns_lst.clear() + self.model_forward_duration_s = 0.0 + self.model_execute_duration_s = 0.0 + self.first_token_ts_s = None + + +@dataclass +class KVCacheStats: + # KV Cache Usage in % + gpu_cache_usage_sys: float = 0.0 + gpu_prefix_cache_hit_rate: float = 0.0 + + +@dataclass +class SchedulerStats: + """Stats associated with the scheduler.""" + + # Number of requests currently running. + num_running_reqs: int = 0 + # Number of requests currently waiting. + num_waiting_reqs: int = 0 + + kv_cache_stats: KVCacheStats = dataclass_field( + default_factory=KVCacheStats) + + +@dataclass +class EngineCoreProcessStats: + """Stats associated with the engine core process.""" + + # Number of requests currently in the input queue. None if the engine core + # is not running in multiprocess mode. + input_queue_size: Optional[int] = None + # Number of outputs currently in the output queue. None if the engine core + # is not running in multiprocess mode. 
+ output_queue_size: Optional[int] = None + + +class EngineStatsSnapshot(msgspec.Struct, + array_like=True, + omit_defaults=True, + gc=False): + """ + A snapshot of the engine's current stats. + This represents a snapshot of the current engine core's stats over a + period of time. + + A snapshot is created periodically (e.g. every 5 seconds) on the frontend of + the engine, and engine core stats would be gathered from the engine core: + including the current state of the scheduler, the requests updated since + the last snapshot. + + This decouples stats collection from actual processing of the requests such + that: + 1. Stats collection is lightweight and could be aligned with the same + interval as the upper level stats logging (e.g. Prometheus scraping + time, logging interval, etc.). + 2. Stats collection could happen independently of the request processing + so even if no requests were processed, stats would still be propagated + reliably. + """ + + # Snapshot of the scheduler stats. + scheduler_stats: SchedulerStats = msgspec_field( + default_factory=SchedulerStats) + + # Per request stats updates. + requests_stats_updates: List[RequestStatsUpdate] = msgspec_field( + default_factory=list) + + # Engine core's queue stats. + engine_core_process_stats: EngineCoreProcessStats = msgspec_field( + default_factory=EngineCoreProcessStats) + + # TODO(rickyx): Add other components' stats, + # e.g. model runner/worker and etc. From 81816c269fc33afdca81a198a0217a86919a2b5b Mon Sep 17 00:00:00 2001 From: rickyx Date: Thu, 5 Dec 2024 00:22:14 +0000 Subject: [PATCH 2/6] fix Signed-off-by: rickyx --- tests/v1/test_stats.py | 34 +++++++-------- vllm/v1/stats/common.py | 91 ++++++++++++++++++++++++----------------- 2 files changed, 70 insertions(+), 55 deletions(-) diff --git a/tests/v1/test_stats.py b/tests/v1/test_stats.py index 841848bb1cf3..442d63d3e3e4 100644 --- a/tests/v1/test_stats.py +++ b/tests/v1/test_stats.py @@ -62,7 +62,7 @@ def test_lifecycle_updates(): assert stats.sampling_params == sampling_params assert stats.first_token_ts_s is None - assert stats.first_scheduled_ts_s is None + assert stats.prefill_ts_s is None # Test QUEUED queued_update = RequestStatsUpdate( @@ -71,7 +71,7 @@ def test_lifecycle_updates(): monotonic_ts_s=queued_ts, ) stats.update_from(queued_update) - assert stats.waiting_ts_s == queued_ts + assert stats.queued_ts_s == queued_ts assert stats.last_updated_ts_s == queued_ts # Test RUNNING @@ -79,34 +79,34 @@ def test_lifecycle_updates(): request_id=request_id, type=RequestStatsUpdate.Type.RUNNING, monotonic_ts_s=running_ts, - was_running=False, + new_prefill=True, num_computed_tokens=3, num_cached_tokens=1, ) stats.update_from(running_update) - assert stats.first_scheduled_ts_s == running_ts + assert stats.prefill_ts_s == running_ts assert stats.num_computed_tokens == 3 assert stats.num_cached_tokens == 1 - assert stats.queue_duration_s == running_ts - arrived_ts + assert stats.queue_duration_s == running_ts - queued_ts - # Test RUNNING again shouldn't update first_scheduled_ts_s + # Test RUNNING again shouldn't update prefill_ts_s running_update = RequestStatsUpdate( request_id=request_id, type=RequestStatsUpdate.Type.RUNNING, monotonic_ts_s=running_2_ts, - was_running=True, + new_prefill=False, num_computed_tokens=6, num_cached_tokens=0, ) stats.update_from(running_update) - assert stats.first_scheduled_ts_s == running_ts + assert stats.prefill_ts_s == running_ts assert stats.num_computed_tokens == 6 # num_cached_tokens is not updated assert stats.num_cached_tokens 
== 1 assert stats.last_updated_ts_s == running_2_ts - # running_ts_s_lst should only contain the first running/resumed running - # update - assert stats.running_ts_s_lst == [ + # prefill_start_ts_s_lst should only contain the first running/resumed + # running prefill update. + assert stats.prefill_start_ts_s_lst == [ running_ts, ] @@ -169,16 +169,16 @@ def test_lifecycle_updates(): request_id=request_id, type=RequestStatsUpdate.Type.RUNNING, monotonic_ts_s=resumed_ts, - was_running=False, + new_prefill=True, num_computed_tokens=6, num_cached_tokens=2, ) stats.update_from(resumed_update) - # First scheduled ts should NOT be updated - assert stats.first_scheduled_ts_s == running_ts + # Resumed prefill timestamp should be updated + assert stats.prefill_ts_s == resumed_ts assert stats.num_computed_tokens == 6 assert stats.num_cached_tokens == 2 - assert stats.running_ts_s_lst == [ + assert stats.prefill_start_ts_s_lst == [ running_ts, resumed_ts, ] @@ -206,7 +206,7 @@ def test_lifecycle_updates(): stats.update_from(finished_update) assert stats.last_updated_ts_s == finished_ts assert stats.e2e_latency_s == finished_ts - arrived_ts - assert stats.inference_latency_s == finished_ts - running_ts + assert stats.inference_latency_s == finished_ts - resumed_ts assert stats.decode_latency_s == finished_ts - decoded_3_ts assert stats.is_finished assert stats.finish_reason == "test_reason" @@ -242,7 +242,7 @@ def test_finish_reason(finish_reason: Optional[str]): RequestStatsUpdate( request_id=request_id, type=RequestStatsUpdate.Type.RUNNING, - was_running=False, + new_prefill=True, monotonic_ts_s=3, num_computed_tokens=3, ), diff --git a/vllm/v1/stats/common.py b/vllm/v1/stats/common.py index f85d7c0455c9..a1a6d49d1928 100644 --- a/vllm/v1/stats/common.py +++ b/vllm/v1/stats/common.py @@ -24,13 +24,14 @@ class RequestStatsUpdate(msgspec.Struct, NOTE: - We should try to keep the size of this struct minimal by avoiding - keeping references to additional objects if not necessary, especially + keeping references to additional objects that are unnecessary, especially when the referenced object could have been GCed already if not for this reference (examples include per decoded token RequestOutput, EngineCoreOutput, etc.). """ class Type(IntEnum): + """See `RequestStats` for the lifecycle of a request.""" # Request arrived at the engine frontend. ARRIVED = 0 # Input processed by the input processor. @@ -51,7 +52,7 @@ class Type(IntEnum): type: Type # Timestamp when the update is recorded. This is used to record time - # intervals between events. + # intervals between events rather than wall clock time. monotonic_ts_s: float = msgspec_field( default_factory=lambda: time.monotonic()) @@ -59,20 +60,24 @@ class Type(IntEnum): # Metadata associated with the update. ############################################################ # For input_processed. + # NOTE: it's fine to keep a reference to the engine request here + # because there will only be 1 event with this reference for a + # request. We need metadata (e.g. prompt tokens, sampling_param) + # from the request. engine_request: Optional[EngineCoreRequest] = None # For running. - # If the request was already running. - was_running: Optional[bool] = None - # Number of tokens computed. + # If the request was a newly scheduled request running for prefill. + new_prefill: Optional[bool] = None + # Number of tokens computed when scheduled to run. num_computed_tokens: Optional[int] = None - # Number of cached tokens. + # Number of cached tokens when scheduled to run. 
num_cached_tokens: Optional[int] = None # For decoded. # The perfcounter timestamp for each output token. token_perf_ts_ns: Optional[int] = None - # The number of new output tokens. + # The number of new output tokens generated. num_new_tokens: Optional[int] = None # For both detokenized and decoded. @@ -82,19 +87,21 @@ class Type(IntEnum): @dataclass class RequestStats: - """Stats associated with a request. + """Stats associated with a request (`Request`) A request would go through the following lifecycles upon arriving - the llm engine: + at the llm engine: - Arrival: when the request is first added to the llm engine. - Inputs processed: when the input processor is completed. - - Waiting: added to the waiting queue of the scheduler in the EngineCore. - - Scheduled: when the request is scheduled by the scheduler. + - Queued: added to the waiting queue of the scheduler in the EngineCore. + - Running(prefill): when the request is scheduled by the scheduler + for prefill. - [Preempted]: a request could be temporarily unscheduled by the scheduler - under contention of resources. This will go back to the - waiting queue of the scheduler, and the request will be - scheduled again. - - Finished: a request is finished (aborted or stopped) + under contention of resources. This will go back to the waiting queue + of the scheduler, and the request will be scheduled again. + - Decoding: when a request is in decoding stage, and tokens are generated. + - Detokenized: when tokens are detokenized by the detokenizer. + - Finished: a request is finished. """ ############################################################ @@ -120,15 +127,14 @@ class RequestStats: # Number of tokens computed. num_computed_tokens: int = 0 - # The timestamp when the request was first added to the scheduler, waiting - # in the queue. - waiting_ts_s: Optional[float] = None + # The timestamp when the request become waiting in the queue. + queued_ts_s: Optional[float] = None # When the input processor is completed. input_processor_end_ts_s: Optional[float] = None # A sorted list of timestamps when the request was scheduled to run. - running_ts_s_lst: List[float] = dataclass_field(default_factory=list) + prefill_start_ts_s_lst: List[float] = dataclass_field(default_factory=list) # A sorted list of perf counter timestamps for each output token. output_token_perf_counter_ns_lst: List[int] = dataclass_field( @@ -162,8 +168,10 @@ def num_prompt_tokens(self) -> Optional[int]: if self.engine_request else None) @property - def first_scheduled_ts_s(self) -> Optional[float]: - return self.running_ts_s_lst[0] if self.running_ts_s_lst else None + def prefill_ts_s(self) -> Optional[float]: + """The timestamp when the request started prefilling.""" + return (self.prefill_start_ts_s_lst[-1] + if self.prefill_start_ts_s_lst else None) @property def e2e_latency_s(self) -> Optional[float]: @@ -174,17 +182,21 @@ def e2e_latency_s(self) -> Optional[float]: @property def queue_duration_s(self) -> Optional[float]: - if self.first_scheduled_ts_s is None or self.arrival_ts_s is None: + """How long the request was waiting to run.""" + if self.queued_ts_s is None or self.prefill_ts_s is None: + # Either not queued or not running yet. 
return None - assert self.first_scheduled_ts_s >= self.arrival_ts_s - return self.first_scheduled_ts_s - self.arrival_ts_s + assert self.queued_ts_s <= self.prefill_ts_s + return self.prefill_ts_s - self.queued_ts_s @property def inference_latency_s(self) -> Optional[float]: - if self.e2e_latency_s is None or self.queue_duration_s is None: + """How long the request was running inference + (prefill and decode).""" + if self.finished_ts_s is None or self.prefill_ts_s is None: return None - assert self.e2e_latency_s >= self.queue_duration_s - return self.e2e_latency_s - self.queue_duration_s + assert self.finished_ts_s >= self.prefill_ts_s + return self.finished_ts_s - self.prefill_ts_s @property def first_token_latency_s(self) -> Optional[float]: @@ -195,10 +207,10 @@ def first_token_latency_s(self) -> Optional[float]: @property def prefill_latency_s(self) -> Optional[float]: - if self.first_token_ts_s is None or self.first_scheduled_ts_s is None: + if self.first_token_ts_s is None or self.prefill_ts_s is None: return None - assert self.first_token_ts_s >= self.first_scheduled_ts_s - return self.first_token_ts_s - self.first_scheduled_ts_s + assert self.first_token_ts_s >= self.prefill_ts_s + return self.first_token_ts_s - self.prefill_ts_s @property def decode_latency_s(self) -> Optional[float]: @@ -242,13 +254,13 @@ def update_from(self, update: "RequestStatsUpdate"): self.input_processor_end_ts_s = ts self.engine_request = update.engine_request elif update.type == RequestStatsUpdate.Type.QUEUED: - self.waiting_ts_s = ts + self.queued_ts_s = ts elif update.type == RequestStatsUpdate.Type.RUNNING: - assert (update.was_running is not None + assert (update.new_prefill is not None and update.num_computed_tokens is not None) self._record_running( update.num_computed_tokens, - update.was_running, + update.new_prefill, ts, update.num_cached_tokens, ) @@ -270,13 +282,14 @@ def update_from(self, update: "RequestStatsUpdate"): def _record_running( self, num_computed_tokens: int, - was_running: bool, + new_prefill: bool, ts_s: float, num_cached_tokens: Optional[int] = None, ): - if not was_running: - # Was preempted or newly run. - self.running_ts_s_lst.append(ts_s) + if new_prefill: + # Was preempted or a newly scheduled request - record the + # prefill start timestamp. + self.prefill_start_ts_s_lst.append(ts_s) self.num_cached_tokens = num_cached_tokens self.num_computed_tokens = num_computed_tokens @@ -291,7 +304,9 @@ def _record_engine_output( # Update if first output token is generated. if len(self.output_token_perf_counter_ns_lst) == 0: self.first_token_ts_s = ts_s - assert self.first_scheduled_ts_s is not None + assert ( + self.prefill_ts_s is not None + ), "Request must be running before generating output tokens." 
self.output_token_perf_counter_ns_lst.extend([perf_ts_ns] * num_new_tokens) From f6cd34770a8b6d621c448631508ee06e5bd956ee Mon Sep 17 00:00:00 2001 From: rickyx Date: Thu, 5 Dec 2024 00:28:39 +0000 Subject: [PATCH 3/6] up Signed-off-by: rickyx --- vllm/v1/stats/common.py | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/vllm/v1/stats/common.py b/vllm/v1/stats/common.py index a1a6d49d1928..94538a0a7e5d 100644 --- a/vllm/v1/stats/common.py +++ b/vllm/v1/stats/common.py @@ -330,6 +330,11 @@ def _reset_for_preemption(self, ts_s: float): self.model_forward_duration_s = 0.0 self.model_execute_duration_s = 0.0 self.first_token_ts_s = None + # NOTE: the below fields should not be reset: + # - prefill_start_ts_s_lst + # - arrival_ts_s + # - engine_request + # - input_processor_end_ts_s @dataclass @@ -364,28 +369,12 @@ class EngineCoreProcessStats: output_queue_size: Optional[int] = None -class EngineStatsSnapshot(msgspec.Struct, - array_like=True, - omit_defaults=True, - gc=False): +class EngineCoreStatsSnapshot(msgspec.Struct, + array_like=True, + omit_defaults=True, + gc=False): """ - A snapshot of the engine's current stats. - This represents a snapshot of the current engine core's stats over a - period of time. - - A snapshot is created periodically (e.g. every 5 seconds) on the frontend of - the engine, and engine core stats would be gathered from the engine core: - including the current state of the scheduler, the requests updated since - the last snapshot. - - This decouples stats collection from actual processing of the requests such - that: - 1. Stats collection is lightweight and could be aligned with the same - interval as the upper level stats logging (e.g. Prometheus scraping - time, logging interval, etc.). - 2. Stats collection could happen independently of the request processing - so even if no requests were processed, stats would still be propagated - reliably. + A snapshot of the EngineCore's current stats over a period of time. """ # Snapshot of the scheduler stats. From ec684636c5930c47cea808633a8dbd14eb181edc Mon Sep 17 00:00:00 2001 From: rickyx Date: Thu, 12 Dec 2024 23:55:06 +0000 Subject: [PATCH 4/6] updates Signed-off-by: rickyx --- tests/v1/test_stats.py | 252 ++++++++++++++-------------------- vllm/v1/stats/common.py | 296 +++++++++++++++++++++++----------------- 2 files changed, 276 insertions(+), 272 deletions(-) diff --git a/tests/v1/test_stats.py b/tests/v1/test_stats.py index 442d63d3e3e4..17fee514e37e 100644 --- a/tests/v1/test_stats.py +++ b/tests/v1/test_stats.py @@ -1,12 +1,41 @@ -from typing import Optional - +from typing import Any import pytest - from vllm.sampling_params import SamplingParams -from vllm.v1.engine import EngineCoreRequest from vllm.v1.stats.common import RequestStats, RequestStatsUpdate +def test_invalid_request_update(): + request_id = "test_request" + update_specific_required_fields = { + RequestStatsUpdate.Type.INPUT_PROCESSED: [ + "sampling_params", + "num_prompt_tokens", + ], + RequestStatsUpdate.Type.PREFILLING: [ + "num_computed_tokens", + "num_cached_tokens", + ], + RequestStatsUpdate.Type.DETOKENIZED: ["num_new_tokens"], + RequestStatsUpdate.Type.FINISHED: ["finish_reason"], + } + + # Missing a required field should raise an assertion error. + for update_type in RequestStatsUpdate.Type: + required_fields = update_specific_required_fields.get(update_type, []) + + # Try to miss one of the required fields. 
+ kwargs = {field: object() for field in required_fields} + for field in required_fields: + copy_kwargs = kwargs.copy() + copy_kwargs.pop(field) + with pytest.raises(ValueError): + RequestStatsUpdate( + request_id=request_id, + type=update_type, + **copy_kwargs, + ) + + def test_lifecycle_updates(): request_id = "test_request" stats = RequestStats(request_id=request_id) @@ -15,15 +44,16 @@ def test_lifecycle_updates(): arrived_ts = 0 input_processed_ts = 1 queued_ts = 2 - running_ts = 3 - running_2_ts = 4 + prefilling_ts = 3 decoded_ts = 5 detokenized_ts = 6 decoded_2_ts = 7 - preempted_ts = 8 - resumed_ts = 9 - decoded_3_ts = 10 - finished_ts = 11 + detokenized_2_ts = 8 + preempted_ts = 9 + resumed_ts = 10 + decoded_3_ts = 11 + detokenized_3_ts = 12 + finished_ts = 13 # Test ARRIVED arrived_update = RequestStatsUpdate( @@ -37,26 +67,15 @@ def test_lifecycle_updates(): # Test INPUT_PROCESSED sampling_params = SamplingParams(n=1) - engine_request = EngineCoreRequest( - prompt_token_ids=[1, 2, 3, 4, 5, 6], - sampling_params=sampling_params, - request_id=request_id, - prompt="test_prompt", - mm_inputs=None, - mm_placeholders=None, - eos_token_id=None, - arrival_time=arrived_ts, - lora_request=None, - ) input_processed_update = RequestStatsUpdate( request_id=request_id, type=RequestStatsUpdate.Type.INPUT_PROCESSED, monotonic_ts_s=input_processed_ts, - engine_request=engine_request, + sampling_params=sampling_params, + num_prompt_tokens=6, ) stats.update_from(input_processed_update) assert stats.input_processor_end_ts_s == input_processed_ts - assert stats.engine_request == engine_request assert stats.last_updated_ts_s == input_processed_ts assert stats.num_prompt_tokens == 6 assert stats.sampling_params == sampling_params @@ -74,78 +93,61 @@ def test_lifecycle_updates(): assert stats.queued_ts_s == queued_ts assert stats.last_updated_ts_s == queued_ts - # Test RUNNING - running_update = RequestStatsUpdate( + # Test PREFILLING + prefilling_update = RequestStatsUpdate( request_id=request_id, - type=RequestStatsUpdate.Type.RUNNING, - monotonic_ts_s=running_ts, - new_prefill=True, + type=RequestStatsUpdate.Type.PREFILLING, + monotonic_ts_s=prefilling_ts, num_computed_tokens=3, num_cached_tokens=1, ) - stats.update_from(running_update) - assert stats.prefill_ts_s == running_ts + stats.update_from(prefilling_update) + assert stats.prefill_ts_s == prefilling_ts assert stats.num_computed_tokens == 3 assert stats.num_cached_tokens == 1 - assert stats.queue_duration_s == running_ts - queued_ts - - # Test RUNNING again shouldn't update prefill_ts_s - running_update = RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.RUNNING, - monotonic_ts_s=running_2_ts, - new_prefill=False, - num_computed_tokens=6, - num_cached_tokens=0, - ) - stats.update_from(running_update) - assert stats.prefill_ts_s == running_ts - assert stats.num_computed_tokens == 6 - # num_cached_tokens is not updated - assert stats.num_cached_tokens == 1 - assert stats.last_updated_ts_s == running_2_ts - # prefill_start_ts_s_lst should only contain the first running/resumed - # running prefill update. 
- assert stats.prefill_start_ts_s_lst == [ - running_ts, - ] + assert stats.queue_duration_s == prefilling_ts - queued_ts - # Test DECODED + # Test DECODING decoded_update = RequestStatsUpdate( request_id=request_id, - type=RequestStatsUpdate.Type.DECODED, + type=RequestStatsUpdate.Type.DECODING, monotonic_ts_s=decoded_ts, - num_new_tokens=1, - token_perf_ts_ns=decoded_ts * 1e9, ) stats.update_from(decoded_update) assert stats.last_updated_ts_s == decoded_ts - # Since arrival - assert stats.first_token_latency_s == decoded_ts - arrived_ts - assert stats.num_output_tokens == 1 - # Since first scheduled - assert stats.prefill_latency_s == 2 # Test DETOKENIZED detokenized_update = RequestStatsUpdate( request_id=request_id, type=RequestStatsUpdate.Type.DETOKENIZED, monotonic_ts_s=detokenized_ts, + num_new_tokens=1, ) stats.update_from(detokenized_update) assert stats.last_updated_ts_s == detokenized_ts + assert stats.num_output_tokens == 1 + # Since arrival + assert stats.first_token_latency_s == detokenized_ts - arrived_ts + # Since first scheduled + assert stats.prefill_latency_s == detokenized_ts - prefilling_ts - # Test another DECODE should yield correct inter token latency + # Test another DECODING and DETOKENIZED should yield correct inter token latency decoded_update = RequestStatsUpdate( request_id=request_id, - type=RequestStatsUpdate.Type.DECODED, + type=RequestStatsUpdate.Type.DECODING, monotonic_ts_s=decoded_2_ts, - num_new_tokens=1, - token_perf_ts_ns=decoded_2_ts * 1e9, ) stats.update_from(decoded_update) + + detokenized_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DETOKENIZED, + monotonic_ts_s=detokenized_2_ts, + num_new_tokens=1, + ) + stats.update_from(detokenized_update) assert stats.output_token_latency_s_lst == [ - decoded_2_ts - decoded_ts, + detokenized_2_ts - detokenized_ts, ] assert stats.num_output_tokens == 2 @@ -161,121 +163,73 @@ def test_lifecycle_updates(): # States should be reset assert stats.num_computed_tokens == 0 assert stats.num_cached_tokens == 0 - assert stats.num_output_tokens == 0 - assert stats.output_token_latency_s_lst == [] + # These states should not be reset + assert stats.num_output_tokens == 2 + assert stats.output_token_latency_s_lst == [ + detokenized_2_ts - detokenized_ts, + ] + assert stats.prefill_latency_s == prefilling_ts - arrived_ts + assert stats.num_prompt_tokens == 6 + assert stats.prefill_start_ts_s_lst == [prefilling_ts] # Test resumed resumed_update = RequestStatsUpdate( request_id=request_id, - type=RequestStatsUpdate.Type.RUNNING, + type=RequestStatsUpdate.Type.PREFILLING, monotonic_ts_s=resumed_ts, - new_prefill=True, num_computed_tokens=6, num_cached_tokens=2, ) stats.update_from(resumed_update) - # Resumed prefill timestamp should be updated - assert stats.prefill_ts_s == resumed_ts + # prefill timestamp should not be updated since it's a resumed prefill + assert stats.prefill_ts_s == prefilling_ts assert stats.num_computed_tokens == 6 assert stats.num_cached_tokens == 2 assert stats.prefill_start_ts_s_lst == [ - running_ts, + prefilling_ts, resumed_ts, ] assert stats.last_updated_ts_s == resumed_ts - # Test another DECODED should yield correct first token latency. + # Test another DECODED/DETOKENIZED should yield correct first token latency. 
decoded_update = RequestStatsUpdate( request_id=request_id, - type=RequestStatsUpdate.Type.DECODED, + type=RequestStatsUpdate.Type.DECODING, monotonic_ts_s=decoded_3_ts, + ) + detokenized_update = RequestStatsUpdate( + request_id=request_id, + type=RequestStatsUpdate.Type.DETOKENIZED, + monotonic_ts_s=detokenized_3_ts, num_new_tokens=1, - token_perf_ts_ns=decoded_3_ts * 1e9, ) stats.update_from(decoded_update) - assert stats.first_token_ts_s == decoded_3_ts - assert stats.num_output_tokens == 1 + stats.update_from(detokenized_update) + assert stats.first_token_ts_s == detokenized_ts - arrived_ts + assert stats.num_output_tokens == 3 + assert stats.output_token_latency_s_lst == [ + detokenized_2_ts - detokenized_ts, + detokenized_3_ts - detokenized_2_ts, + ] # Test FINISHED finished_update = RequestStatsUpdate( request_id=request_id, - type=RequestStatsUpdate.Type.DETOKENIZED, + type=RequestStatsUpdate.Type.FINISHED, monotonic_ts_s=finished_ts, finish_reason="test_reason", ) stats.update_from(finished_update) assert stats.last_updated_ts_s == finished_ts assert stats.e2e_latency_s == finished_ts - arrived_ts - assert stats.inference_latency_s == finished_ts - resumed_ts - assert stats.decode_latency_s == finished_ts - decoded_3_ts + assert stats.inference_latency_s == finished_ts - prefilling_ts + assert stats.prefill_latency_s == detokenized_ts - prefilling_ts + assert stats.decode_latency_s == finished_ts - detokenized_ts + assert stats.first_token_latency_s == detokenized_ts - arrived_ts + assert stats.queue_duration_s == prefilling_ts - queued_ts assert stats.is_finished assert stats.finish_reason == "test_reason" - -@pytest.mark.parametrize("finish_reason", - ["test-decode", "test-detokenize", None]) -def test_finish_reason(finish_reason: Optional[str]): - """ - Test that a request could be finished when decoded and detokenized - at different times. - """ - request_id = "test_request" - r = RequestStats(request_id=request_id) - - # Test FINISHED - updates = [ - RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.ARRIVED, - monotonic_ts_s=0, - ), - RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.INPUT_PROCESSED, - monotonic_ts_s=1, - ), - RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.QUEUED, - monotonic_ts_s=2, - ), - RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.RUNNING, - new_prefill=True, - monotonic_ts_s=3, - num_computed_tokens=3, - ), - ] - - if finish_reason is not None: - if finish_reason == "test-decode": - updates.append( - RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.DECODED, - monotonic_ts_s=4, - finish_reason=finish_reason, - token_perf_ts_ns=4 * 1e9, - num_new_tokens=1, - )) - elif finish_reason == "test-detokenize": - updates.append( - RequestStatsUpdate( - request_id=request_id, - type=RequestStatsUpdate.Type.DETOKENIZED, - monotonic_ts_s=4, - finish_reason=finish_reason, - )) - - for update in updates: - r.update_from(update) - - if finish_reason is not None: - assert r.finish_reason == finish_reason - assert r.is_finished - assert r.e2e_latency_s == 4 - else: - assert r.finish_reason is None - assert not r.is_finished - assert r.e2e_latency_s is None + # TODO(rickyx): Add model forward/execute time. 
+ assert stats.model_forward_duration_s == 0.0 + assert stats.model_execute_duration_s == 0.0 diff --git a/vllm/v1/stats/common.py b/vllm/v1/stats/common.py index 94538a0a7e5d..1d7bd1308b47 100644 --- a/vllm/v1/stats/common.py +++ b/vllm/v1/stats/common.py @@ -2,17 +2,12 @@ from dataclasses import dataclass from dataclasses import field as dataclass_field from enum import IntEnum -from typing import List, Optional +from typing import ClassVar, Dict, List, Optional, Set import msgspec from msgspec import field as msgspec_field from vllm.sampling_params import SamplingParams -from vllm.v1.engine import EngineCoreRequest - - -def ns_to_s(ns: int) -> float: - return ns / 1e9 class RequestStatsUpdate(msgspec.Struct, @@ -22,30 +17,94 @@ class RequestStatsUpdate(msgspec.Struct, """ An update to the request stats. - NOTE: - - We should try to keep the size of this struct minimal by avoiding - keeping references to additional objects that are unnecessary, especially - when the referenced object could have been GCed already if not for - this reference (examples include per decoded token RequestOutput, - EngineCoreOutput, etc.). + This represents a stats update at a specific timestamp with metadata + associated with the update. + + NOTE: since there might be multiple processes generating updates at + different parts of the engine (e.g. input processor, scheduler, engine core, + etc.), we use the monotonic timestamp to record the update to compute any + intervals, and explicit wall-clock timestamp should be used for timestamps. + + WARNING: This assumes stats are generated in a single machine. If there are + potentially multiple machines, one should always generate the stats updates + on one single machine or use something else. """ class Type(IntEnum): """See `RequestStats` for the lifecycle of a request.""" + # Request arrived at the engine frontend. ARRIVED = 0 # Input processed by the input processor. INPUT_PROCESSED = 1 # Queued on the engine core. QUEUED = 2 - # Scheduled running by the scheduler. - RUNNING = 3 + # Scheduled running prefill by the scheduler. + # A request could be running a new prefill on the prompt tokens or + # a resumed prefill on the original prefill tokens + generated output + # tokens before preemption. + PREFILLING = 3 # Preempted by the scheduler. PREEMPTED = 4 - # Token decoded by the engine. - DECODED = 5 + # Output token is generated by the engine core. + DECODING = 5 # Token detokenized by the detokenizer. + # We will record the timestamp for each output token, as well as the + # finish reason. DETOKENIZED = 6 + # Request finishes (or aborts). 
+ FINISHED = 7 + + """ + Valid state updates: + ARRIVED + │ + ├──────► INPUT_PROCESSED ──────► QUEUED ──────► PREFILLING ◄────┐ + │ │ │ │ │ + │ │ │ ▼ │ + │ │ │ -──► DECODING │ + │ │ │ | │ │ + │ │ │ | ▼ │ + │ │ │ └─ DETOKENIZED │ + │ │ │ │ │ + │ │ │ ▼ │ + │ ▼ ▼ PREEMPTED ◄──────┘ + │ │ │ │ + └──────────────┴───────────────────┴──────────────┴ + │ + ▼ + FINISHED (All could go to FINISHED) + """ + _VALID_TRANSITIONS: ClassVar[Dict[Type, Set[Type]]] = { + Type.ARRIVED: { + Type.INPUT_PROCESSED, + Type.FINISHED, + }, + Type.INPUT_PROCESSED: { + Type.QUEUED, + Type.FINISHED, + }, + Type.QUEUED: { + Type.PREFILLING, + Type.FINISHED, + }, + Type.PREFILLING: { + Type.DECODING, + Type.PREEMPTED, + Type.FINISHED, + }, + Type.DECODING: { + Type.DETOKENIZED, + Type.FINISHED, + }, + Type.DETOKENIZED: { + Type.DECODING, + Type.PREEMPTED, + Type.FINISHED, + }, + Type.PREEMPTED: {Type.PREFILLING, Type.FINISHED}, + Type.FINISHED: set(), + } request_id: str @@ -59,24 +118,17 @@ class Type(IntEnum): ############################################################ # Metadata associated with the update. ############################################################ - # For input_processed. - # NOTE: it's fine to keep a reference to the engine request here - # because there will only be 1 event with this reference for a - # request. We need metadata (e.g. prompt tokens, sampling_param) - # from the request. - engine_request: Optional[EngineCoreRequest] = None + # For input_processed. Metadata needed for stats logging. + num_prompt_tokens: Optional[int] = None + sampling_params: Optional[SamplingParams] = None # For running. - # If the request was a newly scheduled request running for prefill. - new_prefill: Optional[bool] = None # Number of tokens computed when scheduled to run. num_computed_tokens: Optional[int] = None # Number of cached tokens when scheduled to run. num_cached_tokens: Optional[int] = None # For decoded. - # The perfcounter timestamp for each output token. - token_perf_ts_ns: Optional[int] = None # The number of new output tokens generated. num_new_tokens: Optional[int] = None @@ -84,32 +136,32 @@ class Type(IntEnum): # Finished reason. finish_reason: Optional[str] = None + # Non-optional fields for each update type. + _REQUIRED_FIELDS: ClassVar[Dict[Type, List[str]]] = { + Type.INPUT_PROCESSED: ["num_prompt_tokens", "sampling_params"], + Type.PREFILLING: ["num_computed_tokens", "num_cached_tokens"], + Type.DETOKENIZED: ["num_new_tokens"], + Type.FINISHED: ["finish_reason"], + } + + def __post_init__(self): + required_fields = self._REQUIRED_FIELDS.get(self.type, []) + for field in required_fields: + if getattr(self, field) is None: + raise ValueError( + f"Field {field} is required for update type {self.type}.") + @dataclass class RequestStats: - """Stats associated with a request (`Request`) - - A request would go through the following lifecycles upon arriving - at the llm engine: - - Arrival: when the request is first added to the llm engine. - - Inputs processed: when the input processor is completed. - - Queued: added to the waiting queue of the scheduler in the EngineCore. - - Running(prefill): when the request is scheduled by the scheduler - for prefill. - - [Preempted]: a request could be temporarily unscheduled by the scheduler - under contention of resources. This will go back to the waiting queue - of the scheduler, and the request will be scheduled again. - - Decoding: when a request is in decoding stage, and tokens are generated. 
- - Detokenized: when tokens are detokenized by the detokenizer. - - Finished: a request is finished. - """ + """Stats associated with a request (`Request`).""" ############################################################ # Metadata ############################################################ request_id: str - # The original request object from the engine core. - engine_request: Optional[EngineCoreRequest] = None + sampling_params: Optional[SamplingParams] = None + num_prompt_tokens: Optional[int] = None ############################################################ # Metrics and Stats @@ -117,6 +169,9 @@ class RequestStats: # Timestamp when the request was last updated. last_updated_ts_s: Optional[float] = None + # Last update stats type. + last_update_type: Optional[RequestStatsUpdate.Type] = None + # Timestamp when the request arrived at the llm engine. arrival_ts_s: Optional[float] = None @@ -133,12 +188,19 @@ class RequestStats: # When the input processor is completed. input_processor_end_ts_s: Optional[float] = None - # A sorted list of timestamps when the request was scheduled to run. + # A sorted list of timestamps when the request was scheduled to prefill. + # This could be when: + # 1. the request is newly scheduled, so it's a new prefill. + # 2. the request was preempted and resumed. It is equivalent to running + # a prefill of the original prefill tokens + generated output tokens + # before preemption. prefill_start_ts_s_lst: List[float] = dataclass_field(default_factory=list) - # A sorted list of perf counter timestamps for each output token. - output_token_perf_counter_ns_lst: List[int] = dataclass_field( - default_factory=list) + # A list of timestamps when a token is decoded by the engine core. + decoding_ts_s_lst: List[float] = dataclass_field(default_factory=list) + + # A sorted list of timestamps for each output token. + output_token_ts_s_lst: List[float] = dataclass_field(default_factory=list) # First token's timestamp. first_token_ts_s: Optional[float] = None @@ -151,6 +213,10 @@ class RequestStats: # A sorted list of timestamps when the request was preempted at the # scheduler. + # TODO(rickyx): right now, we don't actually have a good high-level + # metric to measure the impact of preemption other than observation of + # large P99 TPOT. Ideally we could quantify the impact of preemption by + # measuring the number of tokens re-computed due to preemption. preempted_ts_s_lst: List[float] = dataclass_field(default_factory=list) # Timestamp when the request was finished at the engine core. @@ -162,15 +228,13 @@ class RequestStats: ############################################################ # Derived properties. ############################################################ - @property - def num_prompt_tokens(self) -> Optional[int]: - return (len(self.engine_request.prompt_token_ids) - if self.engine_request else None) - @property def prefill_ts_s(self) -> Optional[float]: - """The timestamp when the request started prefilling.""" - return (self.prefill_start_ts_s_lst[-1] + """The timestamp when the request started prefilling. + Since a request could be preempted in decoding and later resumed + to prefill the decoded tokens, we use the first prefill start timestamp. 
+ """ + return (self.prefill_start_ts_s_lst[0] if self.prefill_start_ts_s_lst else None) @property @@ -221,120 +285,106 @@ def decode_latency_s(self) -> Optional[float]: @property def output_token_latency_s_lst(self) -> List[float]: - if len(self.output_token_perf_counter_ns_lst) == 0: + if len(self.output_token_ts_s_lst) == 0: return [] latency_s_lst = [] - for i in range(1, len(self.output_token_perf_counter_ns_lst)): - assert (self.output_token_perf_counter_ns_lst[i] >= - self.output_token_perf_counter_ns_lst[i - 1]) - latency_s = ns_to_s(self.output_token_perf_counter_ns_lst[i] - - self.output_token_perf_counter_ns_lst[i - 1]) + for i in range(1, len(self.output_token_ts_s_lst)): + assert (self.output_token_ts_s_lst[i] >= + self.output_token_ts_s_lst[i - 1]) + latency_s = (self.output_token_ts_s_lst[i] - + self.output_token_ts_s_lst[i - 1]) latency_s_lst.append(latency_s) return latency_s_lst @property def num_output_tokens(self) -> int: - return len(self.output_token_perf_counter_ns_lst) + return len(self.output_token_ts_s_lst) @property def is_finished(self) -> bool: return self.finished_ts_s is not None - @property - def sampling_params(self) -> Optional[SamplingParams]: - return (self.engine_request.sampling_params - if self.engine_request else None) - def update_from(self, update: "RequestStatsUpdate"): + self._check_valid_update(update) ts = update.monotonic_ts_s self.last_updated_ts_s = ts + self.last_update_type = update.type if update.type == RequestStatsUpdate.Type.ARRIVED: self.arrival_ts_s = ts elif update.type == RequestStatsUpdate.Type.INPUT_PROCESSED: self.input_processor_end_ts_s = ts - self.engine_request = update.engine_request + self.sampling_params = update.sampling_params + self.num_prompt_tokens = update.num_prompt_tokens elif update.type == RequestStatsUpdate.Type.QUEUED: self.queued_ts_s = ts - elif update.type == RequestStatsUpdate.Type.RUNNING: - assert (update.new_prefill is not None - and update.num_computed_tokens is not None) - self._record_running( - update.num_computed_tokens, - update.new_prefill, - ts, - update.num_cached_tokens, - ) + elif update.type == RequestStatsUpdate.Type.PREFILLING: + self.prefill_start_ts_s_lst.append(ts) + self.num_cached_tokens = update.num_cached_tokens + self.num_computed_tokens = update.num_computed_tokens elif update.type == RequestStatsUpdate.Type.PREEMPTED: self._reset_for_preemption(ts) - elif update.type == RequestStatsUpdate.Type.DECODED: - assert update.token_perf_ts_ns is not None - self._record_engine_output( + elif update.type == RequestStatsUpdate.Type.DECODING: + self.decoding_ts_s_lst.append(ts) + elif update.type == RequestStatsUpdate.Type.DETOKENIZED: + self._record_detokenized_output( ts, - update.token_perf_ts_ns, update.num_new_tokens, - update.finish_reason, ) - elif update.type == RequestStatsUpdate.Type.DETOKENIZED: - self._record_request_output(update.finish_reason, ts) + elif update.type == RequestStatsUpdate.Type.FINISHED: + self.finished_ts_s = ts + self.finish_reason = update.finish_reason else: raise ValueError(f"Unknown update type: {update.type}") - def _record_running( - self, - num_computed_tokens: int, - new_prefill: bool, - ts_s: float, - num_cached_tokens: Optional[int] = None, - ): - if new_prefill: - # Was preempted or a newly scheduled request - record the - # prefill start timestamp. 
- self.prefill_start_ts_s_lst.append(ts_s) - self.num_cached_tokens = num_cached_tokens - - self.num_computed_tokens = num_computed_tokens - - def _record_engine_output( + def _check_valid_update(self, update: "RequestStatsUpdate"): + if self.last_update_type is None: + assert update.type == RequestStatsUpdate.Type.ARRIVED + else: + valid_cur_update_types = RequestStatsUpdate._VALID_TRANSITIONS[ + self.last_update_type] + assert update.type in valid_cur_update_types, ( + f"Invalid update type: {update.type} for last_update_type: " + f"{self.last_update_type}.") + + assert (self.last_updated_ts_s is None + or update.monotonic_ts_s >= self.last_updated_ts_s), ( + "Update timestamp must be monotonically increasing, but " + f"last_updated_ts_s={self.last_updated_ts_s} and " + f"update.monotonic_ts_s={update.monotonic_ts_s}.") + + def _record_detokenized_output( self, ts_s: float, - perf_ts_ns: int, num_new_tokens: int, - finish_reason: Optional[str], ): # Update if first output token is generated. - if len(self.output_token_perf_counter_ns_lst) == 0: + if len(self.output_token_ts_s_lst) == 0: self.first_token_ts_s = ts_s assert ( self.prefill_ts_s is not None ), "Request must be running before generating output tokens." - self.output_token_perf_counter_ns_lst.extend([perf_ts_ns] * - num_new_tokens) - - # Update if the request is finished. - if finish_reason is not None: - self.finished_ts_s = ts_s - self.finish_reason = finish_reason - - def _record_request_output(self, finish_reason: Optional[str], - ts_s: float): - if finish_reason is not None and self.finished_ts_s is None: - self.finished_ts_s = ts_s - self.finish_reason = finish_reason + # Some X new tokens were generated at the ts. + self.output_token_ts_s_lst.extend([ts_s] * num_new_tokens) def _reset_for_preemption(self, ts_s: float): self.preempted_ts_s_lst.append(ts_s) + # Reset the computed tokens since it might restart the prefill. self.num_computed_tokens = 0 + # Cached token count might also change when resumed. self.num_cached_tokens = 0 - self.output_token_perf_counter_ns_lst.clear() - self.model_forward_duration_s = 0.0 - self.model_execute_duration_s = 0.0 - self.first_token_ts_s = None - # NOTE: the below fields should not be reset: - # - prefill_start_ts_s_lst + # These stats don't change since they happen before request running. # - arrival_ts_s - # - engine_request # - input_processor_end_ts_s + # - sampling_params + # - num_prompt_tokens + # - first_token_ts_s + # + # These stats are accumulated over preemptions: + # - output_token_ts_s_lst + # - prefill_start_ts_s_lst (after preemption, it will prefill the + # original prefill tokens and any output tokens generated before + # preemption.) 
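
The lifecycle handling above is easiest to see end to end. Below is a minimal usage sketch, not part of the patch itself, that drives a RequestStats instance through the same ARRIVED -> INPUT_PROCESSED -> QUEUED -> PREFILLING -> DECODING -> DETOKENIZED ordering the tests exercise. It assumes that ordering (plus FINISHED after the last DETOKENIZED) is permitted by the _VALID_TRANSITIONS table, which this hunk does not show, and the request id, token counts, and timestamps are made up for illustration:

from vllm.sampling_params import SamplingParams
from vllm.v1.stats.common import RequestStats, RequestStatsUpdate

stats = RequestStats(request_id="r0")
updates = [
    # (monotonic_ts_s, update type, type-specific fields)
    (0.0, RequestStatsUpdate.Type.ARRIVED, {}),
    (1.0, RequestStatsUpdate.Type.INPUT_PROCESSED,
     dict(sampling_params=SamplingParams(n=1), num_prompt_tokens=4)),
    (2.0, RequestStatsUpdate.Type.QUEUED, {}),
    (3.0, RequestStatsUpdate.Type.PREFILLING,
     dict(num_computed_tokens=4, num_cached_tokens=0)),
    (4.0, RequestStatsUpdate.Type.DECODING, {}),
    (4.5, RequestStatsUpdate.Type.DETOKENIZED, dict(num_new_tokens=1)),
    (5.0, RequestStatsUpdate.Type.DECODING, {}),
    (5.5, RequestStatsUpdate.Type.DETOKENIZED, dict(num_new_tokens=1)),
    (6.0, RequestStatsUpdate.Type.FINISHED, dict(finish_reason="stop")),
]
for ts, update_type, extra in updates:
    stats.update_from(
        RequestStatsUpdate(request_id="r0",
                           type=update_type,
                           monotonic_ts_s=ts,
                           **extra))

# first_token_ts_s is recorded at the first DETOKENIZED update (t=4.5),
# so prefill latency spans prefill start to first detokenized token and
# the single inter-token gap is the difference of the two DETOKENIZED
# timestamps.
assert stats.prefill_latency_s == 4.5 - 3.0
assert stats.output_token_latency_s_lst == [1.0]
assert stats.num_output_tokens == 2
assert stats.is_finished

The asserts at the end show how the derived properties compose: output_token_ts_s_lst accumulates one timestamp per detokenized token, and output_token_latency_s_lst is simply the list of gaps between consecutive entries.
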
@dataclass From e940b4bbe5cb0045b38d20ca47388f8705e11328 Mon Sep 17 00:00:00 2001 From: rickyx Date: Thu, 12 Dec 2024 23:57:35 +0000 Subject: [PATCH 5/6] style Signed-off-by: rickyx --- tests/v1/test_stats.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/v1/test_stats.py b/tests/v1/test_stats.py index 17fee514e37e..baac32d2f85b 100644 --- a/tests/v1/test_stats.py +++ b/tests/v1/test_stats.py @@ -1,5 +1,5 @@ -from typing import Any import pytest + from vllm.sampling_params import SamplingParams from vllm.v1.stats.common import RequestStats, RequestStatsUpdate @@ -131,7 +131,8 @@ def test_lifecycle_updates(): # Since first scheduled assert stats.prefill_latency_s == detokenized_ts - prefilling_ts - # Test another DECODING and DETOKENIZED should yield correct inter token latency + # Test another DECODING and DETOKENIZED should + # yield correct inter token latency decoded_update = RequestStatsUpdate( request_id=request_id, type=RequestStatsUpdate.Type.DECODING, From 1ba7e05d8b4ad218413facc94b73c9a7078d1df3 Mon Sep 17 00:00:00 2001 From: rickyx Date: Fri, 13 Dec 2024 01:38:27 +0000 Subject: [PATCH 6/6] tests Signed-off-by: rickyx --- tests/v1/test_stats.py | 64 +++++++++++++++++++++++++++++++++++++++++ vllm/v1/stats/common.py | 40 +++++++++++++++----------- 2 files changed, 87 insertions(+), 17 deletions(-) diff --git a/tests/v1/test_stats.py b/tests/v1/test_stats.py index baac32d2f85b..580392ac5f44 100644 --- a/tests/v1/test_stats.py +++ b/tests/v1/test_stats.py @@ -4,6 +4,31 @@ from vllm.v1.stats.common import RequestStats, RequestStatsUpdate +def make_update( + request_id: str, + update_type: RequestStatsUpdate.Type, + monotonic_ts_s: float, + **kwargs, +): + if update_type == RequestStatsUpdate.Type.INPUT_PROCESSED: + kwargs.setdefault("sampling_params", SamplingParams(n=1)) + kwargs.setdefault("num_prompt_tokens", 10) + elif update_type == RequestStatsUpdate.Type.PREFILLING: + kwargs.setdefault("num_computed_tokens", 10) + kwargs.setdefault("num_cached_tokens", 10) + elif update_type == RequestStatsUpdate.Type.DETOKENIZED: + kwargs.setdefault("num_new_tokens", 10) + elif update_type == RequestStatsUpdate.Type.FINISHED: + kwargs.setdefault("finish_reason", "test_reason") + + return RequestStatsUpdate( + request_id=request_id, + type=update_type, + monotonic_ts_s=monotonic_ts_s, + **kwargs, + ) + + def test_invalid_request_update(): request_id = "test_request" update_specific_required_fields = { @@ -36,6 +61,45 @@ def test_invalid_request_update(): ) +def test_invalid_request_update_transition(): + # Test invalid transition type. + for src in RequestStatsUpdate.Type: + for dst in RequestStatsUpdate.Type: + if dst not in RequestStatsUpdate._VALID_TRANSITIONS[src]: + with pytest.raises(AssertionError): + RequestStatsUpdate.check_valid_update( + make_update( + update_type=dst, + request_id="test_request", + monotonic_ts_s=1, + ), + last_update_type=src, + last_updated_ts_s=0, + ) + else: + RequestStatsUpdate.check_valid_update( + make_update( + request_id="test_request", + update_type=dst, + monotonic_ts_s=1, + ), + last_update_type=src, + last_updated_ts_s=0, + ) + + # Test invalid timestamp. 
+ with pytest.raises(AssertionError): + RequestStatsUpdate.check_valid_update( + make_update( + request_id="test_request", + update_type=RequestStatsUpdate.Type.ARRIVED, + monotonic_ts_s=1, + ), + last_update_type=None, + last_updated_ts_s=2, + ) + + def test_lifecycle_updates(): request_id = "test_request" stats = RequestStats(request_id=request_id) diff --git a/vllm/v1/stats/common.py b/vllm/v1/stats/common.py index 1d7bd1308b47..099d82c5904c 100644 --- a/vllm/v1/stats/common.py +++ b/vllm/v1/stats/common.py @@ -151,6 +151,27 @@ def __post_init__(self): raise ValueError( f"Field {field} is required for update type {self.type}.") + @staticmethod + def check_valid_update( + update: "RequestStatsUpdate", + last_update_type: Optional[Type], + last_updated_ts_s: Optional[float], + ): + if last_update_type is None: + assert update.type == RequestStatsUpdate.Type.ARRIVED + else: + valid_cur_update_types = RequestStatsUpdate._VALID_TRANSITIONS[ + last_update_type] + assert update.type in valid_cur_update_types, ( + f"Invalid update type: {update.type} for last_update_type: " + f"{last_update_type}.") + + if last_updated_ts_s is not None: + assert update.monotonic_ts_s >= last_updated_ts_s, ( + "Update timestamp must be monotonically increasing, but " + f"last_updated_ts_s={last_updated_ts_s} and " + f"update.monotonic_ts_s={update.monotonic_ts_s}.") + @dataclass class RequestStats: @@ -305,7 +326,8 @@ def is_finished(self) -> bool: return self.finished_ts_s is not None def update_from(self, update: "RequestStatsUpdate"): - self._check_valid_update(update) + RequestStatsUpdate.check_valid_update(update, self.last_update_type, + self.last_updated_ts_s) ts = update.monotonic_ts_s self.last_updated_ts_s = ts self.last_update_type = update.type @@ -336,22 +358,6 @@ def update_from(self, update: "RequestStatsUpdate"): else: raise ValueError(f"Unknown update type: {update.type}") - def _check_valid_update(self, update: "RequestStatsUpdate"): - if self.last_update_type is None: - assert update.type == RequestStatsUpdate.Type.ARRIVED - else: - valid_cur_update_types = RequestStatsUpdate._VALID_TRANSITIONS[ - self.last_update_type] - assert update.type in valid_cur_update_types, ( - f"Invalid update type: {update.type} for last_update_type: " - f"{self.last_update_type}.") - - assert (self.last_updated_ts_s is None - or update.monotonic_ts_s >= self.last_updated_ts_s), ( - "Update timestamp must be monotonically increasing, but " - f"last_updated_ts_s={self.last_updated_ts_s} and " - f"update.monotonic_ts_s={update.monotonic_ts_s}.") - def _record_detokenized_output( self, ts_s: float,
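
For completeness, the per-type required-field check in __post_init__ (quoted at the top of this hunk) can also be exercised directly. This is an illustrative sketch rather than a test from the patch; it assumes num_new_tokens is among the required fields for DETOKENIZED updates, as the make_update defaults above suggest:

import pytest

from vllm.v1.stats.common import RequestStatsUpdate

# A DETOKENIZED update is expected to carry num_new_tokens; leaving it
# out should trip the per-type required-field check in __post_init__.
with pytest.raises(ValueError):
    RequestStatsUpdate(
        request_id="r0",
        type=RequestStatsUpdate.Type.DETOKENIZED,
        monotonic_ts_s=1.0,
    )
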