Skip to content

Commit d3430b3

Browse files
committed
fix(kubernetes)!: Provider updated for each watcher callback
The kubernetes.Provider watch-related functionality is restarted for each watch notification that affects its state. Signed-off-by: Marc Nuri <[email protected]>
1 parent 1538318 commit d3430b3

File tree

9 files changed

+326
-110
lines changed

9 files changed

+326
-110
lines changed

pkg/kubernetes/provider_kubeconfig.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"reflect"
78

89
"github.com/containers/kubernetes-mcp-server/pkg/config"
910
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes/watcher"
@@ -18,6 +19,7 @@ const KubeConfigTargetParameterName = "context"
1819
// Kubernetes clusters using different contexts from a kubeconfig file.
1920
// It lazily initializes managers for each context as they are requested.
2021
type kubeConfigClusterProvider struct {
22+
staticConfig *config.StaticConfig
2123
defaultContext string
2224
managers map[string]*Manager
2325
kubeconfigWatcher *watcher.Kubeconfig
@@ -35,37 +37,44 @@ func init() {
3537
// Internally, it leverages a KubeconfigManager for each context, initializing them
3638
// lazily when requested.
3739
func newKubeConfigClusterProvider(cfg *config.StaticConfig) (Provider, error) {
38-
m, err := NewKubeconfigManager(cfg, "")
40+
ret := &kubeConfigClusterProvider{staticConfig: cfg}
41+
if err := ret.reset(); err != nil {
42+
return nil, err
43+
}
44+
return ret, nil
45+
}
46+
47+
func (p *kubeConfigClusterProvider) reset() error {
48+
m, err := NewKubeconfigManager(p.staticConfig, "")
3949
if err != nil {
4050
if errors.Is(err, ErrorKubeconfigInClusterNotAllowed) {
41-
return nil, fmt.Errorf("kubeconfig ClusterProviderStrategy is invalid for in-cluster deployments: %v", err)
51+
return fmt.Errorf("kubeconfig ClusterProviderStrategy is invalid for in-cluster deployments: %v", err)
4252
}
43-
return nil, err
53+
return err
4454
}
4555

4656
rawConfig, err := m.accessControlClientset.clientCmdConfig.RawConfig()
4757
if err != nil {
48-
return nil, err
58+
return err
4959
}
5060

51-
allClusterManagers := map[string]*Manager{
61+
p.managers = map[string]*Manager{
5262
rawConfig.CurrentContext: m, // we already initialized a manager for the default context, let's use it
5363
}
5464

5565
for name := range rawConfig.Contexts {
5666
if name == rawConfig.CurrentContext {
5767
continue // already initialized this, don't want to set it to nil
5868
}
59-
60-
allClusterManagers[name] = nil
69+
p.managers[name] = nil
6170
}
6271

63-
return &kubeConfigClusterProvider{
64-
defaultContext: rawConfig.CurrentContext,
65-
managers: allClusterManagers,
66-
kubeconfigWatcher: watcher.NewKubeconfig(m.accessControlClientset.clientCmdConfig),
67-
clusterStateWatcher: watcher.NewClusterState(m.accessControlClientset.DiscoveryClient()),
68-
}, nil
72+
p.Close()
73+
p.kubeconfigWatcher = watcher.NewKubeconfig(m.accessControlClientset.clientCmdConfig)
74+
p.clusterStateWatcher = watcher.NewClusterState(m.accessControlClientset.DiscoveryClient())
75+
p.defaultContext = rawConfig.CurrentContext
76+
77+
return nil
6978
}
7079

7180
func (p *kubeConfigClusterProvider) managerForContext(context string) (*Manager, error) {
@@ -124,20 +133,21 @@ func (p *kubeConfigClusterProvider) GetDefaultTarget() string {
124133
}
125134

126135
func (p *kubeConfigClusterProvider) WatchTargets(reload McpReload) {
127-
reloadWithCacheInvalidate := func() error {
128-
// Invalidate all cached managers to force reloading on next access
129-
for contextName := range p.managers {
130-
if m := p.managers[contextName]; m != nil {
131-
m.Invalidate()
132-
}
136+
reloadWithReset := func() error {
137+
if err := p.reset(); err != nil {
138+
return err
133139
}
140+
p.WatchTargets(reload)
134141
return reload()
135142
}
136-
p.kubeconfigWatcher.Watch(reloadWithCacheInvalidate)
137-
p.clusterStateWatcher.Watch(reloadWithCacheInvalidate)
143+
p.kubeconfigWatcher.Watch(reloadWithReset)
144+
p.clusterStateWatcher.Watch(reload)
138145
}
139146

140147
func (p *kubeConfigClusterProvider) Close() {
141-
_ = p.kubeconfigWatcher.Close()
142-
_ = p.clusterStateWatcher.Close()
148+
for _, w := range []watcher.Watcher{p.kubeconfigWatcher, p.clusterStateWatcher} {
149+
if !reflect.ValueOf(w).IsNil() {
150+
w.Close()
151+
}
152+
}
143153
}

pkg/kubernetes/provider_single.go

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"reflect"
78

89
"github.com/containers/kubernetes-mcp-server/pkg/config"
910
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes/watcher"
@@ -14,6 +15,7 @@ import (
1415
// Kubernetes cluster. Used for in-cluster deployments or when multi-cluster
1516
// support is disabled.
1617
type singleClusterProvider struct {
18+
staticConfig *config.StaticConfig
1719
strategy string
1820
manager *Manager
1921
kubeconfigWatcher *watcher.Kubeconfig
@@ -32,31 +34,41 @@ func init() {
3234
// Otherwise, it uses a KubeconfigManager.
3335
func newSingleClusterProvider(strategy string) ProviderFactory {
3436
return func(cfg *config.StaticConfig) (Provider, error) {
35-
if cfg != nil && cfg.KubeConfig != "" && strategy == config.ClusterProviderInCluster {
36-
return nil, fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster ClusterProviderStrategy", cfg.KubeConfig)
37+
ret := &singleClusterProvider{
38+
staticConfig: cfg,
39+
strategy: strategy,
3740
}
38-
39-
var m *Manager
40-
var err error
41-
if strategy == config.ClusterProviderInCluster || IsInCluster(cfg) {
42-
m, err = NewInClusterManager(cfg)
43-
} else {
44-
m, err = NewKubeconfigManager(cfg, "")
45-
}
46-
if err != nil {
47-
if errors.Is(err, ErrorInClusterNotInCluster) {
48-
return nil, fmt.Errorf("server must be deployed in cluster for the %s ClusterProviderStrategy: %v", strategy, err)
49-
}
41+
if err := ret.reset(); err != nil {
5042
return nil, err
5143
}
44+
return ret, nil
45+
}
46+
}
47+
48+
func (p *singleClusterProvider) reset() error {
49+
if p.staticConfig != nil && p.staticConfig.KubeConfig != "" && p.strategy == config.ClusterProviderInCluster {
50+
return fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster ClusterProviderStrategy",
51+
p.staticConfig.KubeConfig)
52+
}
5253

53-
return &singleClusterProvider{
54-
manager: m,
55-
strategy: strategy,
56-
kubeconfigWatcher: watcher.NewKubeconfig(m.accessControlClientset.clientCmdConfig),
57-
clusterStateWatcher: watcher.NewClusterState(m.accessControlClientset.DiscoveryClient()),
58-
}, nil
54+
var err error
55+
if p.strategy == config.ClusterProviderInCluster || IsInCluster(p.staticConfig) {
56+
p.manager, err = NewInClusterManager(p.staticConfig)
57+
} else {
58+
p.manager, err = NewKubeconfigManager(p.staticConfig, "")
5959
}
60+
if err != nil {
61+
if errors.Is(err, ErrorInClusterNotInCluster) {
62+
return fmt.Errorf("server must be deployed in cluster for the %s ClusterProviderStrategy: %v",
63+
p.strategy, err)
64+
}
65+
return err
66+
}
67+
68+
p.Close()
69+
p.kubeconfigWatcher = watcher.NewKubeconfig(p.manager.accessControlClientset.clientCmdConfig)
70+
p.clusterStateWatcher = watcher.NewClusterState(p.manager.accessControlClientset.DiscoveryClient())
71+
return nil
6072
}
6173

6274
func (p *singleClusterProvider) IsOpenShift(ctx context.Context) bool {
@@ -91,16 +103,21 @@ func (p *singleClusterProvider) GetTargetParameterName() string {
91103
}
92104

93105
func (p *singleClusterProvider) WatchTargets(reload McpReload) {
94-
reloadWithCacheInvalidate := func() error {
95-
// Invalidate all cached managers to force reloading on next access
96-
p.manager.Invalidate()
106+
reloadWithReset := func() error {
107+
if err := p.reset(); err != nil {
108+
return err
109+
}
110+
p.WatchTargets(reload)
97111
return reload()
98112
}
99-
p.kubeconfigWatcher.Watch(reloadWithCacheInvalidate)
100-
p.clusterStateWatcher.Watch(reloadWithCacheInvalidate)
113+
p.kubeconfigWatcher.Watch(reloadWithReset)
114+
p.clusterStateWatcher.Watch(reload)
101115
}
102116

103117
func (p *singleClusterProvider) Close() {
104-
_ = p.kubeconfigWatcher.Close()
105-
_ = p.clusterStateWatcher.Close()
118+
for _, w := range []watcher.Watcher{p.kubeconfigWatcher, p.clusterStateWatcher} {
119+
if !reflect.ValueOf(w).IsNil() {
120+
w.Close()
121+
}
122+
}
106123
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package kubernetes
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"reflect"
7+
"testing"
8+
"time"
9+
10+
"github.com/containers/kubernetes-mcp-server/internal/test"
11+
"github.com/containers/kubernetes-mcp-server/pkg/config"
12+
"github.com/stretchr/testify/suite"
13+
"k8s.io/client-go/tools/clientcmd"
14+
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
15+
)
16+
17+
type ProviderWatchTargetsTestSuite struct {
18+
suite.Suite
19+
mockServer *test.MockServer
20+
discoveryClientHandler *test.DiscoveryClientHandler
21+
kubeconfig *clientcmdapi.Config
22+
staticConfig *config.StaticConfig
23+
}
24+
25+
func (s *ProviderWatchTargetsTestSuite) SetupTest() {
26+
s.mockServer = test.NewMockServer()
27+
s.discoveryClientHandler = &test.DiscoveryClientHandler{}
28+
s.mockServer.Handle(s.discoveryClientHandler)
29+
30+
s.T().Setenv("CLUSTER_STATE_POLL_INTERVAL_MS", "100")
31+
s.T().Setenv("CLUSTER_STATE_DEBOUNCE_WINDOW_MS", "50")
32+
33+
// Add multiple fake contexts to allow testing of context changes
34+
s.kubeconfig = s.mockServer.Kubeconfig()
35+
for i := 0; i < 10; i++ {
36+
name := fmt.Sprintf("context-%d", i)
37+
s.kubeconfig.Contexts[name] = clientcmdapi.NewContext()
38+
s.kubeconfig.Contexts[name].Cluster = s.kubeconfig.Contexts[s.kubeconfig.CurrentContext].Cluster
39+
s.kubeconfig.Contexts[name].AuthInfo = s.kubeconfig.Contexts[s.kubeconfig.CurrentContext].AuthInfo
40+
}
41+
42+
s.staticConfig = &config.StaticConfig{KubeConfig: test.KubeconfigFile(s.T(), s.kubeconfig)}
43+
}
44+
45+
func (s *ProviderWatchTargetsTestSuite) TearDownTest() {
46+
s.mockServer.Close()
47+
}
48+
49+
func (s *ProviderWatchTargetsTestSuite) TestClusterStateChanges() {
50+
testCases := []func() (Provider, error){
51+
func() (Provider, error) { return newKubeConfigClusterProvider(s.staticConfig) },
52+
func() (Provider, error) {
53+
return newSingleClusterProvider(config.ClusterProviderDisabled)(s.staticConfig)
54+
},
55+
}
56+
for _, tc := range testCases {
57+
provider, err := tc()
58+
s.Require().NoError(err, "Expected no error from provider creation")
59+
60+
s.Run("With provider "+reflect.TypeOf(provider).String(), func() {
61+
callback, waitForCallback := CallbackWaiter()
62+
provider.WatchTargets(callback)
63+
s.Run("Reloads provider on cluster changes", func() {
64+
s.discoveryClientHandler.Groups = append(s.discoveryClientHandler.Groups, `{"name":"alex.example.com","versions":[{"groupVersion":"alex.example.com/v1","version":"v1"}],"preferredVersion":{"groupVersion":"alex.example.com/v1","version":"v1"}}`)
65+
66+
s.Require().NoError(waitForCallback(5 * time.Second))
67+
// Provider-wise the watcher.ClusterState which triggers the callback has no effect.
68+
// We might consider removing it at some point? (20251202)
69+
})
70+
})
71+
}
72+
}
73+
74+
func (s *ProviderWatchTargetsTestSuite) TestKubeConfigClusterProvider() {
75+
provider, err := newKubeConfigClusterProvider(s.staticConfig)
76+
s.Require().NoError(err, "Expected no error from provider creation")
77+
78+
callback, waitForCallback := CallbackWaiter()
79+
provider.WatchTargets(callback)
80+
81+
s.Run("KubeConfigClusterProvider updates targets (reset) on kubeconfig change", func() {
82+
s.kubeconfig.CurrentContext = "context-1"
83+
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
84+
s.Require().NoError(waitForCallback(5 * time.Second))
85+
86+
s.Run("Replaces default target with new context", func() {
87+
s.Equal("context-1", provider.GetDefaultTarget(), "Expected default target context to be updated")
88+
})
89+
s.Run("Adds new context to targets", func() {
90+
targets, err := provider.GetTargets(s.T().Context())
91+
s.Require().NoError(err, "Expected no error from GetTargets")
92+
s.Contains(targets, "context-1")
93+
})
94+
s.Run("Has derived Kubernetes for new context", func() {
95+
k, err := provider.GetDerivedKubernetes(s.T().Context(), "context-1")
96+
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes for context-1")
97+
s.NotNil(k, "Expected Kubernetes from GetDerivedKubernetes for context-1")
98+
s.Run("Derived Kubernetes points to correct context", func() {
99+
cfg, err := k.AccessControlClientset().ToRawKubeConfigLoader().RawConfig()
100+
s.Require().NoError(err, "Expected no error from ToRawKubeConfigLoader")
101+
s.Equal("context-1", cfg.CurrentContext, "Expected Kubernetes to point to changed-context")
102+
})
103+
})
104+
105+
s.Run("Keeps watching for further changes", func() {
106+
s.kubeconfig.CurrentContext = "context-2"
107+
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
108+
s.Require().NoError(waitForCallback(5 * time.Second))
109+
110+
s.Run("Replaces default target with new context", func() {
111+
s.Equal("context-2", provider.GetDefaultTarget(), "Expected default target context to be updated")
112+
})
113+
})
114+
})
115+
}
116+
117+
func (s *ProviderWatchTargetsTestSuite) TestSingleClusterProvider() {
118+
provider, err := newSingleClusterProvider(config.ClusterProviderDisabled)(s.staticConfig)
119+
s.Require().NoError(err, "Expected no error from provider creation")
120+
121+
callback, waitForCallback := CallbackWaiter()
122+
provider.WatchTargets(callback)
123+
124+
s.Run("SingleClusterProvider reloads/resets on kubeconfig change", func() {
125+
s.kubeconfig.CurrentContext = "context-1"
126+
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
127+
s.Require().NoError(waitForCallback(5 * time.Second))
128+
129+
s.Run("Derived Kubernetes points to updated context", func() {
130+
k, err := provider.GetDerivedKubernetes(s.T().Context(), "")
131+
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes for context-1")
132+
s.NotNil(k, "Expected Kubernetes from GetDerivedKubernetes for context-1")
133+
s.Run("Derived Kubernetes points to correct context", func() {
134+
cfg, err := k.AccessControlClientset().ToRawKubeConfigLoader().RawConfig()
135+
s.Require().NoError(err, "Expected no error from ToRawKubeConfigLoader")
136+
s.Equal("context-1", cfg.CurrentContext, "Expected Kubernetes to point to changed-context")
137+
})
138+
})
139+
140+
s.Run("Keeps watching for further changes", func() {
141+
s.kubeconfig.CurrentContext = "context-2"
142+
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
143+
s.Require().NoError(waitForCallback(5 * time.Second))
144+
145+
s.Run("Derived Kubernetes points to updated context", func() {
146+
k, err := provider.GetDerivedKubernetes(s.T().Context(), "")
147+
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes for context-2")
148+
s.NotNil(k, "Expected Kubernetes from GetDerivedKubernetes for context-2")
149+
cfg, err := k.AccessControlClientset().ToRawKubeConfigLoader().RawConfig()
150+
s.Require().NoError(err, "Expected no error from ToRawKubeConfigLoader")
151+
s.Equal("context-2", cfg.CurrentContext, "Expected Kubernetes to point to changed-context")
152+
})
153+
})
154+
})
155+
}
156+
157+
// CallbackWaiter returns a callback and wait function that can be used multiple times.
158+
func CallbackWaiter() (callback func() error, waitFunc func(timeout time.Duration) error) {
159+
signal := make(chan struct{}, 1)
160+
callback = func() error {
161+
select {
162+
case signal <- struct{}{}:
163+
default:
164+
}
165+
return nil
166+
}
167+
waitFunc = func(timeout time.Duration) error {
168+
select {
169+
case <-signal:
170+
case <-time.After(timeout):
171+
return errors.New("timeout waiting for callback")
172+
}
173+
return nil
174+
}
175+
return
176+
}
177+
178+
func TestProviderWatchTargetsTestSuite(t *testing.T) {
179+
suite.Run(t, new(ProviderWatchTargetsTestSuite))
180+
}

0 commit comments

Comments
 (0)