diff --git a/Makefile b/Makefile index c1cf0489..007de6a0 100644 --- a/Makefile +++ b/Makefile @@ -25,3 +25,5 @@ sweep: testacc: TF_ACC=1 go test ./... -v ${TESTARGS} -timeout 120m + +generate: diff --git a/docs/resources/task.md b/docs/resources/task.md index 228c3a21..3b91756a 100644 --- a/docs/resources/task.md +++ b/docs/resources/task.md @@ -63,6 +63,8 @@ resource "iterative_task" "example" { - `parallelism` - (Optional) Number of machines to be launched in parallel. - `storage.workdir` - (Optional) Local working directory to upload and use as the `script` working directory. - `storage.output` - (Optional) Results directory (**relative to `workdir`**) to download (default: no download). +- `storage.container` - (Optional) Pre-allocated container to use for storage of task data, results and status. +- `storage.container_path` - (Optional) Subdirectory in pre-allocated container to use for storage. If omitted, the task's identifier will be used. - `environment` - (Optional) Map of environment variable names and values for the task script. Empty string values are replaced with local environment values. Empty values may also be combined with a [glob]() name to import all matching variables. - `timeout` - (Optional) Maximum number of seconds to run before instances are force-terminated. The countdown is reset each time TPI auto-respawns a spot instance. - `tags` - (Optional) Map of tags for the created cloud resources. diff --git a/go.mod b/go.mod index 346ed0a1..f620a2e8 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/cloudflare/gokey v0.1.0 github.com/docker/go-units v0.4.0 github.com/gobwas/glob v0.2.3 + github.com/golang/mock v1.6.0 // indirect github.com/google/go-github/v42 v42.0.0 github.com/google/go-github/v45 v45.2.0 github.com/google/uuid v1.3.0 diff --git a/go.sum b/go.sum index 5485f601..c6418b95 100644 --- a/go.sum +++ b/go.sum @@ -436,6 +436,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/iterative/resource_task.go b/iterative/resource_task.go index a275f7cf..8cda59c6 100644 --- a/iterative/resource_task.go +++ b/iterative/resource_task.go @@ -138,6 +138,26 @@ func resourceTask() *schema.Resource { Optional: true, Default: "", }, + "container": { + Type: schema.TypeString, + ForceNew: true, + Optional: true, + Default: "", + }, + "container_path": { + Type: schema.TypeString, + ForceNew: true, + Optional: true, + Default: "", + }, + "container_opts": { + Type: schema.TypeMap, + ForceNew: true, + Optional: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, }, }, }, @@ -338,10 +358,33 @@ func resourceTaskBuild(ctx context.Context, d *schema.ResourceData, m interface{ directory := "" directory_out := "" + + var remoteStorage *common.RemoteStorage if d.Get("storage").(*schema.Set).Len() > 0 { storage := d.Get("storage").(*schema.Set).List()[0].(map[string]interface{}) directory = storage["workdir"].(string) directory_out = storage["output"].(string) + + // Propagate configuration for pre-allocated storage container. + container := storage["container"].(string) + containerPath := storage["container_path"].(string) + if container != "" { + remoteStorage = &common.RemoteStorage{ + Container: container, + Path: containerPath, + Config: map[string]string{}, + } + } + if storage["container_opts"] != nil { + remoteConfig := storage["container_opts"].(map[string]interface{}) + var ok bool + for key, value := range remoteConfig { + if remoteStorage.Config[key], ok = value.(string); !ok { + return nil, fmt.Errorf("invalid value for remote config key %q: %v", key, value) + } + } + } + } t := common.Task{ @@ -363,6 +406,7 @@ func resourceTaskBuild(ctx context.Context, d *schema.ResourceData, m interface{ }, // Egress is open on every port }, + RemoteStorage: remoteStorage, Spot: common.Spot(d.Get("spot").(float64)), Parallelism: uint16(d.Get("parallelism").(int)), PermissionSet: d.Get("permission_set").(string), diff --git a/task/aws/client/client.go b/task/aws/client/client.go index 9568942b..a97260cc 100644 --- a/task/aws/client/client.go +++ b/task/aws/client/client.go @@ -33,13 +33,17 @@ func New(ctx context.Context, cloud common.Cloud, tags map[string]string) (*Clie if err != nil { return nil, err } - - c := new(Client) - c.Cloud = cloud - c.Region = region - c.Tags = cloud.Tags - - c.Config = config + credentials, err := config.Credentials.Retrieve(ctx) + if err != nil { + return nil, err + } + c := &Client{ + Cloud: cloud, + Region: region, + Tags: cloud.Tags, + Config: config, + credentials: credentials, + } c.Services.EC2 = ec2.NewFromConfig(config) c.Services.S3 = s3.NewFromConfig(config) @@ -53,8 +57,9 @@ type Client struct { Region string Tags map[string]string - Config aws.Config - Services struct { + Config aws.Config + credentials aws.Credentials + Services struct { EC2 *ec2.Client S3 *s3.Client STS *sts.Client @@ -94,3 +99,8 @@ func (c *Client) DecodeError(ctx context.Context, encoded error) error { return fmt.Errorf("unable to decode: %s", encoded.Error()) } + +// Credentials returns the AWS credentials the client is currently using. +func (c *Client) Credentials() aws.Credentials { + return c.credentials +} diff --git a/task/aws/resources/data_source_bucket.go b/task/aws/resources/data_source_bucket.go new file mode 100644 index 00000000..e522498b --- /dev/null +++ b/task/aws/resources/data_source_bucket.go @@ -0,0 +1,71 @@ +package resources + +import ( + "context" + "fmt" + "path" + + "terraform-provider-iterative/task/common" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// NewExistingS3Bucket returns a new data source refering to a pre-allocated +// S3 bucket. +func NewExistingS3Bucket(client S3Client, credentials aws.Credentials, id string, region string, path string) *ExistingS3Bucket { + return &ExistingS3Bucket{ + client: client, + credentials: credentials, + region: region, + + id: id, + path: path, + } +} + +// ExistingS3Bucket identifies an existing S3 bucket. +type ExistingS3Bucket struct { + client S3Client + credentials aws.Credentials + + id string + region string + path string +} + +// Read verifies the specified S3 bucket is accessible. +func (b *ExistingS3Bucket) Read(ctx context.Context) error { + input := s3.HeadBucketInput{ + Bucket: aws.String(b.id), + } + if _, err := b.client.HeadBucket(ctx, &input); err != nil { + if errorCodeIs(err, errNotFound) { + return common.NotFoundError + } + return err + } + return nil +} + +// ConnectionString implements common.StorageCredentials. +// The method returns the rclone connection string for the specific bucket. +func (b *ExistingS3Bucket) ConnectionString(ctx context.Context) (string, error) { + containerPath := path.Join(b.id, b.path) + connectionString := fmt.Sprintf( + ":s3,provider=AWS,region=%s,access_key_id=%s,secret_access_key=%s,session_token=%s:%s", + b.region, + b.credentials.AccessKeyID, + b.credentials.SecretAccessKey, + b.credentials.SessionToken, + containerPath) + return connectionString, nil +} + +// build-time check to ensure Bucket implements BucketCredentials. +var _ common.StorageCredentials = (*ExistingS3Bucket)(nil) + +// S3Client defines the functions of the AWS S3 API used. +type S3Client interface { + HeadBucket(context.Context, *s3.HeadBucketInput, ...func(*s3.Options)) (*s3.HeadBucketOutput, error) +} diff --git a/task/aws/resources/data_source_bucket_test.go b/task/aws/resources/data_source_bucket_test.go new file mode 100644 index 00000000..c204ecf4 --- /dev/null +++ b/task/aws/resources/data_source_bucket_test.go @@ -0,0 +1,58 @@ +package resources_test + +import ( + "context" + "testing" + + "terraform-provider-iterative/task/aws/resources" + "terraform-provider-iterative/task/aws/resources/mocks" + "terraform-provider-iterative/task/common" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +func TestExistingBucketConnectionString(t *testing.T) { + ctx := context.Background() + creds := aws.Credentials{ + AccessKeyID: "access-key-id", + SecretAccessKey: "secret-access-key", + SessionToken: "session-token", + } + b := resources.NewExistingS3Bucket(nil, creds, "pre-created-bucket", "us-east-1", "subdirectory") + connStr, err := b.ConnectionString(ctx) + require.NoError(t, err) + require.Equal(t, connStr, ":s3,provider=AWS,region=us-east-1,access_key_id=access-key-id,secret_access_key=secret-access-key,session_token=session-token:pre-created-bucket/subdirectory") +} + +func TestExistingBucketRead(t *testing.T) { + ctx := context.Background() + ctl := gomock.NewController(t) + defer ctl.Finish() + + s3Cl := mocks.NewMockS3Client(ctl) + s3Cl.EXPECT().HeadBucket(gomock.Any(), &s3.HeadBucketInput{Bucket: aws.String("bucket-id")}).Return(nil, nil) + b := resources.NewExistingS3Bucket(s3Cl, aws.Credentials{}, "bucket-id", "us-east-1", "subdirectory") + err := b.Read(ctx) + require.NoError(t, err) +} + +// TestExistingBucketReadNotFound tests the case where the s3 client indicates that the bucket could not be +// found. +func TestExistingBucketReadNotFound(t *testing.T) { + ctx := context.Background() + ctl := gomock.NewController(t) + defer ctl.Finish() + + s3Cl := mocks.NewMockS3Client(ctl) + + s3Cl.EXPECT(). + HeadBucket(gomock.Any(), &s3.HeadBucketInput{Bucket: aws.String("bucket-id")}). + Return(nil, &smithy.GenericAPIError{Code: "NotFound"}) + b := resources.NewExistingS3Bucket(s3Cl, aws.Credentials{}, "bucket-id", "us-east-1", "subdirectory") + err := b.Read(ctx) + require.ErrorIs(t, err, common.NotFoundError) +} diff --git a/task/aws/resources/data_source_credentials.go b/task/aws/resources/data_source_credentials.go index 46585d60..a19dca86 100644 --- a/task/aws/resources/data_source_credentials.go +++ b/task/aws/resources/data_source_credentials.go @@ -2,13 +2,12 @@ package resources import ( "context" - "fmt" "terraform-provider-iterative/task/aws/client" "terraform-provider-iterative/task/common" ) -func NewCredentials(client *client.Client, identifier common.Identifier, bucket *Bucket) *Credentials { +func NewCredentials(client *client.Client, identifier common.Identifier, bucket common.StorageCredentials) *Credentials { c := new(Credentials) c.Client = client c.Identifier = identifier.Long() @@ -20,7 +19,7 @@ type Credentials struct { Client *client.Client Identifier string Dependencies struct { - *Bucket + Bucket common.StorageCredentials } Resource map[string]string } @@ -31,20 +30,16 @@ func (c *Credentials) Read(ctx context.Context) error { return err } - connectionString := fmt.Sprintf( - ":s3,provider=AWS,region=%s,access_key_id=%s,secret_access_key=%s,session_token=%s:%s", - c.Client.Region, - credentials.AccessKeyID, - credentials.SecretAccessKey, - credentials.SessionToken, - c.Dependencies.Bucket.Identifier, - ) + bucketConnStr, err := c.Dependencies.Bucket.ConnectionString(ctx) + if err != nil { + return err + } c.Resource = map[string]string{ "AWS_ACCESS_KEY_ID": credentials.AccessKeyID, "AWS_SECRET_ACCESS_KEY": credentials.SecretAccessKey, "AWS_SESSION_TOKEN": credentials.SessionToken, - "RCLONE_REMOTE": connectionString, + "RCLONE_REMOTE": bucketConnStr, "TPI_TASK_CLOUD_PROVIDER": string(c.Client.Cloud.Provider), "TPI_TASK_CLOUD_REGION": c.Client.Region, "TPI_TASK_IDENTIFIER": c.Identifier, diff --git a/task/aws/resources/mocks/gen.go b/task/aws/resources/mocks/gen.go new file mode 100644 index 00000000..76d7588c --- /dev/null +++ b/task/aws/resources/mocks/gen.go @@ -0,0 +1,5 @@ +package mocks + +// This file includes go:generate statements for regenerating mocks. + +//go:generate mockgen -destination s3client_generated.go -package mocks .. S3Client diff --git a/task/aws/resources/mocks/s3client_generated.go b/task/aws/resources/mocks/s3client_generated.go new file mode 100644 index 00000000..2663afef --- /dev/null +++ b/task/aws/resources/mocks/s3client_generated.go @@ -0,0 +1,56 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: terraform-provider-iterative/task/aws/resources (interfaces: S3Client) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + s3 "github.com/aws/aws-sdk-go-v2/service/s3" + gomock "github.com/golang/mock/gomock" +) + +// MockS3Client is a mock of S3Client interface. +type MockS3Client struct { + ctrl *gomock.Controller + recorder *MockS3ClientMockRecorder +} + +// MockS3ClientMockRecorder is the mock recorder for MockS3Client. +type MockS3ClientMockRecorder struct { + mock *MockS3Client +} + +// NewMockS3Client creates a new mock instance. +func NewMockS3Client(ctrl *gomock.Controller) *MockS3Client { + mock := &MockS3Client{ctrl: ctrl} + mock.recorder = &MockS3ClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockS3Client) EXPECT() *MockS3ClientMockRecorder { + return m.recorder +} + +// HeadBucket mocks base method. +func (m *MockS3Client) HeadBucket(arg0 context.Context, arg1 *s3.HeadBucketInput, arg2 ...func(*s3.Options)) (*s3.HeadBucketOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "HeadBucket", varargs...) + ret0, _ := ret[0].(*s3.HeadBucketOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeadBucket indicates an expected call of HeadBucket. +func (mr *MockS3ClientMockRecorder) HeadBucket(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadBucket", reflect.TypeOf((*MockS3Client)(nil).HeadBucket), varargs...) +} diff --git a/task/aws/resources/resource_auto_scaling_group.go b/task/aws/resources/resource_auto_scaling_group.go index f47cfc67..a888062c 100644 --- a/task/aws/resources/resource_auto_scaling_group.go +++ b/task/aws/resources/resource_auto_scaling_group.go @@ -8,13 +8,11 @@ import ( "strings" "time" - "github.com/aws/smithy-go" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/autoscaling" "github.com/aws/aws-sdk-go-v2/service/autoscaling/types" "github.com/aws/aws-sdk-go-v2/service/ec2" - + "github.com/aws/smithy-go" "github.com/sirupsen/logrus" "terraform-provider-iterative/task/aws/client" diff --git a/task/aws/resources/resource_bucket.go b/task/aws/resources/resource_bucket.go index f2626bf0..a12f41d9 100644 --- a/task/aws/resources/resource_bucket.go +++ b/task/aws/resources/resource_bucket.go @@ -3,12 +3,12 @@ package resources import ( "context" "errors" - - "github.com/aws/smithy-go" + "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" "terraform-provider-iterative/task/aws/client" "terraform-provider-iterative/task/common" @@ -153,6 +153,24 @@ func (b *Bucket) Delete(ctx context.Context) error { return nil } +// ConnectionString implements BucketCredentials. +// The method returns the rclone connection string for the specific bucket. +func (b *Bucket) ConnectionString(ctx context.Context) (string, error) { + credentials, err := b.Client.Config.Credentials.Retrieve(ctx) + if err != nil { + return "", err + } + + connectionString := fmt.Sprintf( + ":s3,provider=AWS,region=%s,access_key_id=%s,secret_access_key=%s,session_token=%s:%s", + b.Client.Region, + credentials.AccessKeyID, + credentials.SecretAccessKey, + credentials.SessionToken, + b.Identifier) + return connectionString, nil +} + // errorCodeIs checks if the provided error is an AWS API error // and its error code matches the supplied value. func errorCodeIs(err error, code string) bool { @@ -162,3 +180,6 @@ func errorCodeIs(err error, code string) bool { } return false } + +// build-time check to ensure Bucket implements BucketCredentials. +var _ common.StorageCredentials = (*Bucket)(nil) diff --git a/task/aws/resources/resource_key_pair.go b/task/aws/resources/resource_key_pair.go index 879fff8c..1aed18f2 100644 --- a/task/aws/resources/resource_key_pair.go +++ b/task/aws/resources/resource_key_pair.go @@ -5,11 +5,10 @@ import ( "errors" "strings" - "github.com/aws/smithy-go" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/aws/smithy-go" "terraform-provider-iterative/task/aws/client" "terraform-provider-iterative/task/common" diff --git a/task/aws/resources/resource_launch_template.go b/task/aws/resources/resource_launch_template.go index 7785a0e1..37aa0535 100644 --- a/task/aws/resources/resource_launch_template.go +++ b/task/aws/resources/resource_launch_template.go @@ -7,11 +7,10 @@ import ( "fmt" "time" - "github.com/aws/smithy-go" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/aws/smithy-go" "terraform-provider-iterative/task/aws/client" "terraform-provider-iterative/task/common" diff --git a/task/aws/task.go b/task/aws/task.go index e312692d..0b030725 100644 --- a/task/aws/task.go +++ b/task/aws/task.go @@ -2,6 +2,7 @@ package aws import ( "context" + "errors" "net" "github.com/sirupsen/logrus" @@ -13,6 +14,8 @@ import ( "terraform-provider-iterative/task/common/ssh" ) +const s3_region = "s3_region" + func List(ctx context.Context, cloud common.Cloud) ([]common.Identifier, error) { client, err := client.New(ctx, cloud, nil) if err != nil { @@ -47,14 +50,39 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier, t.Client, t.Attributes.PermissionSet, ) - t.Resources.Bucket = resources.NewBucket( - t.Client, - t.Identifier, - ) + var bucketCredentials common.StorageCredentials + if task.RemoteStorage != nil { + containerPath := task.RemoteStorage.Path + // If a subdirectory was not specified, the task id will + // be used. + if containerPath == "" { + containerPath = string(t.Identifier) + } + // Container config may override the s3 region. + region, ok := task.RemoteStorage.Config[s3_region] + if !ok { + region = t.Client.Region + } + bucket := resources.NewExistingS3Bucket( + t.Client.Services.S3, + t.Client.Credentials(), + task.RemoteStorage.Container, + region, + containerPath) + t.DataSources.Bucket = bucket + bucketCredentials = bucket + } else { + bucket := resources.NewBucket( + t.Client, + t.Identifier, + ) + t.Resources.Bucket = bucket + bucketCredentials = bucket + } t.DataSources.Credentials = resources.NewCredentials( t.Client, t.Identifier, - t.Resources.Bucket, + bucketCredentials, ) t.Resources.SecurityGroup = resources.NewSecurityGroup( t.Client, @@ -87,23 +115,25 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier, return t, nil } +// Task represents a task running in aws with all its dependent resources. type Task struct { Client *client.Client Identifier common.Identifier Attributes common.Task DataSources struct { - *resources.DefaultVPC - *resources.DefaultVPCSubnets - *resources.Image - *resources.Credentials - *resources.PermissionSet + DefaultVPC *resources.DefaultVPC + DefaultVPCSubnets *resources.DefaultVPCSubnets + Image *resources.Image + Credentials *resources.Credentials + PermissionSet *resources.PermissionSet + Bucket *resources.ExistingS3Bucket } Resources struct { - *resources.Bucket - *resources.SecurityGroup - *resources.KeyPair - *resources.LaunchTemplate - *resources.AutoScalingGroup + Bucket *resources.Bucket + SecurityGroup *resources.SecurityGroup + KeyPair *resources.KeyPair + LaunchTemplate *resources.LaunchTemplate + AutoScalingGroup *resources.AutoScalingGroup } } @@ -121,10 +151,19 @@ func (t *Task) Create(ctx context.Context) error { }, { Description: "Reading Image...", Action: t.DataSources.Image.Read, - }, { - Description: "Creating Bucket...", - Action: t.Resources.Bucket.Create, - }, { + }} + if t.Resources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Creating Bucket...", + Action: t.Resources.Bucket.Create, + }) + } else if t.DataSources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Verifying bucket...", + Action: t.DataSources.Bucket.Read, + }) + } + steps = append(steps, []common.Step{{ Description: "Creating SecurityGroup...", Action: t.Resources.SecurityGroup.Create, }, { @@ -139,7 +178,7 @@ func (t *Task) Create(ctx context.Context) error { }, { Description: "Creating AutoScalingGroup...", Action: t.Resources.AutoScalingGroup.Create, - }} + }}...) if t.Attributes.Environment.Directory != "" { steps = append(steps, common.Step{ @@ -176,7 +215,14 @@ func (t *Task) Read(ctx context.Context) error { Action: t.DataSources.Image.Read, }, { Description: "Reading Bucket...", - Action: t.Resources.Bucket.Read, + Action: func(ctx context.Context) error { + if t.Resources.Bucket != nil { + return t.Resources.Bucket.Read(ctx) + } else if t.DataSources.Bucket != nil { + return t.DataSources.Bucket.Read(ctx) + } + return errors.New("storage misconfigured") + }, }, { Description: "Reading SecurityGroup...", Action: t.Resources.SecurityGroup.Read, @@ -218,15 +264,17 @@ func (t *Task) Delete(ctx context.Context) error { return nil }}} } - steps = append(steps, common.Step{ - Description: "Emptying Bucket...", - Action: func(ctx context.Context) error { - err := machine.Delete(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"]) - if err != nil && err != common.NotFoundError { - return err - } - return nil - }}) + if t.Resources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Emptying Bucket...", + Action: func(ctx context.Context) error { + err := machine.Delete(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"]) + if err != nil && err != common.NotFoundError { + return err + } + return nil + }}) + } } steps = append(steps, []common.Step{{ Description: "Deleting AutoScalingGroup...", @@ -243,10 +291,13 @@ func (t *Task) Delete(ctx context.Context) error { }, { Description: "Reading Credentials...", Action: t.DataSources.Credentials.Read, - }, { - Description: "Deleting Bucket...", - Action: t.Resources.Bucket.Delete, }}...) + if t.Resources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Deleting Bucket...", + Action: t.Resources.Bucket.Delete, + }) + } if err := common.RunSteps(ctx, steps); err != nil { return err } diff --git a/task/common/machine/storage.go b/task/common/machine/storage.go index c2b5dc2f..1ccc0f99 100644 --- a/task/common/machine/storage.go +++ b/task/common/machine/storage.go @@ -49,7 +49,7 @@ func Reports(ctx context.Context, remote, prefix string) ([]string, error) { return nil, err } - entries, err := remoteFileSystem.List(ctx, "/reports") + entries, err := remoteFileSystem.List(ctx, "reports") if err != nil { return nil, err } diff --git a/task/common/resource.go b/task/common/resource.go new file mode 100644 index 00000000..4aaa868c --- /dev/null +++ b/task/common/resource.go @@ -0,0 +1,20 @@ +package common + +import ( + "context" +) + +// Resource defines the interface implemented by deployment resources. +type Resource interface { + Read(ctx context.Context) error + Create(ctx context.Context) error + Delete(ctx context.Context) error +} + +// StorageCredentials is an interface implemented by data sources and resources +// that provide access to cloud storage buckets. +type StorageCredentials interface { + // ConnectionString returns the connection string necessary to access + // an S3 bucket. + ConnectionString(ctx context.Context) (string, error) +} diff --git a/task/common/values.go b/task/common/values.go index 5006c231..0b0f2b96 100644 --- a/task/common/values.go +++ b/task/common/values.go @@ -41,7 +41,15 @@ type Event struct { Code string Description []string } -type Storage struct{} +type RemoteStorage struct { + // Container stores the id of the container to be used. + Container string + // Path stores the subdirectory inside the container. + Path string + // Config stores provider-specific configuration keys for accessing the pre-allocated + // storage container. + Config map[string]string +} type Task struct { Size Size @@ -52,6 +60,8 @@ type Task struct { Parallelism uint16 Tags map[string]string // Deprecated + RemoteStorage *RemoteStorage + Addresses []net.IP Status Status Events []Event diff --git a/task/gcp/resources/data_source_bucket.go b/task/gcp/resources/data_source_bucket.go new file mode 100644 index 00000000..bcdbbcb0 --- /dev/null +++ b/task/gcp/resources/data_source_bucket.go @@ -0,0 +1,67 @@ +package resources + +import ( + "context" + "errors" + "fmt" + "path" + + "google.golang.org/api/googleapi" + "google.golang.org/api/storage/v1" + + "terraform-provider-iterative/task/common" + "terraform-provider-iterative/task/gcp/client" +) + +// NewExistingBucket creates a new data source referring to a pre-allocated GCP storage bucket. +func NewExistingBucket(client *client.Client, id string, path string) *ExistingBucket { + return &ExistingBucket{ + client: client, + + id: id, + path: path, + } +} + +// ExistingBucket identifies a pre-allocated storage bucket. +type ExistingBucket struct { + client *client.Client + + resource *storage.Bucket + id string + path string +} + +// Read verifies the specified storage bucket exists and is accessible. +func (b *ExistingBucket) Read(ctx context.Context) error { + bucket, err := b.client.Services.Storage.Buckets.Get(b.id).Do() + if err != nil { + var e *googleapi.Error + if errors.As(err, &e) && e.Code == 404 { + return common.NotFoundError + } + return err + } + + b.resource = bucket + return nil +} + +// ConnectionString implements common.StorageCredentials. +// The method returns the rclone connection string for the specific bucket. +func (b *ExistingBucket) ConnectionString(ctx context.Context) (string, error) { + if len(b.client.Credentials.JSON) == 0 { + return "", errors.New("unable to find credentials JSON string") + } + credentials := string(b.client.Credentials.JSON) + containerPath := path.Join(b.id, b.path) + connStr := fmt.Sprintf( + ":googlecloudstorage,service_account_credentials='%s':%s", + credentials, + containerPath, + ) + + return connStr, nil +} + +var _ common.StorageCredentials = (*ExistingBucket)(nil) diff --git a/task/gcp/resources/data_source_credentials.go b/task/gcp/resources/data_source_credentials.go index 82558ec3..84343fc8 100644 --- a/task/gcp/resources/data_source_credentials.go +++ b/task/gcp/resources/data_source_credentials.go @@ -3,13 +3,12 @@ package resources import ( "context" "errors" - "fmt" "terraform-provider-iterative/task/common" "terraform-provider-iterative/task/gcp/client" ) -func NewCredentials(client *client.Client, identifier common.Identifier, bucket *Bucket) *Credentials { +func NewCredentials(client *client.Client, identifier common.Identifier, bucket common.StorageCredentials) *Credentials { c := new(Credentials) c.Client = client c.Identifier = identifier.Long() @@ -21,7 +20,7 @@ type Credentials struct { Client *client.Client Identifier string Dependencies struct { - *Bucket + Bucket common.StorageCredentials } Resource map[string]string } @@ -32,12 +31,10 @@ func (c *Credentials) Read(ctx context.Context) error { } credentials := string(c.Client.Credentials.JSON) - connectionString := fmt.Sprintf( - ":googlecloudstorage,service_account_credentials='%s':%s", - credentials, - c.Dependencies.Bucket.Identifier, - ) - + connectionString, err := c.Dependencies.Bucket.ConnectionString(ctx) + if err != nil { + return err + } c.Resource = map[string]string{ "GOOGLE_APPLICATION_CREDENTIALS_DATA": credentials, "RCLONE_REMOTE": connectionString, diff --git a/task/gcp/resources/resource_bucket.go b/task/gcp/resources/resource_bucket.go index b10657d4..12671411 100644 --- a/task/gcp/resources/resource_bucket.go +++ b/task/gcp/resources/resource_bucket.go @@ -3,6 +3,7 @@ package resources import ( "context" "errors" + "fmt" "google.golang.org/api/googleapi" "google.golang.org/api/storage/v1" @@ -32,22 +33,24 @@ func ListBuckets(ctx context.Context, client *client.Client) ([]common.Identifie func NewBucket(client *client.Client, identifier common.Identifier) *Bucket { b := new(Bucket) - b.Client = client + b.client = client b.Identifier = identifier.Long() return b } +// Bucket is a resource refering to an allocated gcp storage bucket. type Bucket struct { - Client *client.Client + client *client.Client Identifier string Resource *storage.Bucket } +// Create creates a new gcp storage bucket. func (b *Bucket) Create(ctx context.Context) error { - bucket, err := b.Client.Services.Storage.Buckets.Insert(b.Client.Credentials.ProjectID, &storage.Bucket{ + bucket, err := b.client.Services.Storage.Buckets.Insert(b.client.Credentials.ProjectID, &storage.Bucket{ Name: b.Identifier, - Location: b.Client.Region[:len(b.Client.Region)-2], // remove zone suffix (e.g. `{region}-a` -> `{region}`) - Labels: b.Client.Tags, + Location: b.client.Region[:len(b.client.Region)-2], // remove zone suffix (e.g. `{region}-a` -> `{region}`) + Labels: b.client.Tags, }).Do() if err != nil { var e *googleapi.Error @@ -61,8 +64,9 @@ func (b *Bucket) Create(ctx context.Context) error { return nil } +// Read verifies an existing gcp storage bucket. func (b *Bucket) Read(ctx context.Context) error { - bucket, err := b.Client.Services.Storage.Buckets.Get(b.Identifier).Do() + bucket, err := b.client.Services.Storage.Buckets.Get(b.Identifier).Do() if err != nil { var e *googleapi.Error if errors.As(err, &e) && e.Code == 404 { @@ -75,10 +79,14 @@ func (b *Bucket) Read(ctx context.Context) error { return nil } +// Update implements resource.Update. +// The operation is not implemented for storage buckets. func (b *Bucket) Update(ctx context.Context) error { return common.NotImplementedError } +// Delete deletes all objects stored in the bucket and destroys +// the storage bucket itself. func (b *Bucket) Delete(ctx context.Context) error { if b.Read(ctx) == common.NotFoundError { return nil @@ -86,21 +94,38 @@ func (b *Bucket) Delete(ctx context.Context) error { deletePage := func(objects *storage.Objects) error { for _, object := range objects.Items { - if err := b.Client.Services.Storage.Objects.Delete(b.Identifier, object.Name).Do(); err != nil { + if err := b.client.Services.Storage.Objects.Delete(b.Identifier, object.Name).Do(); err != nil { return err } } return nil } - if err := b.Client.Services.Storage.Objects.List(b.Identifier).Pages(ctx, deletePage); err != nil { + if err := b.client.Services.Storage.Objects.List(b.Identifier).Pages(ctx, deletePage); err != nil { return err } - if err := b.Client.Services.Storage.Buckets.Delete(b.Identifier).Do(); err != nil { + if err := b.client.Services.Storage.Buckets.Delete(b.Identifier).Do(); err != nil { return err } b.Resource = nil return nil } + +// ConnectionString implements common.StorageCredentials. +// The method returns the rclone connection string for the specific bucket. +func (b *Bucket) ConnectionString(ctx context.Context) (string, error) { + if len(b.client.Credentials.JSON) == 0 { + return "", errors.New("unable to find credentials JSON string") + } + credentials := string(b.client.Credentials.JSON) + + connStr := fmt.Sprintf( + ":googlecloudstorage,service_account_credentials='%s':%s", + credentials, + b.Identifier, + ) + + return connStr, nil +} diff --git a/task/gcp/task.go b/task/gcp/task.go index d91e44b9..cdab511f 100644 --- a/task/gcp/task.go +++ b/task/gcp/task.go @@ -2,6 +2,7 @@ package gcp import ( "context" + "errors" "net" "github.com/sirupsen/logrus" @@ -36,14 +37,32 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier, t.Client, t.Attributes.PermissionSet, ) - t.Resources.Bucket = resources.NewBucket( - t.Client, - t.Identifier, - ) + var bucketCredentials common.StorageCredentials + if task.RemoteStorage != nil { + containerPath := task.RemoteStorage.Path + // If a subdirectory was not specified, the task id will + // be used. + if containerPath == "" { + containerPath = string(t.Identifier) + } + bucket := resources.NewExistingBucket( + t.Client, + task.RemoteStorage.Container, + containerPath) + t.DataSources.Bucket = bucket + bucketCredentials = bucket + } else { + bucket := resources.NewBucket( + t.Client, + t.Identifier, + ) + t.Resources.Bucket = bucket + bucketCredentials = bucket + } t.DataSources.Credentials = resources.NewCredentials( t.Client, t.Identifier, - t.Resources.Bucket, + bucketCredentials, ) t.DataSources.DefaultNetwork = resources.NewDefaultNetwork( t.Client, @@ -137,21 +156,22 @@ type Task struct { Identifier common.Identifier Attributes common.Task DataSources struct { - *resources.DefaultNetwork - *resources.Credentials - *resources.Image - *resources.PermissionSet + DefaultNetwork *resources.DefaultNetwork + Credentials *resources.Credentials + Image *resources.Image + PermissionSet *resources.PermissionSet + Bucket *resources.ExistingBucket } Resources struct { - *resources.Bucket + Bucket *resources.Bucket FirewallInternalIngress *resources.FirewallRule FirewallInternalEgress *resources.FirewallRule FirewallExternalIngress *resources.FirewallRule FirewallExternalEgress *resources.FirewallRule FirewallDenyIngress *resources.FirewallRule FirewallDenyEgress *resources.FirewallRule - *resources.InstanceTemplate - *resources.InstanceGroupManager + InstanceTemplate *resources.InstanceTemplate + InstanceGroupManager *resources.InstanceGroupManager } } @@ -166,10 +186,19 @@ func (t *Task) Create(ctx context.Context) error { }, { Description: "Reading Image...", Action: t.DataSources.Image.Read, - }, { - Description: "Creating Bucket...", - Action: t.Resources.Bucket.Create, - }, { + }} + if t.Resources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Creating Bucket...", + Action: t.Resources.Bucket.Create, + }) + } else if t.DataSources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Verifying bucket...", + Action: t.DataSources.Bucket.Read, + }) + } + steps = append(steps, []common.Step{{ Description: "Reading Credentials...", Action: t.DataSources.Credentials.Read, }, { @@ -196,7 +225,7 @@ func (t *Task) Create(ctx context.Context) error { }, { Description: "Creating InstanceGroupManager...", Action: t.Resources.InstanceGroupManager.Create, - }} + }}...) if t.Attributes.Environment.Directory != "" { steps = append(steps, common.Step{ @@ -229,7 +258,14 @@ func (t *Task) Read(ctx context.Context) error { Action: t.DataSources.Image.Read, }, { Description: "Reading Bucket...", - Action: t.Resources.Bucket.Read, + Action: func(ctx context.Context) error { + if t.Resources.Bucket != nil { + return t.Resources.Bucket.Read(ctx) + } else if t.DataSources.Bucket != nil { + return t.DataSources.Bucket.Read(ctx) + } + return errors.New("storage misconfigured") + }, }, { Description: "Reading Credentials...", Action: t.DataSources.Credentials.Read, @@ -318,10 +354,13 @@ func (t *Task) Delete(ctx context.Context) error { }, { Description: "Deleting FirewallDenyIngress...", Action: t.Resources.FirewallDenyIngress.Delete, - }, { - Description: "Deleting Bucket...", - Action: t.Resources.Bucket.Delete, }}...) + if t.Resources.Bucket != nil { + steps = append(steps, common.Step{ + Description: "Deleting Bucket...", + Action: t.Resources.Bucket.Delete, + }) + } if err := common.RunSteps(ctx, steps); err != nil { return err } diff --git a/task/task.go b/task/task.go index 7b51b9da..46b848c8 100644 --- a/task/task.go +++ b/task/task.go @@ -49,11 +49,9 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier, } } +// Task defines the interface implemented by provider-specific task resources. type Task interface { - Read(ctx context.Context) error - - Create(ctx context.Context) error - Delete(ctx context.Context) error + common.Resource Start(ctx context.Context) error Stop(ctx context.Context) error