
Commit be9541a

[CI] add metrics case (#5115)
* add case
* add case
1 parent 24e9e2d commit be9541a

File tree

2 files changed: +241 -0 lines changed
.github/workflows/_pre_ce_test.yml

Lines changed: 3 additions & 0 deletions
@@ -82,6 +82,7 @@ jobs:
   FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
   FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
   FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
+  FD_CONTROLLER_PORT=$((8018 + DEVICE_PORT * 100))
   FD_ZMQ_RECV_REQUEST_SERVER_PORT=$((8048 + DEVICE_PORT * 100))
   FD_ZMQ_SEND_RESPONSE_SERVER_PORT=$((8038 + DEVICE_PORT * 100))
   FD_ZMQ_CONTROL_CMD_SERVER_PORTS=$((8028 + DEVICE_PORT * 100))
@@ -92,6 +93,7 @@ jobs:
   echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
   echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
   echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}"
+  echo "FD_CONTROLLER_PORT=${FD_CONTROLLER_PORT}"
   echo "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}"
   echo "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}"
   echo "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}"
@@ -143,6 +145,7 @@ jobs:
   -e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \
   -e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
   -e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \
+  -e "FD_CONTROLLER_PORT=${FD_CONTROLLER_PORT}" \
   -e "FLASK_PORT=${FLASK_PORT}" \
   -e "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}" \
   -e "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" \
Lines changed: 238 additions & 0 deletions
@@ -0,0 +1,238 @@
import asyncio
import os
import shutil
import signal
import subprocess
import sys
import time

import httpx
import pytest
import requests

tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)

from e2e.utils.serving_utils import (
    FD_API_PORT,
    FD_CACHE_QUEUE_PORT,
    FD_ENGINE_QUEUE_PORT,
    FD_METRICS_PORT,
    clean_ports,
    is_port_open,
)

@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
    """
    Pytest fixture that runs once per test session:
    - Cleans ports before tests
    - Starts the API server as a subprocess
    - Waits for the server port to open (up to 300 seconds)
    - Tears down the server after all tests finish
    """
    print("Pre-test port cleanup...")
    FD_CONTROLLER_PORT = int(os.getenv("FD_CONTROLLER_PORT", 8333))
    clean_ports([FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT, FD_CONTROLLER_PORT])

    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = "0,1"
    env["ENABLE_V1_KVCACHE_SCHEDULER"] = "1"

    base_path = os.getenv("MODEL_PATH")
    if base_path:
        model_path = os.path.join(base_path, "TP2")
    else:
        model_path = "./TP2"

    log_path = "server.log"
    cmd = [
        sys.executable,
        "-m",
        "fastdeploy.entrypoints.openai.api_server",
        "--model",
        model_path,
        "--port",
        str(FD_API_PORT),
        "--tensor-parallel-size",
        "2",
        "--engine-worker-queue-port",
        str(FD_ENGINE_QUEUE_PORT),
        "--metrics-port",
        str(FD_METRICS_PORT),
        "--cache-queue-port",
        str(FD_CACHE_QUEUE_PORT),
        "--controller-port",
        str(FD_CONTROLLER_PORT),
        "--max-model-len",
        "32768",
        "--max-num-seqs",
        "1",
        "--quantization",
        "wint8",
        "--gpu-memory-utilization",
        "0.9",
        "--load-strategy",
        "ipc_snapshot",
        "--dynamic-load-weight",
    ]

    # Start subprocess in a new process group
    # Clear the log directory first
    if os.path.exists("log"):
        shutil.rmtree("log")
    with open(log_path, "w") as logfile:
        process = subprocess.Popen(
            cmd,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # Enables killing the full group via os.killpg
            env=env,
        )

    # Wait up to 300 seconds for the API server to be ready
    for _ in range(300):
        if is_port_open("127.0.0.1", FD_API_PORT):
            print(f"API server is up on port {FD_API_PORT}")
            break
        time.sleep(1)
    else:
        print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
        try:
            os.killpg(process.pid, signal.SIGTERM)
        except Exception as e:
            print(f"Failed to kill process group: {e}")
        raise RuntimeError(f"API server did not start on port {FD_API_PORT}")

    yield  # Run tests

    print("\n===== Post-test server cleanup... =====")
    try:
        os.killpg(process.pid, signal.SIGTERM)
        print(f"API server (pid={process.pid}) terminated")
    except Exception as e:
        print(f"Failed to terminate API server: {e}")

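clean_ports and is_port_open are imported from e2e.utils.serving_utils, which is outside this diff. For readers without the repo at hand, a minimal sketch of what is_port_open could look like (an assumption about the helper, not its actual implementation):

import socket

def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    # Hypothetical stand-in for e2e.utils.serving_utils.is_port_open:
    # treat the port as open if a TCP connection succeeds within the timeout.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False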

async def send_inference(idx, client: httpx.AsyncClient):
    try:
        url = f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions"
        data = {
            "model": "dummy",
            "messages": [{"role": "user", "content": f"hello {idx}"}],
            "metadata": {"min_tokens": 1000},
        }
        resp = await client.post(url, json=data, timeout=20)
        return resp.status_code
    except Exception as e:
        print(f"infer {idx} error:", e)
        return None


async def run_concurrent_inference(n):
    async with httpx.AsyncClient() as client:
        tasks = [send_inference(i, client) for i in range(n)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results


def async_concurrency(n=10):
    print(f"Launching {n} concurrent async inference requests...")
    t0 = time.time()
    results = asyncio.run(run_concurrent_inference(n))
    print("Done in", time.time() - t0, "seconds")
    print("Status codes:", results)

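Note the interplay with the server flags above: with --max-num-seqs 1 and metadata.min_tokens at 1000, the ten concurrent requests launched here should mostly sit in the queue, which is presumably what lets the test below observe non-zero running/waiting gauges before the scheduler is reset.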
def parse_prometheus_to_dict(metrics_text: str):
    """Convert Prometheus text-format metrics into a dict."""
    result = {}
    for line in metrics_text.split("\n"):
        line = line.strip()
        # Skip comments and blank lines
        if not line or line.startswith("#"):
            continue

        if "{" in line:  # labeled metric
            metric_name = line.split("{", 1)[0]
            labels_str = line[line.index("{") + 1 : line.index("}")]
            value = float(line.split("}")[1].strip())

            # Parse the labels
            labels = {}
            for kv in labels_str.split(","):
                k, v = kv.split("=")
                labels[k] = v.strip('"')

            # Store the entry
            if metric_name not in result:
                result[metric_name] = []
            result[metric_name].append({"labels": labels, "value": value})

        else:  # unlabeled metric
            metric_name, value_str = line.split()
            result[metric_name] = float(value_str)

    return result

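For illustration, here is how the parser above handles the two line shapes, using a made-up metrics sample (the metric names mirror the gauges asserted later in the test; the bucket line is purely hypothetical):

# Hypothetical sample input; a real /metrics response has many more lines.
sample = """
# HELP fastdeploy:num_requests_running Number of running requests
fastdeploy:num_requests_running 2.0
fastdeploy:request_latency_bucket{le="0.5"} 7.0
"""
parsed = parse_prometheus_to_dict(sample)
# Unlabeled metrics map to a float; labeled ones to a list of entries.
assert parsed["fastdeploy:num_requests_running"] == 2.0
assert parsed["fastdeploy:request_latency_bucket"] == [{"labels": {"le": "0.5"}, "value": 7.0}]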

def get_metrics_dict(metrics_url):
    """Fetch and parse the metrics endpoint data."""
    resp = requests.get(metrics_url, timeout=5)

    assert resp.status_code == 200, f"Unexpected status code: {resp.status_code}"
    assert "text/plain" in resp.headers["Content-Type"], "Content-Type is not text/plain"

    # Parse Prometheus metrics data
    metrics_data = resp.text
    print(metrics_data)
    metrics_dict = parse_prometheus_to_dict(metrics_data)
    # print("\nParsed dict:")
    # print(metrics_dict)
    print("num_requests_running:", metrics_dict["fastdeploy:num_requests_running"])
    print("num_requests_waiting:", metrics_dict["fastdeploy:num_requests_waiting"])

    return metrics_dict


def test_metrics_with_clear_and_reset():
    """
    Test the metrics monitoring endpoint.
    """
    FD_CONTROLLER_PORT = int(os.getenv("FD_CONTROLLER_PORT", 8333))
    metrics_url = f"http://0.0.0.0:{FD_METRICS_PORT}/metrics"

    async_concurrency(n=10)

    time.sleep(0.3)

    # ===== clear_load_weight =====
    clear_url = f"http://0.0.0.0:{FD_API_PORT}/clear_load_weight"
    print("Calling clear_load_weight...")
    r = requests.get(clear_url, timeout=30)
    assert r.status_code == 200, f"clear_load_weight failed: {r.status_code}"

    metrics = get_metrics_dict(metrics_url)
    running = metrics["fastdeploy:num_requests_running"]
    waiting = metrics["fastdeploy:num_requests_waiting"]

    print("ASSERT non-zero after clear_load_weight, running:", running, "waiting:", waiting)
    assert running != 0 or waiting != 0, "Expected running/waiting to be non-zero"

    # ===== reset_scheduler =====
    reset_url = f"http://0.0.0.0:{FD_CONTROLLER_PORT}/controller/reset_scheduler"
    print("Calling reset_scheduler...")
    r = requests.post(reset_url, json={"reset": True}, timeout=30)
    assert r.status_code == 200, f"reset_scheduler failed: {r.status_code}"

    metrics = get_metrics_dict(metrics_url)
    running = metrics["fastdeploy:num_requests_running"]
    waiting = metrics["fastdeploy:num_requests_waiting"]

    print("ASSERT zero after reset_scheduler, running:", running, "waiting:", waiting)
    assert running == 0 and waiting == 0, "Expected running/waiting to be zero"


if __name__ == "__main__":
    test_metrics_with_clear_and_reset()
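One caveat on the __main__ block: pytest fixtures only run under pytest, so executing this file directly calls test_metrics_with_clear_and_reset() without setup_and_run_server, and assumes a server is already listening on FD_API_PORT, FD_METRICS_PORT, and FD_CONTROLLER_PORT. Under pytest, the session fixture starts and tears down the server automatically.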
