56 changes: 33 additions & 23 deletions pkg/kubernetes/provider_kubeconfig.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"

"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes/watcher"
@@ -18,6 +19,7 @@ const KubeConfigTargetParameterName = "context"
// Kubernetes clusters using different contexts from a kubeconfig file.
// It lazily initializes managers for each context as they are requested.
type kubeConfigClusterProvider struct {
staticConfig *config.StaticConfig
defaultContext string
managers map[string]*Manager
kubeconfigWatcher *watcher.Kubeconfig
@@ -35,37 +37,44 @@ func init() {
// Internally, it leverages a KubeconfigManager for each context, initializing them
// lazily when requested.
func newKubeConfigClusterProvider(cfg *config.StaticConfig) (Provider, error) {
m, err := NewKubeconfigManager(cfg, "")
ret := &kubeConfigClusterProvider{staticConfig: cfg}
if err := ret.reset(); err != nil {
return nil, err
}
return ret, nil
}

func (p *kubeConfigClusterProvider) reset() error {
m, err := NewKubeconfigManager(p.staticConfig, "")
if err != nil {
if errors.Is(err, ErrorKubeconfigInClusterNotAllowed) {
return nil, fmt.Errorf("kubeconfig ClusterProviderStrategy is invalid for in-cluster deployments: %v", err)
return fmt.Errorf("kubeconfig ClusterProviderStrategy is invalid for in-cluster deployments: %v", err)
}
return nil, err
return err
}

rawConfig, err := m.accessControlClientset.clientCmdConfig.RawConfig()
if err != nil {
return nil, err
return err
}

allClusterManagers := map[string]*Manager{
p.managers = map[string]*Manager{
rawConfig.CurrentContext: m, // we already initialized a manager for the default context, let's use it
}

for name := range rawConfig.Contexts {
if name == rawConfig.CurrentContext {
continue // already initialized this, don't want to set it to nil
}

allClusterManagers[name] = nil
p.managers[name] = nil
Review comment (Collaborator): would these managers have any resources that should be cleaned up

Reply (Member, Author): That's a good question.

We don't provide a kubernetes.Manager.Clean method.

The manager itself has a pointer reference to the global config.StaticConfig and an instance of kubernetes.AccessControlClientset.

The clientset has instances of the discovery client, dynamic client, and kubernetes interface. But AFAICT, none of those clients provide clean up methods.

I might have missed something, but I'd say that there's no clean up needed.

}

return &kubeConfigClusterProvider{
defaultContext: rawConfig.CurrentContext,
managers: allClusterManagers,
kubeconfigWatcher: watcher.NewKubeconfig(m.accessControlClientset.clientCmdConfig),
clusterStateWatcher: watcher.NewClusterState(m.accessControlClientset.DiscoveryClient()),
}, nil
p.Close()
p.kubeconfigWatcher = watcher.NewKubeconfig(m.accessControlClientset.clientCmdConfig)
p.clusterStateWatcher = watcher.NewClusterState(m.accessControlClientset.DiscoveryClient())
p.defaultContext = rawConfig.CurrentContext

return nil
}

func (p *kubeConfigClusterProvider) managerForContext(context string) (*Manager, error) {
@@ -124,20 +133,21 @@ func (p *kubeConfigClusterProvider) GetDefaultTarget() string {
}

func (p *kubeConfigClusterProvider) WatchTargets(reload McpReload) {
reloadWithCacheInvalidate := func() error {
// Invalidate all cached managers to force reloading on next access
for contextName := range p.managers {
if m := p.managers[contextName]; m != nil {
m.Invalidate()
}
reloadWithReset := func() error {
if err := p.reset(); err != nil {
return err
}
p.WatchTargets(reload)
return reload()
}
p.kubeconfigWatcher.Watch(reloadWithCacheInvalidate)
p.clusterStateWatcher.Watch(reloadWithCacheInvalidate)
p.kubeconfigWatcher.Watch(reloadWithReset)
p.clusterStateWatcher.Watch(reload)
}

func (p *kubeConfigClusterProvider) Close() {
_ = p.kubeconfigWatcher.Close()
_ = p.clusterStateWatcher.Close()
for _, w := range []watcher.Watcher{p.kubeconfigWatcher, p.clusterStateWatcher} {
if !reflect.ValueOf(w).IsNil() {
w.Close()
}
}
}
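
The Close() implementations above guard each watcher with reflect.ValueOf(w).IsNil() rather than a plain nil comparison. The snippet below is a minimal, self-contained sketch of the typed-nil pitfall that guard handles; the `closer` and `kubeconfigWatcher` names are stand-ins, not the repository's watcher package:

```go
package main

import (
	"fmt"
	"reflect"
)

// closer stands in for watcher.Watcher; every name in this sketch is an assumption.
type closer interface{ Close() error }

// kubeconfigWatcher is a hypothetical concrete type, analogous to *watcher.Kubeconfig.
type kubeconfigWatcher struct{}

func (w *kubeconfigWatcher) Close() error { return nil }

func main() {
	var kw *kubeconfigWatcher // typed nil pointer, e.g. a field that was never initialized
	var c closer = kw         // the interface now holds (type=*kubeconfigWatcher, value=nil)

	fmt.Println(c == nil)                   // false: the interface value itself is non-nil
	fmt.Println(reflect.ValueOf(c).IsNil()) // true: the underlying pointer is nil

	// A plain `if c != nil { c.Close() }` would therefore still call Close on a nil
	// receiver; checking reflect.ValueOf(c).IsNil() is what detects this case.
}
```

Note that reflect.Value.IsNil panics for an untyped nil interface; that cannot happen in Close() here because both fields are concrete pointer types, so the interface entries in the slice always carry a type.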
71 changes: 44 additions & 27 deletions pkg/kubernetes/provider_single.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"

"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes/watcher"
@@ -14,6 +15,7 @@ import (
// Kubernetes cluster. Used for in-cluster deployments or when multi-cluster
// support is disabled.
type singleClusterProvider struct {
staticConfig *config.StaticConfig
strategy string
manager *Manager
kubeconfigWatcher *watcher.Kubeconfig
@@ -32,31 +34,41 @@ func init() {
// Otherwise, it uses a KubeconfigManager.
func newSingleClusterProvider(strategy string) ProviderFactory {
return func(cfg *config.StaticConfig) (Provider, error) {
if cfg != nil && cfg.KubeConfig != "" && strategy == config.ClusterProviderInCluster {
return nil, fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster ClusterProviderStrategy", cfg.KubeConfig)
ret := &singleClusterProvider{
staticConfig: cfg,
strategy: strategy,
}

var m *Manager
var err error
if strategy == config.ClusterProviderInCluster || IsInCluster(cfg) {
m, err = NewInClusterManager(cfg)
} else {
m, err = NewKubeconfigManager(cfg, "")
}
if err != nil {
if errors.Is(err, ErrorInClusterNotInCluster) {
return nil, fmt.Errorf("server must be deployed in cluster for the %s ClusterProviderStrategy: %v", strategy, err)
}
if err := ret.reset(); err != nil {
return nil, err
}
return ret, nil
}
}

func (p *singleClusterProvider) reset() error {
if p.staticConfig != nil && p.staticConfig.KubeConfig != "" && p.strategy == config.ClusterProviderInCluster {
return fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster ClusterProviderStrategy",
p.staticConfig.KubeConfig)
}

return &singleClusterProvider{
manager: m,
strategy: strategy,
kubeconfigWatcher: watcher.NewKubeconfig(m.accessControlClientset.clientCmdConfig),
clusterStateWatcher: watcher.NewClusterState(m.accessControlClientset.DiscoveryClient()),
}, nil
var err error
if p.strategy == config.ClusterProviderInCluster || IsInCluster(p.staticConfig) {
p.manager, err = NewInClusterManager(p.staticConfig)
} else {
p.manager, err = NewKubeconfigManager(p.staticConfig, "")
}
if err != nil {
if errors.Is(err, ErrorInClusterNotInCluster) {
return fmt.Errorf("server must be deployed in cluster for the %s ClusterProviderStrategy: %v",
p.strategy, err)
}
return err
}

p.Close()
p.kubeconfigWatcher = watcher.NewKubeconfig(p.manager.accessControlClientset.clientCmdConfig)
p.clusterStateWatcher = watcher.NewClusterState(p.manager.accessControlClientset.DiscoveryClient())
return nil
}

func (p *singleClusterProvider) IsOpenShift(ctx context.Context) bool {
@@ -91,16 +103,21 @@ func (p *singleClusterProvider) GetTargetParameterName() string {
}

func (p *singleClusterProvider) WatchTargets(reload McpReload) {
reloadWithCacheInvalidate := func() error {
// Invalidate all cached managers to force reloading on next access
p.manager.Invalidate()
reloadWithReset := func() error {
if err := p.reset(); err != nil {
return err
}
p.WatchTargets(reload)
return reload()
}
p.kubeconfigWatcher.Watch(reloadWithCacheInvalidate)
p.clusterStateWatcher.Watch(reloadWithCacheInvalidate)
p.kubeconfigWatcher.Watch(reloadWithReset)
p.clusterStateWatcher.Watch(reload)
}

func (p *singleClusterProvider) Close() {
_ = p.kubeconfigWatcher.Close()
_ = p.clusterStateWatcher.Close()
for _, w := range []watcher.Watcher{p.kubeconfigWatcher, p.clusterStateWatcher} {
if !reflect.ValueOf(w).IsNil() {
w.Close()
}
}
}
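
Both providers now wire WatchTargets to a callback that resets state, re-registers on the watchers created by reset(), and only then notifies the MCP layer. The following self-contained sketch illustrates that "reset, re-watch, notify" flow; fakeWatcher and fakeProvider are hypothetical stand-ins, not the repository's API:

```go
package main

import "fmt"

// fakeWatcher stands in for watcher.Kubeconfig / watcher.ClusterState.
type fakeWatcher struct{ cb func() error }

func (w *fakeWatcher) Watch(cb func() error) { w.cb = cb }
func (w *fakeWatcher) fire() error           { return w.cb() }

type fakeProvider struct {
	watcher    *fakeWatcher
	generation int
}

// reset rebuilds provider state, including a brand-new watcher instance,
// mirroring what the reset() methods in this diff do.
func (p *fakeProvider) reset() error {
	p.generation++
	p.watcher = &fakeWatcher{}
	return nil
}

// WatchTargets registers a callback that resets state, re-registers itself on
// the watcher created by reset(), and only then notifies the caller.
func (p *fakeProvider) WatchTargets(reload func() error) {
	reloadWithReset := func() error {
		if err := p.reset(); err != nil {
			return err
		}
		p.WatchTargets(reload) // attach to the fresh watcher; the old one is gone
		return reload()
	}
	p.watcher.Watch(reloadWithReset)
}

func main() {
	p := &fakeProvider{watcher: &fakeWatcher{}}
	p.WatchTargets(func() error {
		fmt.Println("reload, generation", p.generation)
		return nil
	})

	old := p.watcher
	_ = old.fire() // prints "reload, generation 1"

	// reset() swapped in a new watcher and the callback re-registered on it,
	// so the next change is still observed.
	fmt.Println(p.watcher != old, p.watcher.cb != nil) // true true
}
```

Without the recursive WatchTargets call, the reload callback would stay attached only to the watchers that reset() just closed and replaced, and subsequent kubeconfig changes would go unnoticed.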
180 changes: 180 additions & 0 deletions pkg/kubernetes/provider_watch_test.go
@@ -0,0 +1,180 @@
package kubernetes

import (
"errors"
"fmt"
"reflect"
"testing"
"time"

"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

type ProviderWatchTargetsTestSuite struct {
suite.Suite
mockServer *test.MockServer
discoveryClientHandler *test.DiscoveryClientHandler
kubeconfig *clientcmdapi.Config
staticConfig *config.StaticConfig
}

func (s *ProviderWatchTargetsTestSuite) SetupTest() {
s.mockServer = test.NewMockServer()
s.discoveryClientHandler = &test.DiscoveryClientHandler{}
s.mockServer.Handle(s.discoveryClientHandler)

s.T().Setenv("CLUSTER_STATE_POLL_INTERVAL_MS", "100")
s.T().Setenv("CLUSTER_STATE_DEBOUNCE_WINDOW_MS", "50")

// Add multiple fake contexts to allow testing of context changes
s.kubeconfig = s.mockServer.Kubeconfig()
for i := 0; i < 10; i++ {
name := fmt.Sprintf("context-%d", i)
s.kubeconfig.Contexts[name] = clientcmdapi.NewContext()
s.kubeconfig.Contexts[name].Cluster = s.kubeconfig.Contexts[s.kubeconfig.CurrentContext].Cluster
s.kubeconfig.Contexts[name].AuthInfo = s.kubeconfig.Contexts[s.kubeconfig.CurrentContext].AuthInfo
}

s.staticConfig = &config.StaticConfig{KubeConfig: test.KubeconfigFile(s.T(), s.kubeconfig)}
}

func (s *ProviderWatchTargetsTestSuite) TearDownTest() {
s.mockServer.Close()
}

func (s *ProviderWatchTargetsTestSuite) TestClusterStateChanges() {
testCases := []func() (Provider, error){
func() (Provider, error) { return newKubeConfigClusterProvider(s.staticConfig) },
func() (Provider, error) {
return newSingleClusterProvider(config.ClusterProviderDisabled)(s.staticConfig)
},
}
for _, tc := range testCases {
provider, err := tc()
s.Require().NoError(err, "Expected no error from provider creation")

s.Run("With provider "+reflect.TypeOf(provider).String(), func() {
callback, waitForCallback := CallbackWaiter()
provider.WatchTargets(callback)
s.Run("Reloads provider on cluster changes", func() {
s.discoveryClientHandler.Groups = append(s.discoveryClientHandler.Groups, `{"name":"alex.example.com","versions":[{"groupVersion":"alex.example.com/v1","version":"v1"}],"preferredVersion":{"groupVersion":"alex.example.com/v1","version":"v1"}}`)

s.Require().NoError(waitForCallback(5 * time.Second))
// From the provider's perspective, the watcher.ClusterState event that triggers this callback has no effect.
// We might consider removing it at some point? (20251202)
})
})
}
}

func (s *ProviderWatchTargetsTestSuite) TestKubeConfigClusterProvider() {
provider, err := newKubeConfigClusterProvider(s.staticConfig)
s.Require().NoError(err, "Expected no error from provider creation")

callback, waitForCallback := CallbackWaiter()
provider.WatchTargets(callback)

s.Run("KubeConfigClusterProvider updates targets (reset) on kubeconfig change", func() {
s.kubeconfig.CurrentContext = "context-1"
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
s.Require().NoError(waitForCallback(5 * time.Second))

s.Run("Replaces default target with new context", func() {
s.Equal("context-1", provider.GetDefaultTarget(), "Expected default target context to be updated")
})
s.Run("Adds new context to targets", func() {
targets, err := provider.GetTargets(s.T().Context())
s.Require().NoError(err, "Expected no error from GetTargets")
s.Contains(targets, "context-1")
})
s.Run("Has derived Kubernetes for new context", func() {
k, err := provider.GetDerivedKubernetes(s.T().Context(), "context-1")
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes for context-1")
s.NotNil(k, "Expected Kubernetes from GetDerivedKubernetes for context-1")
s.Run("Derived Kubernetes points to correct context", func() {
cfg, err := k.AccessControlClientset().ToRawKubeConfigLoader().RawConfig()
s.Require().NoError(err, "Expected no error from ToRawKubeConfigLoader")
s.Equal("context-1", cfg.CurrentContext, "Expected Kubernetes to point to changed-context")
})
})

s.Run("Keeps watching for further changes", func() {
s.kubeconfig.CurrentContext = "context-2"
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
s.Require().NoError(waitForCallback(5 * time.Second))

s.Run("Replaces default target with new context", func() {
s.Equal("context-2", provider.GetDefaultTarget(), "Expected default target context to be updated")
})
})
})
}

func (s *ProviderWatchTargetsTestSuite) TestSingleClusterProvider() {
provider, err := newSingleClusterProvider(config.ClusterProviderDisabled)(s.staticConfig)
s.Require().NoError(err, "Expected no error from provider creation")

callback, waitForCallback := CallbackWaiter()
provider.WatchTargets(callback)

s.Run("SingleClusterProvider reloads/resets on kubeconfig change", func() {
s.kubeconfig.CurrentContext = "context-1"
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
s.Require().NoError(waitForCallback(5 * time.Second))

s.Run("Derived Kubernetes points to updated context", func() {
k, err := provider.GetDerivedKubernetes(s.T().Context(), "")
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes for context-1")
s.NotNil(k, "Expected Kubernetes from GetDerivedKubernetes for context-1")
s.Run("Derived Kubernetes points to correct context", func() {
cfg, err := k.AccessControlClientset().ToRawKubeConfigLoader().RawConfig()
s.Require().NoError(err, "Expected no error from ToRawKubeConfigLoader")
s.Equal("context-1", cfg.CurrentContext, "Expected Kubernetes to point to changed-context")
})
})

s.Run("Keeps watching for further changes", func() {
s.kubeconfig.CurrentContext = "context-2"
s.Require().NoError(clientcmd.WriteToFile(*s.kubeconfig, s.staticConfig.KubeConfig))
s.Require().NoError(waitForCallback(5 * time.Second))

s.Run("Derived Kubernetes points to updated context", func() {
k, err := provider.GetDerivedKubernetes(s.T().Context(), "")
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes for context-2")
s.NotNil(k, "Expected Kubernetes from GetDerivedKubernetes for context-2")
cfg, err := k.AccessControlClientset().ToRawKubeConfigLoader().RawConfig()
s.Require().NoError(err, "Expected no error from ToRawKubeConfigLoader")
s.Equal("context-2", cfg.CurrentContext, "Expected Kubernetes to point to changed-context")
})
})
})
}

// CallbackWaiter returns a callback and wait function that can be used multiple times.
func CallbackWaiter() (callback func() error, waitFunc func(timeout time.Duration) error) {
signal := make(chan struct{}, 1)
callback = func() error {
select {
case signal <- struct{}{}:
default:
}
return nil
}
waitFunc = func(timeout time.Duration) error {
select {
case <-signal:
case <-time.After(timeout):
return errors.New("timeout waiting for callback")
}
return nil
}
return
}

func TestProviderWatchTargetsTestSuite(t *testing.T) {
suite.Run(t, new(ProviderWatchTargetsTestSuite))
}
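
The non-blocking send onto a one-element buffered channel means CallbackWaiter coalesces bursts of watcher events into a single signal and never blocks the goroutine that fires the callback. A minimal sketch of that behavior, written as an additional test in the same package (the test name and scenario are hypothetical, not part of this PR):

```go
func TestCallbackWaiterCoalesces(t *testing.T) {
	callback, waitForCallback := CallbackWaiter()

	// Three rapid invocations never block and collapse into one buffered signal.
	for i := 0; i < 3; i++ {
		_ = callback()
	}

	if err := waitForCallback(time.Second); err != nil {
		t.Fatalf("expected a buffered signal, got: %v", err)
	}
	// The burst was coalesced rather than queued, so a second wait times out.
	if err := waitForCallback(50 * time.Millisecond); err == nil {
		t.Fatal("expected a timeout after the coalesced burst was consumed")
	}
}
```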