
Commit 1538318

test(watcher): improve kubernetes watcher unit coverage (#534)
Signed-off-by: Marc Nuri <[email protected]>
1 parent 314ea12 commit 1538318

File tree

3 files changed: +333, -65 lines


internal/test/test.go

Lines changed: 21 additions & 0 deletions
@@ -71,3 +71,24 @@ func WaitForHealthz(tcpAddr *net.TCPAddr) error {
 	}
 	return fmt.Errorf("healthz endpoint returned 404 after retries")
 }
+
+func WaitForCondition(timeout time.Duration, condition func() bool) error {
+	done := make(chan struct{})
+	go func() {
+		for {
+			if condition() {
+				close(done)
+				return
+			}
+			time.Sleep(time.Millisecond)
+		}
+	}()
+
+	select {
+	case <-done:
+		// Condition met
+	case <-time.After(timeout):
+		return fmt.Errorf("timeout waiting for condition")
+	}
+	return nil
+}
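
The new exported WaitForCondition gives tests a shared poll-until-true primitive with a hard timeout, replacing the suite-private helper removed in the next file. A minimal usage sketch; only test.WaitForCondition comes from the diff above, and the import path shown is hypothetical:

	package example_test

	import (
		"sync/atomic"
		"testing"
		"time"

		// Hypothetical module path for the internal/test package above.
		"example.com/project/internal/test"
	)

	func TestWaitForCondition(t *testing.T) {
		var ready atomic.Bool
		time.AfterFunc(10*time.Millisecond, func() { ready.Store(true) })

		// Polls every millisecond until the flag flips or the 2s timeout elapses.
		if err := test.WaitForCondition(2*time.Second, func() bool {
			return ready.Load()
		}); err != nil {
			t.Fatal(err) // would report: timeout waiting for condition
		}
	}

One caveat visible in the helper itself: on timeout, the polling goroutine keeps running until the condition eventually returns true, so conditions should be cheap and side-effect free. For short-lived test binaries this leak is harmless.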

pkg/kubernetes/watcher/cluster_test.go

Lines changed: 50 additions & 65 deletions
@@ -33,34 +33,13 @@ func (s *ClusterStateTestSuite) TearDownTest() {
 	}
 }
 
-// waitForCondition polls a condition function until it returns true or times out.
-func (s *ClusterStateTestSuite) waitForCondition(condition func() bool, timeout time.Duration, failMsg string) {
-	done := make(chan struct{})
-	go func() {
-		for {
-			if condition() {
-				close(done)
-				return
-			}
-			time.Sleep(time.Millisecond)
-		}
-	}()
-
-	select {
-	case <-done:
-		// Condition met
-	case <-time.After(timeout):
-		s.Fail(failMsg)
-	}
-}
-
 // waitForWatcherState waits for the watcher to capture initial state
 func (s *ClusterStateTestSuite) waitForWatcherState(watcher *ClusterState) {
-	s.waitForCondition(func() bool {
+	s.NoError(test.WaitForCondition(watcherStateTimeout, func() bool {
 		watcher.mu.Lock()
 		defer watcher.mu.Unlock()
 		return len(watcher.lastKnownState.apiGroups) > 0
-	}, watcherStateTimeout, "timeout waiting for watcher to capture initial state")
+	}), "timeout waiting for watcher to capture initial state")
 }
 
 func (s *ClusterStateTestSuite) TestNewClusterState() {
@@ -212,7 +191,6 @@ func (s *ClusterStateTestSuite) TestWatch() {
 	})
 
 	s.Run("detects cluster state changes", func() {
-		// Reset handlers first to avoid invalid state
 		s.mockServer.ResetHandlers()
 		handler := &test.DiscoveryClientHandler{}
 		s.mockServer.Handle(handler)
@@ -223,45 +201,31 @@ func (s *ClusterStateTestSuite) TestWatch() {
 		watcher.pollInterval = 50 * time.Millisecond
 		watcher.debounceWindow = 20 * time.Millisecond
 
-		// Channel to signal when onChange is called
-		changeDetected := make(chan struct{}, 1)
 		var callCount atomic.Int32
 		onChange := func() error {
-			count := callCount.Add(1)
-			if count == 1 {
-				select {
-				case changeDetected <- struct{}{}:
-				default:
-				}
-			}
+			callCount.Add(1)
 			return nil
 		}
 
 		go func() {
 			watcher.Watch(onChange)
 		}()
-		defer func() { _ = watcher.Close() }()
+		s.T().Cleanup(func() { _ = watcher.Close() })
 
 		// Wait for initial state capture
 		s.waitForWatcherState(watcher)
 
-		// Modify the existing handler to add new API groups (with proper synchronization)
+		// Modify the handler to add new API groups
 		handler.Groups = []string{
 			`{"name":"custom.example.com","versions":[{"groupVersion":"custom.example.com/v1","version":"v1"}],"preferredVersion":{"groupVersion":"custom.example.com/v1","version":"v1"}}`,
 		}
 
-		// Wait for change detection or timeout
-		select {
-		case <-changeDetected:
-			s.Run("triggers onChange callback on detected changes", func() {
-				s.GreaterOrEqual(callCount.Load(), int32(1), "onChange should be called at least once")
-			})
-		case <-time.After(200 * time.Millisecond):
-			s.Run("triggers onChange callback on detected changes", func() {
-				// Change might not be detected due to caching, which is acceptable
-				s.GreaterOrEqual(callCount.Load(), int32(0), "watcher attempted to detect changes")
-			})
-		}
+		// Wait for change detection - the watcher invalidates the cache on each poll
+		s.Require().NoError(test.WaitForCondition(500*time.Millisecond, func() bool {
+			return callCount.Load() >= 1
+		}), "timeout waiting for onChange callback")
+
+		s.GreaterOrEqual(callCount.Load(), int32(1), "onChange should be called at least once")
 	})
 
 	s.Run("detects OpenShift cluster", func() {
@@ -280,7 +244,7 @@ func (s *ClusterStateTestSuite) TestWatch() {
 		go func() {
 			watcher.Watch(onChange)
 		}()
-		defer func() { _ = watcher.Close() }()
+		s.T().Cleanup(func() { _ = watcher.Close() })
 
 		// Wait for the watcher to capture initial state
 		s.waitForWatcherState(watcher)
@@ -303,23 +267,32 @@ func (s *ClusterStateTestSuite) TestWatch() {
 		watcher.pollInterval = 50 * time.Millisecond
 		watcher.debounceWindow = 20 * time.Millisecond
 
+		var errorCallCount atomic.Int32
 		expectedErr := errors.New("reload failed")
 		onChange := func() error {
+			errorCallCount.Add(1)
 			return expectedErr
 		}
 
 		go func() {
 			watcher.Watch(onChange)
 		}()
-		defer func() { _ = watcher.Close() }()
+		s.T().Cleanup(func() { _ = watcher.Close() })
 
 		// Wait for the watcher to start and capture initial state
 		s.waitForWatcherState(watcher)
 
-		s.Run("does not panic on callback error", func() {
-			// Test passes if we reach here without panic
-			s.True(true, "watcher handles callback errors without panicking")
-		})
+		// Modify the handler to trigger a change
+		handler.Groups = []string{
+			`{"name":"error.trigger","versions":[{"groupVersion":"error.trigger/v1","version":"v1"}],"preferredVersion":{"groupVersion":"error.trigger/v1","version":"v1"}}`,
+		}
+
+		// Wait for onChange to be called (which returns an error)
+		s.Require().NoError(test.WaitForCondition(500*time.Millisecond, func() bool {
+			return errorCallCount.Load() >= 1
+		}), "timeout waiting for onChange callback")
+
+		s.GreaterOrEqual(errorCallCount.Load(), int32(1), "onChange should be called even when it returns an error")
 	})
 }
 
@@ -352,9 +325,11 @@ func (s *ClusterStateTestSuite) TestClose() {
 	s.Run("stops polling", func() {
 		beforeCount := callCount.Load()
 		// Wait longer than poll interval to verify no more polling
-		s.waitForCondition(func() bool {
-			return true // Always true, just waiting
-		}, 150*time.Millisecond, "")
+		// We expect this to timeout because no callbacks should be triggered after close
+		err := test.WaitForCondition(150*time.Millisecond, func() bool {
+			return callCount.Load() > beforeCount
+		})
+		s.Error(err, "should timeout because no polling occurs after close")
 		afterCount := callCount.Load()
 		s.Equal(beforeCount, afterCount, "should not poll after close")
 	})
@@ -387,9 +362,11 @@ func (s *ClusterStateTestSuite) TestClose() {
 
 		watcher := NewClusterState(discoveryClient)
 		watcher.pollInterval = 30 * time.Millisecond
-		watcher.debounceWindow = 200 * time.Millisecond // Long debounce
+		watcher.debounceWindow = 500 * time.Millisecond // Long debounce window
 
+		var callCount atomic.Int32
 		onChange := func() error {
+			callCount.Add(1)
 			return nil
 		}
 
@@ -400,16 +377,24 @@ func (s *ClusterStateTestSuite) TestClose() {
 		// Wait for the watcher to start
 		s.waitForWatcherState(watcher)
 
-		// Close the watcher
+		// Modify the handler to trigger a change and start the debounce timer
+		handler.Groups = []string{
+			`{"name":"trigger.change","versions":[{"groupVersion":"trigger.change/v1","version":"v1"}],"preferredVersion":{"groupVersion":"trigger.change/v1","version":"v1"}}`,
+		}
+
+		// Wait for the change to be detected (debounce timer starts)
+		s.Require().NoError(test.WaitForCondition(200*time.Millisecond, func() bool {
+			watcher.mu.Lock()
+			defer watcher.mu.Unlock()
+			return watcher.debounceTimer != nil
+		}), "timeout waiting for debounce timer to start")
+
+		// Close the watcher before debounce window expires
 		err := watcher.Close()
+		s.NoError(err, "close should succeed")
 
-		s.Run("closes without error", func() {
-			s.NoError(err)
-		})
-		s.Run("debounce timer is stopped", func() {
-			// Test passes if Close() completes without hanging
-			s.True(true, "watcher closed successfully")
-		})
+		// Verify onChange was not called (debounce timer was stopped)
+		s.Equal(int32(0), callCount.Load(), "onChange should not be called because debounce timer was stopped")
 	})
 
 	s.Run("handles close with nil channels", func() {
