Skip to content

Commit 5cca671

Browse files
committed
Resolves zetxqx review feedback
Signed-off-by: Daneyon Hansen <[email protected]>
1 parent 6f3d314 commit 5cca671

File tree

1 file changed

+67
-62
lines changed

1 file changed

+67
-62
lines changed

conformance/tests/gateway_weighted_two_pools.go

Lines changed: 67 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"fmt"
2121
"math"
2222
"net/http"
23-
"slices"
2423
"strings"
2524
"sync/atomic"
2625
"testing"
@@ -52,7 +51,6 @@ var GatewayWeightedAcrossTwoInferencePools = suite.ConformanceTest{
5251
Features: []features.FeatureName{
5352
features.SupportGateway,
5453
features.FeatureName("SupportInferencePool"),
55-
features.SupportGateway,
5654
},
5755
Test: func(t *testing.T, s *suite.ConformanceTestSuite) {
5856
const (
@@ -98,119 +96,126 @@ var GatewayWeightedAcrossTwoInferencePools = suite.ConformanceTest{
9896
primaryPodNames := make([]string, 0, len(primaryPods))
9997
primaryPodIPs := make([]string, 0, len(primaryPods))
10098
for _, p := range primaryPods {
99+
require.NotEmpty(t, p.Status.PodIP, "primary pod %s has no IP yet", p.Name)
101100
primaryPodNames = append(primaryPodNames, p.Name)
102101
primaryPodIPs = append(primaryPodIPs, p.Status.PodIP)
103102
}
103+
104104
secondaryPodNames := make([]string, 0, len(secondaryPods))
105105
secondaryPodIPs := make([]string, 0, len(secondaryPods))
106106
for _, p := range secondaryPods {
107+
require.NotEmpty(t, p.Status.PodIP, "secondary pod %s has no IP yet", p.Name)
107108
secondaryPodNames = append(secondaryPodNames, p.Name)
108109
secondaryPodIPs = append(secondaryPodIPs, p.Status.PodIP)
109110
}
110111

112+
// Send one targeted request per backend Pod to ensure EPP readiness.
113+
allIPs := append(append([]string{}, primaryPodIPs...), secondaryPodIPs...)
114+
allNames := append(append([]string{}, primaryPodNames...), secondaryPodNames...)
115+
for i := 0; i < len(allIPs); i++ {
116+
traffic.MakeRequestAndExpectSuccess(
117+
t,
118+
s.RoundTripper,
119+
s.TimeoutConfig,
120+
gwAddr,
121+
traffic.Request{
122+
Host: hostname,
123+
Path: path,
124+
Headers: map[string]string{
125+
test.HeaderTestEppEndPointSelectionKey: allIPs[i],
126+
},
127+
Method: http.MethodPost,
128+
Body: `{"model":"conformance-fake-model","prompt":"Warmup"}`,
129+
Backend: allNames[i],
130+
Namespace: resources.AppBackendNamespace,
131+
},
132+
)
133+
}
134+
111135
// Provide a union list of eligible endpoints for the test. Each pool's EPP
112136
// should filter to endpoints that actually belong to its pool.
113-
allIPs := append(append([]string{}, primaryPodIPs...), secondaryPodIPs...)
114137
eppHeaderValue := strings.Join(allIPs, ",")
115138

116139
requestBody := `{
117140
"model": "conformance-fake-model",
118141
"prompt": "Write as if you were a critic: San Francisco"
119142
}`
120143

121-
// Send requests with the union header and verify the split roughly matches the
122-
// weight distribution of the test manifest.
123-
var (
124-
roundTripper = s.RoundTripper
125-
g errgroup.Group
126-
primaryHits int64
127-
secondaryHits int64
128-
headers = map[string]string{
129-
test.HeaderTestEppEndPointSelectionKey: eppHeaderValue,
130-
}
131-
expected = gwhttp.ExpectedResponse{
132-
Request: gwhttp.Request{
133-
Host: hostname,
134-
Path: path,
135-
Method: http.MethodPost,
136-
Headers: headers,
137-
},
138-
Response: gwhttp.Response{
139-
StatusCode: http.StatusOK,
140-
},
141-
// Leave backend empty to avoid enforcing a specific pod prefix in CompareRequest.
142-
Namespace: resources.AppBackendNamespace,
143-
}
144-
)
145-
146-
primarySet := func() map[string]struct{} {
147-
m := make(map[string]struct{}, len(primaryPodNames))
148-
for _, n := range primaryPodNames {
149-
m[n] = struct{}{}
150-
}
151-
return m
152-
}()
153-
secondarySet := func() map[string]struct{} {
154-
m := make(map[string]struct{}, len(secondaryPodNames))
155-
for _, n := range secondaryPodNames {
156-
m[n] = struct{}{}
157-
}
158-
return m
159-
}()
144+
// Build quick lookup sets for attributing each hit to a pool by backend pod name.
145+
primarySet := make(map[string]struct{}, len(primaryPodNames))
146+
for _, n := range primaryPodNames {
147+
primarySet[n] = struct{}{}
148+
}
149+
secondarySet := make(map[string]struct{}, len(secondaryPodNames))
150+
for _, n := range secondaryPodNames {
151+
secondarySet[n] = struct{}{}
152+
}
160153

154+
headers := map[string]string{
155+
test.HeaderTestEppEndPointSelectionKey: eppHeaderValue,
156+
}
157+
expected := gwhttp.ExpectedResponse{
158+
Request: gwhttp.Request{
159+
Host: hostname,
160+
Path: path,
161+
Method: http.MethodPost,
162+
Headers: headers,
163+
},
164+
Response: gwhttp.Response{
165+
StatusCode: http.StatusOK,
166+
},
167+
Namespace: resources.AppBackendNamespace,
168+
}
161169
req := gwhttp.MakeRequest(t, &expected, gwAddr, "HTTP", "http")
170+
171+
var primaryHits, secondaryHits atomic.Int64
172+
var g errgroup.Group
162173
g.SetLimit(concurrentRequests)
163-
for range totalRequests {
174+
175+
for i := 0; i < totalRequests; i++ {
164176
g.Go(func() error {
165-
cReq, cRes, err := traffic.MakeCallRoundTripper(t, roundTripper, &traffic.RequestWithBody{
177+
cReq, cRes, err := traffic.MakeCallRoundTripper(t, s.RoundTripper, &traffic.RequestWithBody{
166178
Request: req,
167179
Body: strings.NewReader(requestBody),
168180
})
169181
if err != nil {
170182
return fmt.Errorf("failed to roundtrip request: %w", err)
171183
}
172-
if err := gwhttp.CompareRequest(t, &req, cReq, cRes, expected); err != nil {
184+
if err := gwhttp.CompareRoundTrip(t, &req, cReq, cRes, expected); err != nil {
173185
return fmt.Errorf("response expectation failed: %w", err)
174186
}
175187

176-
// Attribute response to pool by the backend pod name.
188+
// Attribute response to pool by backend pod name.
177189
if _, ok := primarySet[cReq.Pod]; ok {
178-
atomic.AddInt64(&primaryHits, 1)
190+
primaryHits.Add(1)
179191
} else if _, ok := secondarySet[cReq.Pod]; ok {
180-
atomic.AddInt64(&secondaryHits, 1)
192+
secondaryHits.Add(1)
181193
} else {
182194
return fmt.Errorf("request was handled by unexpected pod %q (not in either pool)", cReq.Pod)
183195
}
184-
185196
return nil
186197
})
187198
}
188199
require.NoError(t, g.Wait(), "requests failed")
189200

190-
ph := float64(atomic.LoadInt64(&primaryHits))
191-
sh := float64(atomic.LoadInt64(&secondaryHits))
201+
ph := float64(primaryHits.Load())
202+
sh := float64(secondaryHits.Load())
192203
total := ph + sh
204+
require.Equal(t, int64(totalRequests), int64(total), "sum of hits must equal number of attempts")
193205
require.Greater(t, total, 0.0)
194206

195207
observedPrimary := ph / total
196208
expectedPrimary := float64(primaryWeight) / float64(primaryWeight+secondaryWeight)
197209

198-
// Allow either a 10 percentage-point absolute error, or a 3-sigma
199-
// binomial confidence interval (whichever is larger). This keeps
200-
// flakiness low while still detecting obvious mis-weighting.
210+
// Allow either a 10 percentage-point absolute error, or a 3-sigma binomial CI.
201211
sigma := math.Sqrt(expectedPrimary * (1.0 - expectedPrimary) / total)
202212
absTolerance := math.Max(0.10, 3.0*sigma)
203213

204214
diff := math.Abs(observedPrimary - expectedPrimary)
205-
if diff > absTolerance {
206-
t.Fatalf("weighted split out of bounds: observed primary=%.3f (hits=%d/%d), expected=%.3f, tolerance=±%.3f",
207-
observedPrimary, int64(ph), int64(total), expectedPrimary, absTolerance)
208-
}
209-
215+
require.LessOrEqualf(t, diff, absTolerance,
216+
"weighted split out of bounds: observed primary=%.3f (hits=%d/%d), expected=%.3f, tolerance=±%.3f",
217+
observedPrimary, int64(ph), int64(total), expectedPrimary, absTolerance)
210218
t.Logf("Weighted split OK: primary=%.3f (hits=%d/%d), expected=%.3f, tolerance=±%.3f; secondary hits=%d",
211219
observedPrimary, int64(ph), int64(total), expectedPrimary, absTolerance, int64(sh))
212-
213-
// Sanity: ensure responses only came from pods we enumerated.
214-
require.True(t, slices.Contains([]int{int(ph + sh)}, int(total)))
215220
},
216221
}

0 commit comments

Comments
 (0)