From 14906f8dfcf8f9bf11abdddf29c550563b1170f1 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Tue, 16 Jul 2024 13:28:06 -0400 Subject: [PATCH] Add support for a chunked release storage driver This commit adds a custom helm release storage driver that overcomes limitations in the size of a single value that can be stored in etcd. In order to remain backward-compatible and also make this storage driver available, this commit also refactors the ActionConfigGetter options so that a custom function can be provided to the ActionConfigGetter that can create the desired storage driver. This commit also separates the rest config mapping into two separate options. One for interactions with the storage backend, and the other for managing release content. Signed-off-by: Joe Lanford --- pkg/client/actionconfig.go | 136 ++++++---- pkg/client/actionconfig_test.go | 29 +++ pkg/client/ownerrefclient.go | 65 +++++ pkg/storage/chunked.go | 416 ++++++++++++++++++++++++++++++ pkg/storage/chunked_test.go | 348 +++++++++++++++++++++++++ pkg/storage/labels.go | 60 +++++ pkg/storage/storage_suite_test.go | 36 +++ 7 files changed, 1043 insertions(+), 47 deletions(-) create mode 100644 pkg/client/ownerrefclient.go create mode 100644 pkg/storage/chunked.go create mode 100644 pkg/storage/chunked_test.go create mode 100644 pkg/storage/labels.go create mode 100644 pkg/storage/storage_suite_test.go diff --git a/pkg/client/actionconfig.go b/pkg/client/actionconfig.go index 120febf1..f49f5686 100644 --- a/pkg/client/actionconfig.go +++ b/pkg/client/actionconfig.go @@ -25,7 +25,6 @@ import ( "helm.sh/helm/v3/pkg/kube" "helm.sh/helm/v3/pkg/storage" "helm.sh/helm/v3/pkg/storage/driver" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" @@ -57,14 +56,25 @@ func NewActionConfigGetter(baseRestConfig *rest.Config, rm meta.RESTMapper, opts if acg.objectToClientNamespace == nil { acg.objectToClientNamespace = getObjectNamespace } - if acg.objectToStorageNamespace == nil { - acg.objectToStorageNamespace = getObjectNamespace + if acg.objectToClientRestConfig == nil { + acg.objectToClientRestConfig = func(_ context.Context, _ client.Object, baseRestConfig *rest.Config) (*rest.Config, error) { + return rest.CopyConfig(baseRestConfig), nil + } } - if acg.objectToRestConfig == nil { - acg.objectToRestConfig = func(_ context.Context, _ client.Object, baseRestConfig *rest.Config) (*rest.Config, error) { + if acg.objectToStorageRestConfig == nil { + acg.objectToStorageRestConfig = func(_ context.Context, _ client.Object, baseRestConfig *rest.Config) (*rest.Config, error) { return rest.CopyConfig(baseRestConfig), nil } } + if acg.objectToStorageDriver == nil { + if acg.objectToStorageNamespace == nil { + acg.objectToStorageNamespace = getObjectNamespace + } + acg.objectToStorageDriver = DefaultSecretsStorageDriver(SecretsStorageDriverOpts{ + DisableOwnerRefInjection: acg.disableStorageOwnerRefInjection, + StorageNamespaceMapper: acg.objectToStorageNamespace, + }) + } return acg, nil } @@ -73,6 +83,14 @@ var _ ActionConfigGetter = &actionConfigGetter{} type ActionConfigGetterOption func(getter *actionConfigGetter) type ObjectToStringMapper func(client.Object) (string, error) +type ObjectToRestConfigMapper func(context.Context, client.Object, *rest.Config) (*rest.Config, error) +type ObjectToStorageDriverMapper func(context.Context, client.Object, *rest.Config) (driver.Driver, error) + +func ClientRestConfigMapper(f ObjectToRestConfigMapper) 
ActionConfigGetterOption { // nolint:revive + return func(getter *actionConfigGetter) { + getter.objectToClientRestConfig = f + } +} func ClientNamespaceMapper(m ObjectToStringMapper) ActionConfigGetterOption { // nolint:revive return func(getter *actionConfigGetter) { @@ -80,21 +98,37 @@ func ClientNamespaceMapper(m ObjectToStringMapper) ActionConfigGetterOption { // } } +func StorageRestConfigMapper(f ObjectToRestConfigMapper) ActionConfigGetterOption { + return func(getter *actionConfigGetter) { + getter.objectToStorageRestConfig = f + } +} + +func StorageDriverMapper(f ObjectToStorageDriverMapper) ActionConfigGetterOption { + return func(getter *actionConfigGetter) { + getter.objectToStorageDriver = f + } +} + +// Deprecated: use StorageDriverMapper(DefaultSecretsStorageDriver(SecretsStorageDriverOpts)) instead. func StorageNamespaceMapper(m ObjectToStringMapper) ActionConfigGetterOption { return func(getter *actionConfigGetter) { getter.objectToStorageNamespace = m } } +// Deprecated: use StorageDriverMapper(DefaultSecretsStorageDriver(SecretsStorageDriverOpts)) instead. func DisableStorageOwnerRefInjection(v bool) ActionConfigGetterOption { return func(getter *actionConfigGetter) { getter.disableStorageOwnerRefInjection = v } } +// Deprecated: use ClientRestConfigMapper and StorageRestConfigMapper instead. func RestConfigMapper(f func(context.Context, client.Object, *rest.Config) (*rest.Config, error)) ActionConfigGetterOption { return func(getter *actionConfigGetter) { - getter.objectToRestConfig = f + getter.objectToClientRestConfig = f + getter.objectToStorageRestConfig = f } } @@ -107,21 +141,22 @@ type actionConfigGetter struct { restMapper meta.RESTMapper discoveryClient discovery.CachedDiscoveryInterface - objectToClientNamespace ObjectToStringMapper - objectToStorageNamespace ObjectToStringMapper - objectToRestConfig func(context.Context, client.Object, *rest.Config) (*rest.Config, error) + objectToClientRestConfig ObjectToRestConfigMapper + objectToClientNamespace ObjectToStringMapper + + objectToStorageRestConfig ObjectToRestConfigMapper + objectToStorageDriver ObjectToStorageDriverMapper + + // Deprecated: only keep around for backward compatibility with StorageNamespaceMapper option. + objectToStorageNamespace ObjectToStringMapper + // Deprecated: only keep around for backward compatibility with DisableStorageOwnerRefInjection option. 
disableStorageOwnerRefInjection bool } func (acg *actionConfigGetter) ActionConfigFor(ctx context.Context, obj client.Object) (*action.Configuration, error) { - storageNs, err := acg.objectToStorageNamespace(obj) + clientRestConfig, err := acg.objectToClientRestConfig(ctx, obj, acg.baseRestConfig) if err != nil { - return nil, fmt.Errorf("get storage namespace for object: %v", err) - } - - restConfig, err := acg.objectToRestConfig(ctx, obj, acg.baseRestConfig) - if err != nil { - return nil, fmt.Errorf("get rest config for object: %v", err) + return nil, fmt.Errorf("get client rest config for object: %v", err) } clientNamespace, err := acg.objectToClientNamespace(obj) @@ -129,36 +164,30 @@ func (acg *actionConfigGetter) ActionConfigFor(ctx context.Context, obj client.O return nil, fmt.Errorf("get client namespace for object: %v", err) } - rcg := newRESTClientGetter(restConfig, acg.restMapper, acg.discoveryClient, clientNamespace) - kc := kube.New(rcg) - kc.Namespace = clientNamespace - - kcs, err := kc.Factory.KubernetesClientSet() - if err != nil { - return nil, fmt.Errorf("create kubernetes clientset: %v", err) - } + clientRCG := newRESTClientGetter(clientRestConfig, acg.restMapper, acg.discoveryClient, clientNamespace) + clientKC := kube.New(clientRCG) + clientKC.Namespace = clientNamespace // Setup the debug log function that Helm will use debugLog := getDebugLogger(ctx) - secretClient := kcs.CoreV1().Secrets(storageNs) - if !acg.disableStorageOwnerRefInjection { - ownerRef := metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind()) - secretClient = &ownerRefSecretClient{ - SecretInterface: secretClient, - refs: []metav1.OwnerReference{*ownerRef}, - } + storageRestConfig, err := acg.objectToStorageRestConfig(ctx, obj, acg.baseRestConfig) + if err != nil { + return nil, fmt.Errorf("get storage rest config for object: %v", err) + } + + d, err := acg.objectToStorageDriver(ctx, obj, storageRestConfig) + if err != nil { + return nil, fmt.Errorf("get storage driver for object: %v", err) } - d := driver.NewSecrets(secretClient) - d.Log = debugLog // Initialize the storage backend s := storage.Init(d) return &action.Configuration{ - RESTClientGetter: rcg, + RESTClientGetter: clientRCG, Releases: s, - KubeClient: kc, + KubeClient: clientKC, Log: debugLog, }, nil } @@ -173,19 +202,32 @@ func getDebugLogger(ctx context.Context) func(format string, v ...interface{}) { } } -var _ v1.SecretInterface = &ownerRefSecretClient{} - -type ownerRefSecretClient struct { - v1.SecretInterface - refs []metav1.OwnerReference +type SecretsStorageDriverOpts struct { + DisableOwnerRefInjection bool + StorageNamespaceMapper ObjectToStringMapper } -func (c *ownerRefSecretClient) Create(ctx context.Context, in *corev1.Secret, opts metav1.CreateOptions) (*corev1.Secret, error) { - in.OwnerReferences = append(in.OwnerReferences, c.refs...) 
- return c.SecretInterface.Create(ctx, in, opts) -} +func DefaultSecretsStorageDriver(opts SecretsStorageDriverOpts) ObjectToStorageDriverMapper { + if opts.StorageNamespaceMapper == nil { + opts.StorageNamespaceMapper = getObjectNamespace + } + return func(ctx context.Context, obj client.Object, restConfig *rest.Config) (driver.Driver, error) { + storageNamespace, err := opts.StorageNamespaceMapper(obj) + if err != nil { + return nil, fmt.Errorf("get storage namespace for object: %v", err) + } + secretsInterface, err := v1.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("create secrets client for storage: %v", err) + } -func (c *ownerRefSecretClient) Update(ctx context.Context, in *corev1.Secret, opts metav1.UpdateOptions) (*corev1.Secret, error) { - in.OwnerReferences = append(in.OwnerReferences, c.refs...) - return c.SecretInterface.Update(ctx, in, opts) + secretClient := secretsInterface.Secrets(storageNamespace) + if !opts.DisableOwnerRefInjection { + ownerRef := metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind()) + secretClient = NewOwnerRefSecretClient(secretClient, []metav1.OwnerReference{*ownerRef}, MatchAllSecrets) + } + d := driver.NewSecrets(secretClient) + d.Log = getDebugLogger(ctx) + return d, nil + } } diff --git a/pkg/client/actionconfig_test.go b/pkg/client/actionconfig_test.go index 6a3ee470..8ca53995 100644 --- a/pkg/client/actionconfig_test.go +++ b/pkg/client/actionconfig_test.go @@ -23,8 +23,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/kube" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -185,6 +188,32 @@ metadata: Expect(err).ToNot(HaveOccurred()) Expect(ac2.RESTClientGetter.ToRESTConfig()).To(WithTransform(func(c *rest.Config) string { return c.BearerToken }, Equal("test2"))) }) + + It("should use a custom storage driver", func() { + storageDriver := driver.NewMemory() + + storageDriverMapper := func(ctx context.Context, obj client.Object, cfg *rest.Config) (driver.Driver, error) { + return storageDriver, nil + } + acg, err := NewActionConfigGetter(cfg, rm, StorageDriverMapper(storageDriverMapper)) + Expect(err).ToNot(HaveOccurred()) + + testObject := func(name string) client.Object { + u := unstructured.Unstructured{} + u.SetName(name) + return &u + } + + ac, err := acg.ActionConfigFor(context.Background(), testObject("test1")) + Expect(err).ToNot(HaveOccurred()) + + expected := &release.Release{Name: "test1", Version: 2, Info: &release.Info{Status: release.StatusDeployed}} + Expect(ac.Releases.Create(expected)).To(Succeed()) + actual, err := storageDriver.List(func(r *release.Release) bool { return true }) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(HaveLen(1)) + Expect(actual[0]).To(Equal(expected)) + }) }) }) diff --git a/pkg/client/ownerrefclient.go b/pkg/client/ownerrefclient.go new file mode 100644 index 00000000..c417cc64 --- /dev/null +++ b/pkg/client/ownerrefclient.go @@ -0,0 +1,65 @@ +package client + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +var _ clientcorev1.SecretInterface = &ownerRefSecretClient{} + +// NewOwnerRefSecretClient returns a SecretInterface that injects the provided owner references +// to all created or updated secrets that match the provided 
match function. If match is nil, all +// secrets are matched. +func NewOwnerRefSecretClient(client clientcorev1.SecretInterface, refs []metav1.OwnerReference, match func(*corev1.Secret) bool) clientcorev1.SecretInterface { + if match == nil { + match = MatchAllSecrets + } + return &ownerRefSecretClient{ + SecretInterface: client, + match: match, + refs: refs, + } +} + +func MatchAllSecrets(_ *corev1.Secret) bool { + return true +} + +type ownerRefSecretClient struct { + clientcorev1.SecretInterface + match func(secret *corev1.Secret) bool + refs []metav1.OwnerReference +} + +func (c *ownerRefSecretClient) appendMissingOwnerRefs(secret *corev1.Secret) { + hasOwnerRef := func(secret *corev1.Secret, ref metav1.OwnerReference) bool { + for _, r := range secret.OwnerReferences { + if r.UID == ref.UID { + return true + } + } + return false + } + for i := range c.refs { + if !hasOwnerRef(secret, c.refs[i]) { + secret.OwnerReferences = append(secret.OwnerReferences, c.refs[i]) + } + } +} + +func (c *ownerRefSecretClient) Create(ctx context.Context, in *corev1.Secret, opts metav1.CreateOptions) (*corev1.Secret, error) { + if c.match == nil || c.match(in) { + c.appendMissingOwnerRefs(in) + } + return c.SecretInterface.Create(ctx, in, opts) +} + +func (c *ownerRefSecretClient) Update(ctx context.Context, in *corev1.Secret, opts metav1.UpdateOptions) (*corev1.Secret, error) { + if c.match == nil || c.match(in) { + c.appendMissingOwnerRefs(in) + } + return c.SecretInterface.Update(ctx, in, opts) +} diff --git a/pkg/storage/chunked.go b/pkg/storage/chunked.go new file mode 100644 index 00000000..4ebecbec --- /dev/null +++ b/pkg/storage/chunked.go @@ -0,0 +1,416 @@ +package storage + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/base32" + "encoding/json" + "fmt" + "hash" + "hash/fnv" + "io" + "strconv" + "sync" + "time" + + "github.com/pkg/errors" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/utils/ptr" +) + +var _ driver.Driver = (*chunkedSecrets)(nil) + +type ChunkedSecretsConfig struct { + ChunkSize int + MaxReadChunks int + MaxWriteChunks int + Log func(string, ...interface{}) +} + +func NewChunkedSecrets(client clientcorev1.SecretInterface, owner string, config ChunkedSecretsConfig) driver.Driver { + if config.Log == nil { + config.Log = func(string, ...interface{}) {} + } + + return &chunkedSecrets{ + client: client, + owner: owner, + ChunkedSecretsConfig: config, + + hashEncoding: base32.NewEncoding("abcdefghijklmnopqrstuvwxyz123456").WithPadding(base32.NoPadding), + hash: fnv.New64a(), + } +} + +type chunkedSecrets struct { + client clientcorev1.SecretInterface + owner string + ChunkedSecretsConfig + + hashMu sync.Mutex + hash hash.Hash64 + hashEncoding *base32.Encoding +} + +func (c *chunkedSecrets) Create(key string, rls *release.Release) error { + c.Log("create: %q", key) + defer c.Log("created: %q", key) + + chunks, err := c.encodeReleaseAsChunks(key, rls) + if err != nil { + return fmt.Errorf("create: failed to encode release %q: %w", rls.Name, err) + } + + createdAt := time.Now() + indexSecret := c.indexSecretFromChunks(key, rls, chunks) + indexSecret.Labels["createdAt"] = strconv.Itoa(int(createdAt.Unix())) + indexSecret, err = c.client.Create(context.Background(), indexSecret, metav1.CreateOptions{}) + if err != nil 
{ + if apierrors.IsAlreadyExists(err) { + return driver.ErrReleaseExists + } + return fmt.Errorf("create: failed to create index and chunk %d of %d secret %q: %w", 1, len(chunks), key, err) + } + + for i, ch := range chunks[1:] { + chunkSecret := c.chunkSecretFromChunk(indexSecret, ch) + chunkSecret.Labels["createdAt"] = strconv.Itoa(int(createdAt.Unix())) + if _, err := c.client.Create(context.Background(), chunkSecret, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create: failed to create chunk secret %d of %d %q: %w", i+2, len(chunks), ch.name, err) + } + } + return nil +} + +type chunk struct { + name string + data []byte +} + +// encodeRelease encodes a release returning a base64 encoded +// gzipped string representation, or error. +func (c *chunkedSecrets) encodeReleaseAsChunks(key string, rls *release.Release) ([]chunk, error) { + buf := &bytes.Buffer{} + + if err := func() error { + gzw, err := gzip.NewWriterLevel(buf, gzip.BestCompression) + if err != nil { + return err + } + defer gzw.Close() + return json.NewEncoder(gzw).Encode(rls) + }(); err != nil { + return nil, err + } + data := buf.Bytes() + + // Split the encoded release into chunks of chunkSize + // and return the chunks. + var chunks []chunk + for i := 0; i < len(data); i += c.ChunkSize { + end := i + c.ChunkSize + if end > len(data) { + end = len(data) + } + chunks = append(chunks, chunk{ + name: fmt.Sprintf("%s-%s", key, c.hashForData(data[i:end])), + data: data[i:end], + }) + } + + if c.MaxWriteChunks > 0 && len(chunks) > c.MaxWriteChunks { + return nil, fmt.Errorf("release too large: %q requires %d chunks, which exceeds the maximum of %d", rls.Name, len(chunks), c.MaxWriteChunks) + } + + return chunks, nil +} + +const ( + SecretTypeChunkedIndex = corev1.SecretType("operatorframework.io/index.v1") + SecretTypeChunkedChunk = corev1.SecretType("operatorframework.io/chunk.v1") +) + +func (c *chunkedSecrets) indexSecretFromChunks(key string, rls *release.Release, chunks []chunk) *corev1.Secret { + extraChunkNames := make([]string, 0, len(chunks)-1) + for _, ch := range chunks[1:] { + extraChunkNames = append(extraChunkNames, ch.name) + } + extraChunkNamesData, err := json.Marshal(extraChunkNames) + if err != nil { + panic(err) + } + + indexLabels := newIndexLabels(c.owner, key, rls) + indexSecret := &corev1.Secret{ + Type: SecretTypeChunkedIndex, + ObjectMeta: metav1.ObjectMeta{ + Name: key, + Labels: indexLabels, + }, + Immutable: ptr.To(false), + Data: map[string][]byte{ + "extraChunks": extraChunkNamesData, + "chunk": chunks[0].data, + }, + } + return indexSecret +} + +func (c *chunkedSecrets) chunkSecretFromChunk(indexSecret *corev1.Secret, ch chunk) *corev1.Secret { + chunkLabels := newChunkLabels(c.owner, indexSecret.Name) + chunkSecret := &corev1.Secret{ + Type: SecretTypeChunkedChunk, + ObjectMeta: metav1.ObjectMeta{ + Name: ch.name, + Labels: chunkLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Secret", + Name: indexSecret.Name, + UID: indexSecret.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(false), + }, + }, + }, + Immutable: ptr.To(true), + Data: map[string][]byte{ + "chunk": ch.data, + }, + } + return chunkSecret +} + +func (c *chunkedSecrets) getIndex(ctx context.Context, key string) (*corev1.Secret, error) { + indexSecret, err := c.client.Get(ctx, key, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, driver.ErrReleaseNotFound + } + return nil, fmt.Errorf("failed to get secret 
for key %q: %w", key, err) + } + return indexSecret, nil +} + +func (c *chunkedSecrets) Update(key string, rls *release.Release) error { + c.Log("update: %q", key) + defer c.Log("updated: %q", key) + + // Get the existing index secret to make sure it exists + existingIndex, err := c.getIndex(context.Background(), key) + if err != nil { + return fmt.Errorf("update: %w", err) + } + + // Delete the existing chunk secrets + if err := c.client.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: newListChunksForKeySelector(c.owner, existingIndex.Name).String()}); err != nil { + return fmt.Errorf("update: failed to delete previous chunk secrets for key %q: %w", key, err) + } + + // Generate new chunks + chunks, err := c.encodeReleaseAsChunks(key, rls) + if err != nil { + return fmt.Errorf("create: failed to encode release %q: %w", rls.Name, err) + } + + modifiedAt := time.Now() + + // Update the index secret + updatedIndexSecret := c.indexSecretFromChunks(key, rls, chunks) + updatedIndexSecret.Labels["createdAt"] = existingIndex.Labels["createdAt"] + updatedIndexSecret.Labels["modifiedAt"] = strconv.Itoa(int(modifiedAt.Unix())) + updatedIndexSecret, err = c.client.Update(context.Background(), updatedIndexSecret, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("create: failed to create index and chunk %d of %d secret %q: %w", 1, len(chunks), key, err) + } + + // Create the new chunks + for i, ch := range chunks[1:] { + chunkSecret := c.chunkSecretFromChunk(updatedIndexSecret, ch) + if _, err := c.client.Create(context.Background(), chunkSecret, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create: failed to create chunk secret %d of %d %q: %w", i+2, len(chunks), ch.name, err) + } + } + return nil +} + +func (c *chunkedSecrets) Delete(key string) (*release.Release, error) { + c.Log("delete: %q", key) + defer c.Log("deleted: %q", key) + + indexSecret, rls, err := c.getIndexAndRelease(key) + if err != nil { + if errors.Is(err, driver.ErrReleaseNotFound) { + return nil, driver.ErrReleaseNotFound + } + return nil, fmt.Errorf("delete: %w", err) + } + if err := c.client.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: newListAllForKeySelector(c.owner, key).String()}); err != nil { + return nil, fmt.Errorf("delete: failed to delete index secret %q: %w", indexSecret.Name, err) + } + return rls, nil +} + +func (c *chunkedSecrets) getIndexAndRelease(key string) (*corev1.Secret, *release.Release, error) { + indexSecret, err := c.getIndex(context.Background(), key) + if err != nil { + return nil, nil, err + } + + rls, err := c.decodeRelease(context.Background(), indexSecret) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode release from index secret %q: %w", indexSecret.Name, err) + } + return indexSecret, rls, nil +} + +func (c *chunkedSecrets) Get(key string) (*release.Release, error) { + c.Log("get: %q", key) + defer c.Log("got: %q", key) + + _, rls, err := c.getIndexAndRelease(key) + if err != nil { + return nil, fmt.Errorf("get: %w", err) + } + return rls, nil +} + +func (c *chunkedSecrets) List(filter func(*release.Release) bool) ([]*release.Release, error) { + c.Log("list") + defer c.Log("listed") + + indexSecrets, err := c.client.List(context.Background(), metav1.ListOptions{LabelSelector: newListIndicesLabelSelector(c.owner).String()}) + if err != nil { + return nil, fmt.Errorf("list: %w", err) + } + + var results []*release.Release + for _, indexSecret := range 
indexSecrets.Items { + indexSecret := indexSecret + rls, err := c.decodeRelease(context.Background(), &indexSecret) + if err != nil { + return nil, fmt.Errorf("list: failed to decode release for key %q: %w", indexSecret.Labels["key"], err) + } + rls.Labels = indexSecret.Labels + if filter(rls) { + results = append(results, rls) + } + } + return results, nil +} + +func (c *chunkedSecrets) Query(queryLabels map[string]string) ([]*release.Release, error) { + for k, v := range queryLabels { + if k == "owner" && v == "helm" { + // Helm hardcodes some queries with owner=helm. We'll translate this + // to use our owner value + queryLabels[k] = c.owner + } + } + c.Log("query: labels=%v", queryLabels) + defer c.Log("queried: labels=%v", queryLabels) + + selector := newListIndicesLabelSelector(c.owner) + if queryRequirements, selectable := labels.Set(queryLabels).AsSelector().Requirements(); selectable { + selector = selector.Add(queryRequirements...) + } + + indexSecrets, err := c.client.List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + + if len(indexSecrets.Items) == 0 { + return nil, driver.ErrReleaseNotFound + } + + results := make([]*release.Release, 0, len(indexSecrets.Items)) + for _, indexSecret := range indexSecrets.Items { + indexSecret := indexSecret + rls, err := c.decodeRelease(context.Background(), &indexSecret) + if err != nil { + return nil, fmt.Errorf("query: failed to decode release: %w", err) + } + results = append(results, rls) + } + return results, nil +} + +func (c *chunkedSecrets) Name() string { + return fmt.Sprintf("%s/chunkedSecrets", c.owner) +} + +func (c *chunkedSecrets) decodeRelease(ctx context.Context, indexSecret *corev1.Secret) (*release.Release, error) { + extraChunkNamesData, ok := indexSecret.Data["extraChunks"] + if !ok { + return nil, fmt.Errorf("index secret %q missing chunks data: %#v", indexSecret.Name, indexSecret) + } + + var extraChunkNames []string + if err := json.Unmarshal(extraChunkNamesData, &extraChunkNames); err != nil { + return nil, fmt.Errorf("failed to parse chunk names from index: %w", err) + } + + if c.MaxReadChunks > 0 && 1+len(extraChunkNames) > c.MaxReadChunks { + return nil, fmt.Errorf("release too large: %q consists of %d chunks, which exceeds the maximum of %d", indexSecret.Name, 1+len(extraChunkNames), c.MaxReadChunks) + } + + pr, pw := io.Pipe() + go func() { + defer pw.Close() + firstChunkData, ok := indexSecret.Data["chunk"] + if !ok { + pw.CloseWithError(fmt.Errorf("index secret %q missing chunk %d data", indexSecret.Name, 1)) + return + } + if _, err := pw.Write(firstChunkData); err != nil { + pw.CloseWithError(fmt.Errorf("failed to write chunk %d data from %q: %w", 1, indexSecret.Name, err)) + return + } + for i, chunkName := range extraChunkNames { + chunkSecret, err := c.client.Get(ctx, chunkName, metav1.GetOptions{}) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to get chunk %d secret %q: %w", i+2, chunkName, err)) + return + } + chunkData, ok := chunkSecret.Data["chunk"] + if !ok { + pw.CloseWithError(fmt.Errorf("chunk %d secret %q missing chunk data", i+2, chunkName)) + return + } + if _, err := pw.Write(chunkData); err != nil { + pw.CloseWithError(fmt.Errorf("failed to write chunk %d data from %q: %w", i+2, chunkName, err)) + return + } + } + }() + + gzr, err := gzip.NewReader(pr) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %w", err) + } + releaseDecoder := json.NewDecoder(gzr) + var r 
release.Release + if err := releaseDecoder.Decode(&r); err != nil { + return nil, fmt.Errorf("failed to decode release: %w", err) + } + r.Labels = filterSystemLabels(indexSecret.Labels) + return &r, nil +} + +func (c *chunkedSecrets) hashForData(data []byte) string { + c.hashMu.Lock() + defer c.hashMu.Unlock() + + c.hash.Reset() + _, _ = c.hash.Write(data) + return c.hashEncoding.EncodeToString(c.hash.Sum(nil)) +} diff --git a/pkg/storage/chunked_test.go b/pkg/storage/chunked_test.go new file mode 100644 index 00000000..420cf36e --- /dev/null +++ b/pkg/storage/chunked_test.go @@ -0,0 +1,348 @@ +package storage + +import ( + "context" + "encoding/json" + "fmt" + "maps" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/utils/ptr" +) + +var _ = Describe("chunkedSecrets", func() { + const chunkSize = 1000 + var ( + secretInterface clientcorev1.SecretInterface + chunkedDriver driver.Driver + ) + + BeforeEach(func() { + secretInterface = clientcorev1.NewForConfigOrDie(cfg).Secrets("default") + chunkedDriver = NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 2, + MaxWriteChunks: 2, + }) + }) + + AfterEach(func() { + Expect(secretInterface.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{})).To(Succeed()) + }) + + var _ = Describe("Create", func() { + It("should create a large release with multiple secrets", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + }) + It("should create a small release with a single secret", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 1) + }) + It("should fail if the release already exists", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + + // Change the status to produce a release with the same key, but different content. 
+ rel.Info.Status = release.StatusDeployed + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(MatchError(driver.ErrReleaseExists)) + }) + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*4) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("Get", func() { + It("should get a large release that is chunked", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + actual, err := chunkedDriver.Get(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + }) + It("should get a small release that is not chunked", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + actual, err := chunkedDriver.Get(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + }) + It("should fail if the release does not exist", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + _, err := chunkedDriver.Get(releaseKey(rel)) + Expect(err).To(MatchError(driver.ErrReleaseNotFound)) + }) + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + _, err := maxReadDriver.Get(releaseKey(rel)) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("Update", func() { + It("should update a single-secret release to a multi-secret release", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 1) + + // Change the status to produce a release with the same key, but different content. + rel = genRelease("test-release", 1, release.StatusDeployed, nil, chunkSize*2) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + }) + It("should update a multi-secret release to a single-secret release", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + + // Change the status to produce a release with the same key, but different content. 
+ rel = genRelease("test-release", 1, release.StatusDeployed, nil, chunkSize/2) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 1) + }) + It("should fail if the release does not exist", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(MatchError(driver.ErrReleaseNotFound)) + }) + + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + + // Change the status to produce a release with the same key, but different content. + rel = genRelease("test-release", 1, release.StatusDeployed, nil, chunkSize*4) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("Delete", func() { + It("should delete a multi-secret release", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + verifySecrets(secretInterface, 2) + + actual, err := chunkedDriver.Delete(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + verifySecrets(secretInterface, 0) + }) + It("should delete a single-secret release", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + verifySecrets(secretInterface, 1) + + actual, err := chunkedDriver.Delete(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + verifySecrets(secretInterface, 0) + }) + It("should fail if the release does not exist", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + _, err := chunkedDriver.Delete(releaseKey(rel)) + Expect(err).To(MatchError(driver.ErrReleaseNotFound)) + }) + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + + _, err := maxReadDriver.Delete(releaseKey(rel)) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("List", func() { + BeforeEach(func() { + releases := []*release.Release{ + genRelease("a", 1, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 2, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 3, release.StatusSuperseded, nil, chunkSize*2), + genRelease("a", 4, release.StatusDeployed, nil, chunkSize*2), + + genRelease("b", 1, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 2, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 3, release.StatusDeployed, nil, chunkSize/2), + } + for _, rel := range releases { + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + } + }) + It("should list releases by name", func() { + aReleases, err := chunkedDriver.List(func(rel *release.Release) bool { + return rel.Name == "a" + }) + Expect(err).ToNot(HaveOccurred()) + Expect(aReleases).To(HaveLen(4)) + }) + + It("should list 
releases by status", func() { + deployedReleases, err := chunkedDriver.List(func(rel *release.Release) bool { + return rel.Info.Status == release.StatusDeployed + }) + Expect(err).ToNot(HaveOccurred()) + Expect(deployedReleases).To(HaveLen(2)) + }) + + It("should return an empty list if no releases match", func() { + cReleases, err := chunkedDriver.List(func(rel *release.Release) bool { + return rel.Name == "c" + }) + Expect(err).ToNot(HaveOccurred()) + Expect(cReleases).To(BeEmpty()) + }) + + It("should fail if any release is too large", func() { + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + actual, err := maxReadDriver.List(func(rel *release.Release) bool { return true }) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + Expect(actual).To(BeNil()) + }) + }) + + var _ = Describe("Query", func() { + BeforeEach(func() { + releases := []*release.Release{ + genRelease("a", 1, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 2, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 3, release.StatusSuperseded, nil, chunkSize*2), + genRelease("a", 4, release.StatusDeployed, nil, chunkSize*2), + + genRelease("b", 1, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 2, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 3, release.StatusDeployed, map[string]string{"key1": "val1"}, chunkSize/2), + } + for _, rel := range releases { + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + } + }) + + It("should query releases by custom labels", func() { + key1Releases, err := chunkedDriver.Query(map[string]string{"key1": "val1"}) + Expect(err).ToNot(HaveOccurred()) + Expect(key1Releases).To(HaveLen(1)) + }) + + It("should return ErrReleaseNotFound when there is no match", func() { + _, err := chunkedDriver.Query(map[string]string{"nonexistentKey": "nonexistentVal"}) + Expect(err).To(MatchError(driver.ErrReleaseNotFound)) + }) + + It("should succeed if no matched release is too large", func() { + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + actual, err := maxReadDriver.Query(map[string]string{"name": "a", "version": "1"}) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(HaveLen(1)) + }) + + It("should fail if any matched release is too large", func() { + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + actual, err := maxReadDriver.Query(map[string]string{"name": "a"}) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + Expect(actual).To(BeNil()) + }) + + // This test is necessary because the helm storage library hardcodes the owner to "helm". + // We have no way to configure the release storage implementation to use a different owner + // when we set it up as part of the action.Configuration. 
+ It("should translate owner=helm to owner=test-owner", func() { + allReleases, err := chunkedDriver.Query(map[string]string{"owner": "helm"}) + Expect(err).ToNot(HaveOccurred()) + Expect(allReleases).To(HaveLen(7)) + }) + }) +}) + +func verifySecrets(secretInterface clientcorev1.SecretInterface, expected int) { + GinkgoHelper() + + items, err := secretInterface.List(context.Background(), metav1.ListOptions{}) + Expect(err).ToNot(HaveOccurred()) + Expect(items.Items).To(HaveLen(expected)) + + if expected == 0 { + return + } + var indexSecrets, chunkSecrets []corev1.Secret + for _, s := range items.Items { + switch s.Type { + case SecretTypeChunkedIndex: + indexSecrets = append(indexSecrets, s) + case SecretTypeChunkedChunk: + chunkSecrets = append(chunkSecrets, s) + } + } + Expect(indexSecrets).To(HaveLen(1), "expected exactly one index secret") + + Expect(indexSecrets[0].Data).To(HaveKey("extraChunks")) + Expect(indexSecrets[0].Data).To(HaveKey("chunk")) + Expect(indexSecrets[0].Immutable).To(Equal(ptr.To(false))) + + var expectedExtraChunkNames []string + Expect(json.Unmarshal(indexSecrets[0].Data["extraChunks"], &expectedExtraChunkNames)).To(Succeed()) + + Expect(chunkSecrets).To(HaveLen(expected-1), "expected multiple chunk secrets") + actualExtraChunkNames := make([]string, 0, len(chunkSecrets)) + for _, sec := range chunkSecrets { + actualExtraChunkNames = append(actualExtraChunkNames, sec.Name) + Expect(sec.Data).To(HaveKey("chunk")) + Expect(sec.Immutable).To(Equal(ptr.To(true))) + } + + Expect(actualExtraChunkNames).To(ConsistOf(expectedExtraChunkNames)) +} + +func releaseKey(rel *release.Release) string { + return fmt.Sprintf("%s.v%d", rel.Name, rel.Version) +} + +func genRelease(name string, version int, status release.Status, extraLabels map[string]string, minSize int) *release.Release { + lbls := map[string]string{ + "globalKey": "globalValue", + } + maps.Copy(lbls, extraLabels) + return &release.Release{ + Name: name, + Version: version, + Config: map[string]interface{}{ + "takingUpSpace": rand.String(minSize), + }, + Info: &release.Info{Status: status}, + Labels: lbls, + } +} diff --git a/pkg/storage/labels.go b/pkg/storage/labels.go new file mode 100644 index 00000000..1a54c3f7 --- /dev/null +++ b/pkg/storage/labels.go @@ -0,0 +1,60 @@ +package storage + +import ( + "maps" + "strconv" + + "helm.sh/helm/v3/pkg/release" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" +) + +func newIndexLabels(owner, key string, rls *release.Release) map[string]string { + labels := map[string]string{} + maps.Copy(labels, rls.Labels) + labels["name"] = rls.Name + labels["owner"] = owner + labels["status"] = rls.Info.Status.String() + labels["version"] = strconv.Itoa(rls.Version) + labels["key"] = key + labels["type"] = "index" + return labels +} + +func newChunkLabels(owner, key string) map[string]string { + labels := map[string]string{} + labels["owner"] = owner + labels["key"] = key + labels["type"] = "chunk" + return labels +} + +func newListIndicesLabelSelector(owner string) labels.Selector { + return labels.Set{"owner": owner, "type": "index"}.AsSelector() +} + +func newListAllForKeySelector(owner, key string) labels.Selector { + return labels.Set{"owner": owner, "key": key}.AsSelector() +} + +func newListChunksForKeySelector(owner, key string) labels.Selector { + return labels.Set{"owner": owner, "key": key, "type": "chunk"}.AsSelector() +} + +var systemLabels = sets.New[string]("name", "owner", "status", "version", "key", "type", "createdAt", "modifiedAt") + +// 
isSystemLabel reports whether the given key is one of the storage driver's system labels.
+func isSystemLabel(key string) bool {
+	return systemLabels.Has(key)
+}
+
+// filterSystemLabels removes the storage driver's system labels from the given labels map.
+func filterSystemLabels(lbs map[string]string) map[string]string {
+	result := make(map[string]string)
+	for k, v := range lbs {
+		if !isSystemLabel(k) {
+			result[k] = v
+		}
+	}
+	return result
+}
diff --git a/pkg/storage/storage_suite_test.go b/pkg/storage/storage_suite_test.go
new file mode 100644
index 00000000..a7ec43fc
--- /dev/null
+++ b/pkg/storage/storage_suite_test.go
@@ -0,0 +1,36 @@
+package storage
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"k8s.io/client-go/rest"
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+)
+
+func TestStorage(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Storage Suite")
+}
+
+var (
+	testenv *envtest.Environment
+	cfg     *rest.Config
+)
+
+var _ = BeforeSuite(func() {
+	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
+	testenv = &envtest.Environment{}
+
+	var err error
+	cfg, err = testenv.Start()
+	Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = AfterSuite(func() {
+	Expect(testenv.Stop()).To(Succeed())
+})
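
A minimal sketch of how a caller might wire the refactored options together. The option and type names (ClientRestConfigMapper, StorageRestConfigMapper, StorageDriverMapper, DefaultSecretsStorageDriver, SecretsStorageDriverOpts) come from pkg/client/actionconfig.go in this patch; the module import path and the surrounding function are assumptions for illustration only, and passing no options at all would produce the same defaults.

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/rest"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	// Assumed module path; adjust to wherever pkg/client lives in your tree.
	hmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
)

// newActionConfigGetter wires the split rest-config mappers and the default
// Secrets storage driver explicitly, to show where each hook plugs in.
func newActionConfigGetter(base *rest.Config, rm meta.RESTMapper) (hmclient.ActionConfigGetter, error) {
	return hmclient.NewActionConfigGetter(base, rm,
		// Config used by Helm's kube client to apply release content.
		hmclient.ClientRestConfigMapper(func(_ context.Context, _ ctrlclient.Object, cfg *rest.Config) (*rest.Config, error) {
			return rest.CopyConfig(cfg), nil
		}),
		// Config used only for the release storage backend (Secrets); it could
		// carry different credentials than the client config if desired.
		hmclient.StorageRestConfigMapper(func(_ context.Context, _ ctrlclient.Object, cfg *rest.Config) (*rest.Config, error) {
			return rest.CopyConfig(cfg), nil
		}),
		// Default driver: one Secret per release version, owner references
		// injected, storage namespace derived from the reconciled object.
		hmclient.StorageDriverMapper(hmclient.DefaultSecretsStorageDriver(hmclient.SecretsStorageDriverOpts{
			DisableOwnerRefInjection: false,
		})),
	)
}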
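
A sketch of swapping in the chunked driver so that releases too large for a single etcd value can still be stored. NewChunkedSecrets, ChunkedSecretsConfig, and StorageDriverMapper are defined in this patch; the owner string, chunk size, and chunk limits below are illustrative choices, not defaults shipped by the patch.

package example

import (
	"context"

	"helm.sh/helm/v3/pkg/storage/driver"
	"k8s.io/apimachinery/pkg/api/meta"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	// Assumed module path; adjust to wherever pkg/client and pkg/storage live.
	hmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
	hmstorage "github.com/operator-framework/helm-operator-plugins/pkg/storage"
)

// newChunkedActionConfigGetter stores each release as an index Secret plus as
// many immutable chunk Secrets as needed, instead of one Secret per release.
func newChunkedActionConfigGetter(base *rest.Config, rm meta.RESTMapper) (hmclient.ActionConfigGetter, error) {
	return hmclient.NewActionConfigGetter(base, rm,
		hmclient.StorageDriverMapper(func(_ context.Context, obj ctrlclient.Object, storageCfg *rest.Config) (driver.Driver, error) {
			cs, err := corev1client.NewForConfig(storageCfg)
			if err != nil {
				return nil, err
			}
			return hmstorage.NewChunkedSecrets(cs.Secrets(obj.GetNamespace()), "my-operator", hmstorage.ChunkedSecretsConfig{
				// Illustrative limits: keep each Secret comfortably under
				// etcd's default ~1.5 MiB request limit and cap runaway releases.
				ChunkSize:      950 * 1024,
				MaxReadChunks:  20,
				MaxWriteChunks: 20,
			}), nil
		}),
	)
}

Unlike the default Secrets driver, this mapper does not wrap the Secret client with owner-reference injection; if garbage collection of the storage Secrets alongside the reconciled object is desired, the Secrets interface can additionally be wrapped with NewOwnerRefSecretClient, as sketched next.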
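
The owner-reference wrapper can also be used on its own; the new match hook limits injection to the Secrets you care about (passing nil or MatchAllSecrets stamps everything). The constructor comes from pkg/client/ownerrefclient.go; the "helm.sh/release.v1" type string is Helm's standard release-Secret type, and the owner and namespace values are placeholders.

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"

	// Assumed module path.
	hmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
)

// ownedReleaseSecrets returns a Secret client that stamps the given owner
// reference onto Helm release Secrets only, leaving other Secrets untouched.
func ownedReleaseSecrets(cs kubernetes.Interface, namespace string, owner metav1.OwnerReference) corev1client.SecretInterface {
	return hmclient.NewOwnerRefSecretClient(
		cs.CoreV1().Secrets(namespace),
		[]metav1.OwnerReference{owner},
		// Only match Helm's release storage Secrets.
		func(s *corev1.Secret) bool { return s.Type == "helm.sh/release.v1" },
	)
}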