diff --git a/pkg/client/actionconfig.go b/pkg/client/actionconfig.go index 120febf1..f49f5686 100644 --- a/pkg/client/actionconfig.go +++ b/pkg/client/actionconfig.go @@ -25,7 +25,6 @@ import ( "helm.sh/helm/v3/pkg/kube" "helm.sh/helm/v3/pkg/storage" "helm.sh/helm/v3/pkg/storage/driver" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" @@ -57,14 +56,25 @@ func NewActionConfigGetter(baseRestConfig *rest.Config, rm meta.RESTMapper, opts if acg.objectToClientNamespace == nil { acg.objectToClientNamespace = getObjectNamespace } - if acg.objectToStorageNamespace == nil { - acg.objectToStorageNamespace = getObjectNamespace + if acg.objectToClientRestConfig == nil { + acg.objectToClientRestConfig = func(_ context.Context, _ client.Object, baseRestConfig *rest.Config) (*rest.Config, error) { + return rest.CopyConfig(baseRestConfig), nil + } } - if acg.objectToRestConfig == nil { - acg.objectToRestConfig = func(_ context.Context, _ client.Object, baseRestConfig *rest.Config) (*rest.Config, error) { + if acg.objectToStorageRestConfig == nil { + acg.objectToStorageRestConfig = func(_ context.Context, _ client.Object, baseRestConfig *rest.Config) (*rest.Config, error) { return rest.CopyConfig(baseRestConfig), nil } } + if acg.objectToStorageDriver == nil { + if acg.objectToStorageNamespace == nil { + acg.objectToStorageNamespace = getObjectNamespace + } + acg.objectToStorageDriver = DefaultSecretsStorageDriver(SecretsStorageDriverOpts{ + DisableOwnerRefInjection: acg.disableStorageOwnerRefInjection, + StorageNamespaceMapper: acg.objectToStorageNamespace, + }) + } return acg, nil } @@ -73,6 +83,14 @@ var _ ActionConfigGetter = &actionConfigGetter{} type ActionConfigGetterOption func(getter *actionConfigGetter) type ObjectToStringMapper func(client.Object) (string, error) +type ObjectToRestConfigMapper func(context.Context, client.Object, *rest.Config) (*rest.Config, error) +type ObjectToStorageDriverMapper func(context.Context, client.Object, *rest.Config) (driver.Driver, error) + +func ClientRestConfigMapper(f ObjectToRestConfigMapper) ActionConfigGetterOption { // nolint:revive + return func(getter *actionConfigGetter) { + getter.objectToClientRestConfig = f + } +} func ClientNamespaceMapper(m ObjectToStringMapper) ActionConfigGetterOption { // nolint:revive return func(getter *actionConfigGetter) { @@ -80,21 +98,37 @@ func ClientNamespaceMapper(m ObjectToStringMapper) ActionConfigGetterOption { // } } +func StorageRestConfigMapper(f ObjectToRestConfigMapper) ActionConfigGetterOption { + return func(getter *actionConfigGetter) { + getter.objectToStorageRestConfig = f + } +} + +func StorageDriverMapper(f ObjectToStorageDriverMapper) ActionConfigGetterOption { + return func(getter *actionConfigGetter) { + getter.objectToStorageDriver = f + } +} + +// Deprecated: use StorageDriverMapper(DefaultSecretsStorageDriver(SecretsStorageDriverOpts)) instead. func StorageNamespaceMapper(m ObjectToStringMapper) ActionConfigGetterOption { return func(getter *actionConfigGetter) { getter.objectToStorageNamespace = m } } +// Deprecated: use StorageDriverMapper(DefaultSecretsStorageDriver(SecretsStorageDriverOpts)) instead. func DisableStorageOwnerRefInjection(v bool) ActionConfigGetterOption { return func(getter *actionConfigGetter) { getter.disableStorageOwnerRefInjection = v } } +// Deprecated: use ClientRestConfigMapper and StorageRestConfigMapper instead. func RestConfigMapper(f func(context.Context, client.Object, *rest.Config) (*rest.Config, error)) ActionConfigGetterOption { return func(getter *actionConfigGetter) { - getter.objectToRestConfig = f + getter.objectToClientRestConfig = f + getter.objectToStorageRestConfig = f } } @@ -107,21 +141,22 @@ type actionConfigGetter struct { restMapper meta.RESTMapper discoveryClient discovery.CachedDiscoveryInterface - objectToClientNamespace ObjectToStringMapper - objectToStorageNamespace ObjectToStringMapper - objectToRestConfig func(context.Context, client.Object, *rest.Config) (*rest.Config, error) + objectToClientRestConfig ObjectToRestConfigMapper + objectToClientNamespace ObjectToStringMapper + + objectToStorageRestConfig ObjectToRestConfigMapper + objectToStorageDriver ObjectToStorageDriverMapper + + // Deprecated: only keep around for backward compatibility with StorageNamespaceMapper option. + objectToStorageNamespace ObjectToStringMapper + // Deprecated: only keep around for backward compatibility with DisableStorageOwnerRefInjection option. disableStorageOwnerRefInjection bool } func (acg *actionConfigGetter) ActionConfigFor(ctx context.Context, obj client.Object) (*action.Configuration, error) { - storageNs, err := acg.objectToStorageNamespace(obj) + clientRestConfig, err := acg.objectToClientRestConfig(ctx, obj, acg.baseRestConfig) if err != nil { - return nil, fmt.Errorf("get storage namespace for object: %v", err) - } - - restConfig, err := acg.objectToRestConfig(ctx, obj, acg.baseRestConfig) - if err != nil { - return nil, fmt.Errorf("get rest config for object: %v", err) + return nil, fmt.Errorf("get client rest config for object: %v", err) } clientNamespace, err := acg.objectToClientNamespace(obj) @@ -129,36 +164,30 @@ func (acg *actionConfigGetter) ActionConfigFor(ctx context.Context, obj client.O return nil, fmt.Errorf("get client namespace for object: %v", err) } - rcg := newRESTClientGetter(restConfig, acg.restMapper, acg.discoveryClient, clientNamespace) - kc := kube.New(rcg) - kc.Namespace = clientNamespace - - kcs, err := kc.Factory.KubernetesClientSet() - if err != nil { - return nil, fmt.Errorf("create kubernetes clientset: %v", err) - } + clientRCG := newRESTClientGetter(clientRestConfig, acg.restMapper, acg.discoveryClient, clientNamespace) + clientKC := kube.New(clientRCG) + clientKC.Namespace = clientNamespace // Setup the debug log function that Helm will use debugLog := getDebugLogger(ctx) - secretClient := kcs.CoreV1().Secrets(storageNs) - if !acg.disableStorageOwnerRefInjection { - ownerRef := metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind()) - secretClient = &ownerRefSecretClient{ - SecretInterface: secretClient, - refs: []metav1.OwnerReference{*ownerRef}, - } + storageRestConfig, err := acg.objectToStorageRestConfig(ctx, obj, acg.baseRestConfig) + if err != nil { + return nil, fmt.Errorf("get storage rest config for object: %v", err) + } + + d, err := acg.objectToStorageDriver(ctx, obj, storageRestConfig) + if err != nil { + return nil, fmt.Errorf("get storage driver for object: %v", err) } - d := driver.NewSecrets(secretClient) - d.Log = debugLog // Initialize the storage backend s := storage.Init(d) return &action.Configuration{ - RESTClientGetter: rcg, + RESTClientGetter: clientRCG, Releases: s, - KubeClient: kc, + KubeClient: clientKC, Log: debugLog, }, nil } @@ -173,19 +202,32 @@ func getDebugLogger(ctx context.Context) func(format string, v ...interface{}) { } } -var _ v1.SecretInterface = &ownerRefSecretClient{} - -type ownerRefSecretClient struct { - v1.SecretInterface - refs []metav1.OwnerReference +type SecretsStorageDriverOpts struct { + DisableOwnerRefInjection bool + StorageNamespaceMapper ObjectToStringMapper } -func (c *ownerRefSecretClient) Create(ctx context.Context, in *corev1.Secret, opts metav1.CreateOptions) (*corev1.Secret, error) { - in.OwnerReferences = append(in.OwnerReferences, c.refs...) - return c.SecretInterface.Create(ctx, in, opts) -} +func DefaultSecretsStorageDriver(opts SecretsStorageDriverOpts) ObjectToStorageDriverMapper { + if opts.StorageNamespaceMapper == nil { + opts.StorageNamespaceMapper = getObjectNamespace + } + return func(ctx context.Context, obj client.Object, restConfig *rest.Config) (driver.Driver, error) { + storageNamespace, err := opts.StorageNamespaceMapper(obj) + if err != nil { + return nil, fmt.Errorf("get storage namespace for object: %v", err) + } + secretsInterface, err := v1.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("create secrets client for storage: %v", err) + } -func (c *ownerRefSecretClient) Update(ctx context.Context, in *corev1.Secret, opts metav1.UpdateOptions) (*corev1.Secret, error) { - in.OwnerReferences = append(in.OwnerReferences, c.refs...) - return c.SecretInterface.Update(ctx, in, opts) + secretClient := secretsInterface.Secrets(storageNamespace) + if !opts.DisableOwnerRefInjection { + ownerRef := metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind()) + secretClient = NewOwnerRefSecretClient(secretClient, []metav1.OwnerReference{*ownerRef}, MatchAllSecrets) + } + d := driver.NewSecrets(secretClient) + d.Log = getDebugLogger(ctx) + return d, nil + } } diff --git a/pkg/client/actionconfig_test.go b/pkg/client/actionconfig_test.go index 6a3ee470..8ca53995 100644 --- a/pkg/client/actionconfig_test.go +++ b/pkg/client/actionconfig_test.go @@ -23,8 +23,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/kube" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -185,6 +188,32 @@ metadata: Expect(err).ToNot(HaveOccurred()) Expect(ac2.RESTClientGetter.ToRESTConfig()).To(WithTransform(func(c *rest.Config) string { return c.BearerToken }, Equal("test2"))) }) + + It("should use a custom storage driver", func() { + storageDriver := driver.NewMemory() + + storageDriverMapper := func(ctx context.Context, obj client.Object, cfg *rest.Config) (driver.Driver, error) { + return storageDriver, nil + } + acg, err := NewActionConfigGetter(cfg, rm, StorageDriverMapper(storageDriverMapper)) + Expect(err).ToNot(HaveOccurred()) + + testObject := func(name string) client.Object { + u := unstructured.Unstructured{} + u.SetName(name) + return &u + } + + ac, err := acg.ActionConfigFor(context.Background(), testObject("test1")) + Expect(err).ToNot(HaveOccurred()) + + expected := &release.Release{Name: "test1", Version: 2, Info: &release.Info{Status: release.StatusDeployed}} + Expect(ac.Releases.Create(expected)).To(Succeed()) + actual, err := storageDriver.List(func(r *release.Release) bool { return true }) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(HaveLen(1)) + Expect(actual[0]).To(Equal(expected)) + }) }) }) diff --git a/pkg/client/ownerrefclient.go b/pkg/client/ownerrefclient.go new file mode 100644 index 00000000..c417cc64 --- /dev/null +++ b/pkg/client/ownerrefclient.go @@ -0,0 +1,65 @@ +package client + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +var _ clientcorev1.SecretInterface = &ownerRefSecretClient{} + +// NewOwnerRefSecretClient returns a SecretInterface that injects the provided owner references +// to all created or updated secrets that match the provided match function. If match is nil, all +// secrets are matched. +func NewOwnerRefSecretClient(client clientcorev1.SecretInterface, refs []metav1.OwnerReference, match func(*corev1.Secret) bool) clientcorev1.SecretInterface { + if match == nil { + match = MatchAllSecrets + } + return &ownerRefSecretClient{ + SecretInterface: client, + match: match, + refs: refs, + } +} + +func MatchAllSecrets(_ *corev1.Secret) bool { + return true +} + +type ownerRefSecretClient struct { + clientcorev1.SecretInterface + match func(secret *corev1.Secret) bool + refs []metav1.OwnerReference +} + +func (c *ownerRefSecretClient) appendMissingOwnerRefs(secret *corev1.Secret) { + hasOwnerRef := func(secret *corev1.Secret, ref metav1.OwnerReference) bool { + for _, r := range secret.OwnerReferences { + if r.UID == ref.UID { + return true + } + } + return false + } + for i := range c.refs { + if !hasOwnerRef(secret, c.refs[i]) { + secret.OwnerReferences = append(secret.OwnerReferences, c.refs[i]) + } + } +} + +func (c *ownerRefSecretClient) Create(ctx context.Context, in *corev1.Secret, opts metav1.CreateOptions) (*corev1.Secret, error) { + if c.match == nil || c.match(in) { + c.appendMissingOwnerRefs(in) + } + return c.SecretInterface.Create(ctx, in, opts) +} + +func (c *ownerRefSecretClient) Update(ctx context.Context, in *corev1.Secret, opts metav1.UpdateOptions) (*corev1.Secret, error) { + if c.match == nil || c.match(in) { + c.appendMissingOwnerRefs(in) + } + return c.SecretInterface.Update(ctx, in, opts) +} diff --git a/pkg/storage/chunked.go b/pkg/storage/chunked.go new file mode 100644 index 00000000..4ebecbec --- /dev/null +++ b/pkg/storage/chunked.go @@ -0,0 +1,416 @@ +package storage + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/base32" + "encoding/json" + "fmt" + "hash" + "hash/fnv" + "io" + "strconv" + "sync" + "time" + + "github.com/pkg/errors" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/utils/ptr" +) + +var _ driver.Driver = (*chunkedSecrets)(nil) + +type ChunkedSecretsConfig struct { + ChunkSize int + MaxReadChunks int + MaxWriteChunks int + Log func(string, ...interface{}) +} + +func NewChunkedSecrets(client clientcorev1.SecretInterface, owner string, config ChunkedSecretsConfig) driver.Driver { + if config.Log == nil { + config.Log = func(string, ...interface{}) {} + } + + return &chunkedSecrets{ + client: client, + owner: owner, + ChunkedSecretsConfig: config, + + hashEncoding: base32.NewEncoding("abcdefghijklmnopqrstuvwxyz123456").WithPadding(base32.NoPadding), + hash: fnv.New64a(), + } +} + +type chunkedSecrets struct { + client clientcorev1.SecretInterface + owner string + ChunkedSecretsConfig + + hashMu sync.Mutex + hash hash.Hash64 + hashEncoding *base32.Encoding +} + +func (c *chunkedSecrets) Create(key string, rls *release.Release) error { + c.Log("create: %q", key) + defer c.Log("created: %q", key) + + chunks, err := c.encodeReleaseAsChunks(key, rls) + if err != nil { + return fmt.Errorf("create: failed to encode release %q: %w", rls.Name, err) + } + + createdAt := time.Now() + indexSecret := c.indexSecretFromChunks(key, rls, chunks) + indexSecret.Labels["createdAt"] = strconv.Itoa(int(createdAt.Unix())) + indexSecret, err = c.client.Create(context.Background(), indexSecret, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + return driver.ErrReleaseExists + } + return fmt.Errorf("create: failed to create index and chunk %d of %d secret %q: %w", 1, len(chunks), key, err) + } + + for i, ch := range chunks[1:] { + chunkSecret := c.chunkSecretFromChunk(indexSecret, ch) + chunkSecret.Labels["createdAt"] = strconv.Itoa(int(createdAt.Unix())) + if _, err := c.client.Create(context.Background(), chunkSecret, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create: failed to create chunk secret %d of %d %q: %w", i+2, len(chunks), ch.name, err) + } + } + return nil +} + +type chunk struct { + name string + data []byte +} + +// encodeRelease encodes a release returning a base64 encoded +// gzipped string representation, or error. +func (c *chunkedSecrets) encodeReleaseAsChunks(key string, rls *release.Release) ([]chunk, error) { + buf := &bytes.Buffer{} + + if err := func() error { + gzw, err := gzip.NewWriterLevel(buf, gzip.BestCompression) + if err != nil { + return err + } + defer gzw.Close() + return json.NewEncoder(gzw).Encode(rls) + }(); err != nil { + return nil, err + } + data := buf.Bytes() + + // Split the encoded release into chunks of chunkSize + // and return the chunks. + var chunks []chunk + for i := 0; i < len(data); i += c.ChunkSize { + end := i + c.ChunkSize + if end > len(data) { + end = len(data) + } + chunks = append(chunks, chunk{ + name: fmt.Sprintf("%s-%s", key, c.hashForData(data[i:end])), + data: data[i:end], + }) + } + + if c.MaxWriteChunks > 0 && len(chunks) > c.MaxWriteChunks { + return nil, fmt.Errorf("release too large: %q requires %d chunks, which exceeds the maximum of %d", rls.Name, len(chunks), c.MaxWriteChunks) + } + + return chunks, nil +} + +const ( + SecretTypeChunkedIndex = corev1.SecretType("operatorframework.io/index.v1") + SecretTypeChunkedChunk = corev1.SecretType("operatorframework.io/chunk.v1") +) + +func (c *chunkedSecrets) indexSecretFromChunks(key string, rls *release.Release, chunks []chunk) *corev1.Secret { + extraChunkNames := make([]string, 0, len(chunks)-1) + for _, ch := range chunks[1:] { + extraChunkNames = append(extraChunkNames, ch.name) + } + extraChunkNamesData, err := json.Marshal(extraChunkNames) + if err != nil { + panic(err) + } + + indexLabels := newIndexLabels(c.owner, key, rls) + indexSecret := &corev1.Secret{ + Type: SecretTypeChunkedIndex, + ObjectMeta: metav1.ObjectMeta{ + Name: key, + Labels: indexLabels, + }, + Immutable: ptr.To(false), + Data: map[string][]byte{ + "extraChunks": extraChunkNamesData, + "chunk": chunks[0].data, + }, + } + return indexSecret +} + +func (c *chunkedSecrets) chunkSecretFromChunk(indexSecret *corev1.Secret, ch chunk) *corev1.Secret { + chunkLabels := newChunkLabels(c.owner, indexSecret.Name) + chunkSecret := &corev1.Secret{ + Type: SecretTypeChunkedChunk, + ObjectMeta: metav1.ObjectMeta{ + Name: ch.name, + Labels: chunkLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Secret", + Name: indexSecret.Name, + UID: indexSecret.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(false), + }, + }, + }, + Immutable: ptr.To(true), + Data: map[string][]byte{ + "chunk": ch.data, + }, + } + return chunkSecret +} + +func (c *chunkedSecrets) getIndex(ctx context.Context, key string) (*corev1.Secret, error) { + indexSecret, err := c.client.Get(ctx, key, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, driver.ErrReleaseNotFound + } + return nil, fmt.Errorf("failed to get secret for key %q: %w", key, err) + } + return indexSecret, nil +} + +func (c *chunkedSecrets) Update(key string, rls *release.Release) error { + c.Log("update: %q", key) + defer c.Log("updated: %q", key) + + // Get the existing index secret to make sure it exists + existingIndex, err := c.getIndex(context.Background(), key) + if err != nil { + return fmt.Errorf("update: %w", err) + } + + // Delete the existing chunk secrets + if err := c.client.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: newListChunksForKeySelector(c.owner, existingIndex.Name).String()}); err != nil { + return fmt.Errorf("update: failed to delete previous chunk secrets for key %q: %w", key, err) + } + + // Generate new chunks + chunks, err := c.encodeReleaseAsChunks(key, rls) + if err != nil { + return fmt.Errorf("create: failed to encode release %q: %w", rls.Name, err) + } + + modifiedAt := time.Now() + + // Update the index secret + updatedIndexSecret := c.indexSecretFromChunks(key, rls, chunks) + updatedIndexSecret.Labels["createdAt"] = existingIndex.Labels["createdAt"] + updatedIndexSecret.Labels["modifiedAt"] = strconv.Itoa(int(modifiedAt.Unix())) + updatedIndexSecret, err = c.client.Update(context.Background(), updatedIndexSecret, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("create: failed to create index and chunk %d of %d secret %q: %w", 1, len(chunks), key, err) + } + + // Create the new chunks + for i, ch := range chunks[1:] { + chunkSecret := c.chunkSecretFromChunk(updatedIndexSecret, ch) + if _, err := c.client.Create(context.Background(), chunkSecret, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create: failed to create chunk secret %d of %d %q: %w", i+2, len(chunks), ch.name, err) + } + } + return nil +} + +func (c *chunkedSecrets) Delete(key string) (*release.Release, error) { + c.Log("delete: %q", key) + defer c.Log("deleted: %q", key) + + indexSecret, rls, err := c.getIndexAndRelease(key) + if err != nil { + if errors.Is(err, driver.ErrReleaseNotFound) { + return nil, driver.ErrReleaseNotFound + } + return nil, fmt.Errorf("delete: %w", err) + } + if err := c.client.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: newListAllForKeySelector(c.owner, key).String()}); err != nil { + return nil, fmt.Errorf("delete: failed to delete index secret %q: %w", indexSecret.Name, err) + } + return rls, nil +} + +func (c *chunkedSecrets) getIndexAndRelease(key string) (*corev1.Secret, *release.Release, error) { + indexSecret, err := c.getIndex(context.Background(), key) + if err != nil { + return nil, nil, err + } + + rls, err := c.decodeRelease(context.Background(), indexSecret) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode release from index secret %q: %w", indexSecret.Name, err) + } + return indexSecret, rls, nil +} + +func (c *chunkedSecrets) Get(key string) (*release.Release, error) { + c.Log("get: %q", key) + defer c.Log("got: %q", key) + + _, rls, err := c.getIndexAndRelease(key) + if err != nil { + return nil, fmt.Errorf("get: %w", err) + } + return rls, nil +} + +func (c *chunkedSecrets) List(filter func(*release.Release) bool) ([]*release.Release, error) { + c.Log("list") + defer c.Log("listed") + + indexSecrets, err := c.client.List(context.Background(), metav1.ListOptions{LabelSelector: newListIndicesLabelSelector(c.owner).String()}) + if err != nil { + return nil, fmt.Errorf("list: %w", err) + } + + var results []*release.Release + for _, indexSecret := range indexSecrets.Items { + indexSecret := indexSecret + rls, err := c.decodeRelease(context.Background(), &indexSecret) + if err != nil { + return nil, fmt.Errorf("list: failed to decode release for key %q: %w", indexSecret.Labels["key"], err) + } + rls.Labels = indexSecret.Labels + if filter(rls) { + results = append(results, rls) + } + } + return results, nil +} + +func (c *chunkedSecrets) Query(queryLabels map[string]string) ([]*release.Release, error) { + for k, v := range queryLabels { + if k == "owner" && v == "helm" { + // Helm hardcodes some queries with owner=helm. We'll translate this + // to use our owner value + queryLabels[k] = c.owner + } + } + c.Log("query: labels=%v", queryLabels) + defer c.Log("queried: labels=%v", queryLabels) + + selector := newListIndicesLabelSelector(c.owner) + if queryRequirements, selectable := labels.Set(queryLabels).AsSelector().Requirements(); selectable { + selector = selector.Add(queryRequirements...) + } + + indexSecrets, err := c.client.List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + + if len(indexSecrets.Items) == 0 { + return nil, driver.ErrReleaseNotFound + } + + results := make([]*release.Release, 0, len(indexSecrets.Items)) + for _, indexSecret := range indexSecrets.Items { + indexSecret := indexSecret + rls, err := c.decodeRelease(context.Background(), &indexSecret) + if err != nil { + return nil, fmt.Errorf("query: failed to decode release: %w", err) + } + results = append(results, rls) + } + return results, nil +} + +func (c *chunkedSecrets) Name() string { + return fmt.Sprintf("%s/chunkedSecrets", c.owner) +} + +func (c *chunkedSecrets) decodeRelease(ctx context.Context, indexSecret *corev1.Secret) (*release.Release, error) { + extraChunkNamesData, ok := indexSecret.Data["extraChunks"] + if !ok { + return nil, fmt.Errorf("index secret %q missing chunks data: %#v", indexSecret.Name, indexSecret) + } + + var extraChunkNames []string + if err := json.Unmarshal(extraChunkNamesData, &extraChunkNames); err != nil { + return nil, fmt.Errorf("failed to parse chunk names from index: %w", err) + } + + if c.MaxReadChunks > 0 && 1+len(extraChunkNames) > c.MaxReadChunks { + return nil, fmt.Errorf("release too large: %q consists of %d chunks, which exceeds the maximum of %d", indexSecret.Name, 1+len(extraChunkNames), c.MaxReadChunks) + } + + pr, pw := io.Pipe() + go func() { + defer pw.Close() + firstChunkData, ok := indexSecret.Data["chunk"] + if !ok { + pw.CloseWithError(fmt.Errorf("index secret %q missing chunk %d data", indexSecret.Name, 1)) + return + } + if _, err := pw.Write(firstChunkData); err != nil { + pw.CloseWithError(fmt.Errorf("failed to write chunk %d data from %q: %w", 1, indexSecret.Name, err)) + return + } + for i, chunkName := range extraChunkNames { + chunkSecret, err := c.client.Get(ctx, chunkName, metav1.GetOptions{}) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to get chunk %d secret %q: %w", i+2, chunkName, err)) + return + } + chunkData, ok := chunkSecret.Data["chunk"] + if !ok { + pw.CloseWithError(fmt.Errorf("chunk %d secret %q missing chunk data", i+2, chunkName)) + return + } + if _, err := pw.Write(chunkData); err != nil { + pw.CloseWithError(fmt.Errorf("failed to write chunk %d data from %q: %w", i+2, chunkName, err)) + return + } + } + }() + + gzr, err := gzip.NewReader(pr) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %w", err) + } + releaseDecoder := json.NewDecoder(gzr) + var r release.Release + if err := releaseDecoder.Decode(&r); err != nil { + return nil, fmt.Errorf("failed to decode release: %w", err) + } + r.Labels = filterSystemLabels(indexSecret.Labels) + return &r, nil +} + +func (c *chunkedSecrets) hashForData(data []byte) string { + c.hashMu.Lock() + defer c.hashMu.Unlock() + + c.hash.Reset() + _, _ = c.hash.Write(data) + return c.hashEncoding.EncodeToString(c.hash.Sum(nil)) +} diff --git a/pkg/storage/chunked_test.go b/pkg/storage/chunked_test.go new file mode 100644 index 00000000..420cf36e --- /dev/null +++ b/pkg/storage/chunked_test.go @@ -0,0 +1,348 @@ +package storage + +import ( + "context" + "encoding/json" + "fmt" + "maps" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/utils/ptr" +) + +var _ = Describe("chunkedSecrets", func() { + const chunkSize = 1000 + var ( + secretInterface clientcorev1.SecretInterface + chunkedDriver driver.Driver + ) + + BeforeEach(func() { + secretInterface = clientcorev1.NewForConfigOrDie(cfg).Secrets("default") + chunkedDriver = NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 2, + MaxWriteChunks: 2, + }) + }) + + AfterEach(func() { + Expect(secretInterface.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{})).To(Succeed()) + }) + + var _ = Describe("Create", func() { + It("should create a large release with multiple secrets", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + }) + It("should create a small release with a single secret", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 1) + }) + It("should fail if the release already exists", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + + // Change the status to produce a release with the same key, but different content. + rel.Info.Status = release.StatusDeployed + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(MatchError(driver.ErrReleaseExists)) + }) + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*4) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("Get", func() { + It("should get a large release that is chunked", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + actual, err := chunkedDriver.Get(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + }) + It("should get a small release that is not chunked", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + actual, err := chunkedDriver.Get(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + }) + It("should fail if the release does not exist", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + _, err := chunkedDriver.Get(releaseKey(rel)) + Expect(err).To(MatchError(driver.ErrReleaseNotFound)) + }) + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + _, err := maxReadDriver.Get(releaseKey(rel)) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("Update", func() { + It("should update a single-secret release to a multi-secret release", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 1) + + // Change the status to produce a release with the same key, but different content. + rel = genRelease("test-release", 1, release.StatusDeployed, nil, chunkSize*2) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + }) + It("should update a multi-secret release to a single-secret release", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + + // Change the status to produce a release with the same key, but different content. + rel = genRelease("test-release", 1, release.StatusDeployed, nil, chunkSize/2) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 1) + }) + It("should fail if the release does not exist", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(MatchError(driver.ErrReleaseNotFound)) + }) + + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + verifySecrets(secretInterface, 2) + + // Change the status to produce a release with the same key, but different content. + rel = genRelease("test-release", 1, release.StatusDeployed, nil, chunkSize*4) + Expect(chunkedDriver.Update(releaseKey(rel), rel)).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("Delete", func() { + It("should delete a multi-secret release", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + verifySecrets(secretInterface, 2) + + actual, err := chunkedDriver.Delete(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + verifySecrets(secretInterface, 0) + }) + It("should delete a single-secret release", func() { + expected := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + Expect(chunkedDriver.Create(releaseKey(expected), expected)).To(Succeed()) + verifySecrets(secretInterface, 1) + + actual, err := chunkedDriver.Delete(releaseKey(expected)) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(Equal(expected)) + verifySecrets(secretInterface, 0) + }) + It("should fail if the release does not exist", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize/2) + _, err := chunkedDriver.Delete(releaseKey(rel)) + Expect(err).To(MatchError(driver.ErrReleaseNotFound)) + }) + It("should fail if the release is too large", func() { + rel := genRelease("test-release", 1, release.StatusPendingInstall, nil, chunkSize*2) + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + + _, err := maxReadDriver.Delete(releaseKey(rel)) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + }) + }) + + var _ = Describe("List", func() { + BeforeEach(func() { + releases := []*release.Release{ + genRelease("a", 1, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 2, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 3, release.StatusSuperseded, nil, chunkSize*2), + genRelease("a", 4, release.StatusDeployed, nil, chunkSize*2), + + genRelease("b", 1, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 2, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 3, release.StatusDeployed, nil, chunkSize/2), + } + for _, rel := range releases { + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + } + }) + It("should list releases by name", func() { + aReleases, err := chunkedDriver.List(func(rel *release.Release) bool { + return rel.Name == "a" + }) + Expect(err).ToNot(HaveOccurred()) + Expect(aReleases).To(HaveLen(4)) + }) + + It("should list releases by status", func() { + deployedReleases, err := chunkedDriver.List(func(rel *release.Release) bool { + return rel.Info.Status == release.StatusDeployed + }) + Expect(err).ToNot(HaveOccurred()) + Expect(deployedReleases).To(HaveLen(2)) + }) + + It("should return an empty list if no releases match", func() { + cReleases, err := chunkedDriver.List(func(rel *release.Release) bool { + return rel.Name == "c" + }) + Expect(err).ToNot(HaveOccurred()) + Expect(cReleases).To(BeEmpty()) + }) + + It("should fail if any release is too large", func() { + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + actual, err := maxReadDriver.List(func(rel *release.Release) bool { return true }) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + Expect(actual).To(BeNil()) + }) + }) + + var _ = Describe("Query", func() { + BeforeEach(func() { + releases := []*release.Release{ + genRelease("a", 1, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 2, release.StatusSuperseded, nil, chunkSize/2), + genRelease("a", 3, release.StatusSuperseded, nil, chunkSize*2), + genRelease("a", 4, release.StatusDeployed, nil, chunkSize*2), + + genRelease("b", 1, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 2, release.StatusSuperseded, nil, chunkSize*2), + genRelease("b", 3, release.StatusDeployed, map[string]string{"key1": "val1"}, chunkSize/2), + } + for _, rel := range releases { + Expect(chunkedDriver.Create(releaseKey(rel), rel)).To(Succeed()) + } + }) + + It("should query releases by custom labels", func() { + key1Releases, err := chunkedDriver.Query(map[string]string{"key1": "val1"}) + Expect(err).ToNot(HaveOccurred()) + Expect(key1Releases).To(HaveLen(1)) + }) + + It("should return ErrReleaseNotFound when there is no match", func() { + _, err := chunkedDriver.Query(map[string]string{"nonexistentKey": "nonexistentVal"}) + Expect(err).To(MatchError(driver.ErrReleaseNotFound)) + }) + + It("should succeed if no matched release is too large", func() { + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + actual, err := maxReadDriver.Query(map[string]string{"name": "a", "version": "1"}) + Expect(err).ToNot(HaveOccurred()) + Expect(actual).To(HaveLen(1)) + }) + + It("should fail if any matched release is too large", func() { + maxReadDriver := NewChunkedSecrets(secretInterface, "test-owner", ChunkedSecretsConfig{ + ChunkSize: chunkSize, + MaxReadChunks: 1, + MaxWriteChunks: 2, + }) + actual, err := maxReadDriver.Query(map[string]string{"name": "a"}) + Expect(err).To(MatchError(ContainSubstring("release too large"))) + Expect(actual).To(BeNil()) + }) + + // This test is necessary because the helm storage library hardcodes the owner to "helm". + // We have no way to configure the release storage implementation to use a different owner + // when we set it up as part of the action.Configuration. + It("should translate owner=helm to owner=test-owner", func() { + allReleases, err := chunkedDriver.Query(map[string]string{"owner": "helm"}) + Expect(err).ToNot(HaveOccurred()) + Expect(allReleases).To(HaveLen(7)) + }) + }) +}) + +func verifySecrets(secretInterface clientcorev1.SecretInterface, expected int) { + GinkgoHelper() + + items, err := secretInterface.List(context.Background(), metav1.ListOptions{}) + Expect(err).ToNot(HaveOccurred()) + Expect(items.Items).To(HaveLen(expected)) + + if expected == 0 { + return + } + var indexSecrets, chunkSecrets []corev1.Secret + for _, s := range items.Items { + switch s.Type { + case SecretTypeChunkedIndex: + indexSecrets = append(indexSecrets, s) + case SecretTypeChunkedChunk: + chunkSecrets = append(chunkSecrets, s) + } + } + Expect(indexSecrets).To(HaveLen(1), "expected exactly one index secret") + + Expect(indexSecrets[0].Data).To(HaveKey("extraChunks")) + Expect(indexSecrets[0].Data).To(HaveKey("chunk")) + Expect(indexSecrets[0].Immutable).To(Equal(ptr.To(false))) + + var expectedExtraChunkNames []string + Expect(json.Unmarshal(indexSecrets[0].Data["extraChunks"], &expectedExtraChunkNames)).To(Succeed()) + + Expect(chunkSecrets).To(HaveLen(expected-1), "expected multiple chunk secrets") + actualExtraChunkNames := make([]string, 0, len(chunkSecrets)) + for _, sec := range chunkSecrets { + actualExtraChunkNames = append(actualExtraChunkNames, sec.Name) + Expect(sec.Data).To(HaveKey("chunk")) + Expect(sec.Immutable).To(Equal(ptr.To(true))) + } + + Expect(actualExtraChunkNames).To(ConsistOf(expectedExtraChunkNames)) +} + +func releaseKey(rel *release.Release) string { + return fmt.Sprintf("%s.v%d", rel.Name, rel.Version) +} + +func genRelease(name string, version int, status release.Status, extraLabels map[string]string, minSize int) *release.Release { + lbls := map[string]string{ + "globalKey": "globalValue", + } + maps.Copy(lbls, extraLabels) + return &release.Release{ + Name: name, + Version: version, + Config: map[string]interface{}{ + "takingUpSpace": rand.String(minSize), + }, + Info: &release.Info{Status: status}, + Labels: lbls, + } +} diff --git a/pkg/storage/labels.go b/pkg/storage/labels.go new file mode 100644 index 00000000..1a54c3f7 --- /dev/null +++ b/pkg/storage/labels.go @@ -0,0 +1,60 @@ +package storage + +import ( + "maps" + "strconv" + + "helm.sh/helm/v3/pkg/release" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" +) + +func newIndexLabels(owner, key string, rls *release.Release) map[string]string { + labels := map[string]string{} + maps.Copy(labels, rls.Labels) + labels["name"] = rls.Name + labels["owner"] = owner + labels["status"] = rls.Info.Status.String() + labels["version"] = strconv.Itoa(rls.Version) + labels["key"] = key + labels["type"] = "index" + return labels +} + +func newChunkLabels(owner, key string) map[string]string { + labels := map[string]string{} + labels["owner"] = owner + labels["key"] = key + labels["type"] = "chunk" + return labels +} + +func newListIndicesLabelSelector(owner string) labels.Selector { + return labels.Set{"owner": owner, "type": "index"}.AsSelector() +} + +func newListAllForKeySelector(owner, key string) labels.Selector { + return labels.Set{"owner": owner, "key": key}.AsSelector() +} + +func newListChunksForKeySelector(owner, key string) labels.Selector { + return labels.Set{"owner": owner, "key": key, "type": "chunk"}.AsSelector() +} + +var systemLabels = sets.New[string]("name", "owner", "status", "version", "key", "type", "createdAt", "modifiedAt") + +// Checks if label is system +func isSystemLabel(key string) bool { + return systemLabels.Has(key) +} + +// Removes system labels from labels map +func filterSystemLabels(lbs map[string]string) map[string]string { + result := make(map[string]string) + for k, v := range lbs { + if !isSystemLabel(k) { + result[k] = v + } + } + return result +} diff --git a/pkg/storage/storage_suite_test.go b/pkg/storage/storage_suite_test.go new file mode 100644 index 00000000..a7ec43fc --- /dev/null +++ b/pkg/storage/storage_suite_test.go @@ -0,0 +1,36 @@ +package storage + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestStorage(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Storage Suite") +} + +var ( + testenv *envtest.Environment + cfg *rest.Config +) + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + testenv = &envtest.Environment{} + + var err error + cfg, err = testenv.Start() + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = AfterSuite(func() { + Expect(testenv.Stop()).To(Succeed()) +})