diff --git a/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors.go b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors.go new file mode 100644 index 00000000000..e455fa313db --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors.go @@ -0,0 +1,19 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package errors + +import "errors" + +var ErrInsufficientDiskSpace = errors.New("insufficient disk space") + +func IsDiskSpaceError(err error) bool { + for _, osErr := range OS_DiskSpaceErrors { + if errors.Is(err, osErr) { + return true + } + } + + return false +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_other.go b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_other.go new file mode 100644 index 00000000000..0fb5068aa7a --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_other.go @@ -0,0 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build !windows + +package errors + +import "syscall" + +var OS_DiskSpaceErrors = []error{ + syscall.ENOSPC, + syscall.EDQUOT, +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_test.go b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_test.go new file mode 100644 index 00000000000..a5b5003c713 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_test.go @@ -0,0 +1,34 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package errors + +import ( + goerrors "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + agentErrors "github.com/elastic/elastic-agent/internal/pkg/agent/errors" +) + +func TestIsDiskSpaceError(t *testing.T) { + for _, err := range OS_DiskSpaceErrors { + testCases := map[string]struct { + err error + want bool + }{ + "os_error": {err: err, want: true}, + "wrapped_os_error": {err: fmt.Errorf("wrapped: %w", err), want: true}, + "joined_error": {err: goerrors.Join(err, goerrors.New("test")), want: true}, + "new_error": {err: agentErrors.New(err, fmt.Errorf("test")), want: false}, + } + for name, tc := range testCases { + t.Run(fmt.Sprintf("%s_%s", err.Error(), name), func(t *testing.T) { + require.Equal(t, tc.want, IsDiskSpaceError(tc.err)) + }) + } + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_windows.go b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_windows.go new file mode 100644 index 00000000000..e1a90ffa587 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/errors/disk_space_errors_windows.go @@ -0,0 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build windows + +package errors + +import "golang.org/x/sys/windows" + +var OS_DiskSpaceErrors = []error{ + windows.ERROR_DISK_FULL, + windows.ERROR_HANDLE_DISK_FULL, +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go index 771a94714fd..6adf183737a 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go @@ -6,6 +6,7 @@ package fs import ( "context" + goerrors "errors" "fmt" "io" "os" @@ -27,6 +28,10 @@ const ( type Downloader struct { dropPath string config *artifact.Config + // The following are abstractions for stdlib functions so that we can mock them in tests. + copy func(dst io.Writer, src io.Reader) (int64, error) + mkdirAll func(name string, perm os.FileMode) error + openFile func(name string, flag int, perm os.FileMode) (*os.File, error) } // NewDownloader creates and configures Elastic Downloader @@ -34,6 +39,9 @@ func NewDownloader(config *artifact.Config) *Downloader { return &Downloader{ config: config, dropPath: getDropPath(config), + copy: io.Copy, + mkdirAll: os.MkdirAll, + openFile: os.OpenFile, } } @@ -108,18 +116,18 @@ func (e *Downloader) downloadFile(filename, fullPath string) (string, error) { defer sourceFile.Close() if destinationDir := filepath.Dir(fullPath); destinationDir != "" && destinationDir != "." { - if err := os.MkdirAll(destinationDir, 0755); err != nil { + if err := e.mkdirAll(destinationDir, 0755); err != nil { return "", err } } - destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions) + destinationFile, err := e.openFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions) if err != nil { - return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)) + return "", goerrors.Join(errors.New("creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)), err) } defer destinationFile.Close() - _, err = io.Copy(destinationFile, sourceFile) + _, err = e.copy(destinationFile, sourceFile) if err != nil { return "", err } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader_test.go index 67d2b3079d7..865bf1863e5 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader_test.go @@ -7,6 +7,7 @@ package fs import ( "context" "fmt" + "io" "os" "path/filepath" "testing" @@ -14,7 +15,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" agtversion "github.com/elastic/elastic-agent/pkg/version" ) @@ -161,6 +164,9 @@ func TestDownloader_Download(t *testing.T) { e := &Downloader{ dropPath: dropPath, config: config, + copy: io.Copy, + mkdirAll: os.MkdirAll, + openFile: os.OpenFile, } got, err := e.Download(context.TODO(), tt.args.a, tt.args.version) if !tt.wantErr(t, err, fmt.Sprintf("Download(%v, %v)", tt.args.a, tt.args.version)) { @@ -282,6 +288,9 @@ func TestDownloader_DownloadAsc(t *testing.T) { e := &Downloader{ dropPath: dropPath, config: config, + copy: io.Copy, + mkdirAll: os.MkdirAll, + openFile: os.OpenFile, } got, err := e.DownloadAsc(context.TODO(), tt.args.a, tt.args.version) if !tt.wantErr(t, err, fmt.Sprintf("DownloadAsc(%v, %v)", tt.args.a, tt.args.version)) { @@ -291,3 +300,76 @@ func TestDownloader_DownloadAsc(t *testing.T) { }) } } + +func TestDownloadDiskSpaceError(t *testing.T) { + testError := errors.New("test error") + + testCases := map[string]struct { + mockStdlibFuncs func(downloader *Downloader) + expectedError error + }{ + "when io.Copy runs into an error, the downloader should return the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.copy = func(dst io.Writer, src io.Reader) (int64, error) { + return 0, testError + } + }, + expectedError: testError, + }, + "when os.OpenFile runs into an error, the downloader should return the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.openFile = func(name string, flag int, perm os.FileMode) (*os.File, error) { + return nil, testError + } + }, + expectedError: testError, + }, + "when os.MkdirAll runs into an error, the downloader should return the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.mkdirAll = func(name string, perm os.FileMode) error { + return testError + } + }, + expectedError: testError, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + baseDir := t.TempDir() + paths.SetTop(baseDir) + config := &artifact.Config{ + DropPath: filepath.Join(baseDir, "drop"), + TargetDirectory: filepath.Join(baseDir, "target"), + } + + err := os.MkdirAll(config.DropPath, 0o755) + require.NoError(t, err) + + err = os.MkdirAll(config.TargetDirectory, 0o755) + require.NoError(t, err) + + parsedVersion := agtversion.NewParsedSemVer(1, 2, 3, "", "") + + artifactName, err := artifact.GetArtifactName(agentSpec, *parsedVersion, config.OS(), config.Arch()) + require.NoError(t, err) + + sourceArtifactPath := filepath.Join(config.DropPath, artifactName) + sourceArtifactHashPath := sourceArtifactPath + ".sha512" + + err = os.WriteFile(sourceArtifactPath, []byte("test"), 0o666) + require.NoError(t, err, "failed to create source artifact file") + + err = os.WriteFile(sourceArtifactHashPath, []byte("test"), 0o666) + require.NoError(t, err, "failed to create source artifact hash file") + + downloader := NewDownloader(config) + tc.mockStdlibFuncs(downloader) + targetArtifactPath, err := downloader.Download(context.Background(), agentSpec, parsedVersion) + + require.ErrorIs(t, err, tc.expectedError) + + require.NoFileExists(t, targetArtifactPath) + }) + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go index 3c17862dd82..a4a3240aeb3 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go @@ -6,6 +6,7 @@ package http import ( "context" + goerrors "errors" "fmt" "io" "net/http" @@ -20,6 +21,7 @@ import ( "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + downloadErrors "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -49,6 +51,12 @@ type Downloader struct { config *artifact.Config client http.Client upgradeDetails *details.Details + // The following are abstractions for stdlib functions so that we can mock them in tests. + copy func(dst io.Writer, src io.Reader) (int64, error) + mkdirAll func(name string, perm os.FileMode) error + openFile func(name string, flag int, perm os.FileMode) (*os.File, error) + // Abstraction for the disk space error check function so that we can mock it in tests. + isDiskSpaceErrorFunc func(err error) bool } // NewDownloader creates and configures Elastic Downloader @@ -68,10 +76,14 @@ func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails * // NewDownloaderWithClient creates Elastic Downloader with specific client used func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client, upgradeDetails *details.Details) *Downloader { return &Downloader{ - log: log, - config: config, - client: client, - upgradeDetails: upgradeDetails, + log: log, + config: config, + client: client, + upgradeDetails: upgradeDetails, + copy: io.Copy, + mkdirAll: os.MkdirAll, + openFile: os.OpenFile, + isDiskSpaceErrorFunc: downloadErrors.IsDiskSpaceError, } } @@ -179,14 +191,14 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f } if destinationDir := filepath.Dir(fullPath); destinationDir != "" && destinationDir != "." { - if err := os.MkdirAll(destinationDir, 0o755); err != nil { + if err := e.mkdirAll(destinationDir, 0o755); err != nil { return "", err } } - destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions) + destinationFile, err := e.openFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions) if err != nil { - return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)) + return "", goerrors.Join(errors.New("creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)), err) } defer destinationFile.Close() @@ -213,11 +225,18 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f detailsObserver := newDetailsProgressObserver(e.upgradeDetails) dp := newDownloadProgressReporter(sourceURI, e.config.Timeout, fileSize, loggingObserver, detailsObserver) dp.Report(ctx) - _, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp)) + + _, err = e.copy(destinationFile, io.TeeReader(resp.Body, dp)) if err != nil { - dp.ReportFailed(err) + // checking for disk space error here before passing it into the reporter + // so the details observer sets the state with clean error message + reportedErr := err + if e.isDiskSpaceErrorFunc(err) { + reportedErr = downloadErrors.ErrInsufficientDiskSpace + } + dp.ReportFailed(reportedErr) // return path, file already exists and needs to be cleaned up - return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) + return fullPath, goerrors.Join(errors.New("copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)), err) } dp.ReportComplete() diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go index bf0ebca963f..83e5be31cc2 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go @@ -23,7 +23,9 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + downloadErrors "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/testutils/fipsutils" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" @@ -528,3 +530,98 @@ func TestDownloadVersion(t *testing.T) { }) } } + +func TestDownloadDiskSpaceError(t *testing.T) { + fipsutils.SkipIfFIPSOnly(t, "elastic.co test server generates an OpenPGP key which results in a SHA-1 violation.") + targetDir, err := os.MkdirTemp(os.TempDir(), "") + if err != nil { + t.Fatal(err) + } + + log, _ := loggertest.New("downloader") + timeout := 30 * time.Second + testCases := getTestCases() + server, _, _ := getElasticCoServer(t) + elasticClient := getElasticCoClient(server) + + config := &artifact.Config{ + SourceURI: source, + TargetDirectory: targetDir, + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, + } + + testError := errors.New("test error") + + type errorHandlingTestCase struct { + mockStdlibFuncs func(downloader *Downloader) + isDiskSpaceErrorResult bool + expectedError error + } + + errorHandlingTestCases := map[string]errorHandlingTestCase{ + "when io.Copy runs into an error, the downloader should return the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.copy = func(dst io.Writer, src io.Reader) (int64, error) { + return 0, testError + } + }, + expectedError: testError, + }, + "when io.Copy runs into disk space error, the downloader should report the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.copy = func(dst io.Writer, src io.Reader) (int64, error) { + return 0, testError + } + }, + isDiskSpaceErrorResult: true, + expectedError: testError, + }, + "when os.OpenFile runs into an error, the downloader should return the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.openFile = func(name string, flag int, perm os.FileMode) (*os.File, error) { + return nil, testError + } + }, + expectedError: testError, + }, + "when os.MkdirAll runs into an error, the downloader should return the error and clean up the downloaded files": { + mockStdlibFuncs: func(downloader *Downloader) { + downloader.mkdirAll = func(name string, perm os.FileMode) error { + return testError + } + }, + expectedError: testError, + }, + } + + for _, testCase := range testCases { + for name, etc := range errorHandlingTestCases { + + testName := fmt.Sprintf("%s-binary-%s-%s", testCase.system, testCase.arch, name) + t.Run(testName, func(t *testing.T) { + config.OperatingSystem = testCase.system + config.Architecture = testCase.arch + + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) + etc.mockStdlibFuncs(testClient) + testClient.isDiskSpaceErrorFunc = func(err error) bool { + return etc.isDiskSpaceErrorResult + } + artifactPath, err := testClient.Download(context.Background(), beatSpec, version) + + require.ErrorIs(t, err, etc.expectedError, "expected error mismatch") + require.NoFileExists(t, artifactPath) + + if etc.isDiskSpaceErrorResult { + require.Equal(t, details.StateFailed, upgradeDetails.State) + require.Equal(t, downloadErrors.ErrInsufficientDiskSpace.Error(), upgradeDetails.Metadata.ErrorMsg) + } + + os.Remove(artifactPath) + }) + } + } +} diff --git a/internal/pkg/agent/application/upgrade/step_download.go b/internal/pkg/agent/application/upgrade/step_download.go index 58d56c81f52..8b8966b3a20 100644 --- a/internal/pkg/agent/application/upgrade/step_download.go +++ b/internal/pkg/agent/application/upgrade/step_download.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/composed" + downloadErrors "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/fs" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/localremote" @@ -40,17 +41,34 @@ type downloaderFactory func(*agtversion.ParsedSemVer, *logger.Logger, *artifact. type downloader func(context.Context, downloaderFactory, *agtversion.ParsedSemVer, *artifact.Config, *details.Details) (string, error) -func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversion.ParsedSemVer, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { +type artifactDownloader struct { + log *logger.Logger + settings *artifact.Config + fleetServerURI string +} + +func newArtifactDownloader(settings *artifact.Config, log *logger.Logger) *artifactDownloader { + return &artifactDownloader{ + log: log, + settings: settings, + } +} + +func (a *artifactDownloader) withFleetServerURI(fleetServerURI string) { + a.fleetServerURI = fleetServerURI +} + +func (a *artifactDownloader) downloadArtifact(ctx context.Context, parsedVersion *agtversion.ParsedSemVer, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { span, ctx := apm.StartSpan(ctx, "downloadArtifact", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() span.End() }() - pgpBytes = u.appendFallbackPGP(parsedVersion, pgpBytes) + pgpBytes = a.appendFallbackPGP(parsedVersion, pgpBytes) // do not update source config - settings := *u.settings + settings := *a.settings var downloaderFunc downloader var factory downloaderFactory var verifier download.Verifier @@ -62,7 +80,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversi // use specific function that doesn't perform retries on download as its // local and no retry should be performed - downloaderFunc = u.downloadOnce + downloaderFunc = a.downloadOnce // set specific downloader, local file just uses the fs.NewDownloader // no fallback is allowed because it was requested that this specific source be used @@ -71,13 +89,13 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversi } // set specific verifier, local file verifies locally only - verifier, err = fs.NewVerifier(u.log, &settings, release.PGP()) + verifier, err = fs.NewVerifier(a.log, &settings, release.PGP()) if err != nil { return "", errors.New(err, "initiating verifier") } // log that a local upgrade artifact is being used - u.log.Infow("Using local upgrade artifact", "version", parsedVersion, + a.log.Infow("Using local upgrade artifact", "version", parsedVersion, "drop_path", settings.DropPath, "target_path", settings.TargetDirectory, "install_path", settings.InstallPath) } else { @@ -88,21 +106,21 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversi if factory == nil { // set the factory to the newDownloader factory factory = newDownloader - u.log.Infow("Downloading upgrade artifact", "version", parsedVersion, + a.log.Infow("Downloading upgrade artifact", "version", parsedVersion, "source_uri", settings.SourceURI, "drop_path", settings.DropPath, "target_path", settings.TargetDirectory, "install_path", settings.InstallPath) } if downloaderFunc == nil { - downloaderFunc = u.downloadWithRetries + downloaderFunc = a.downloadWithRetries } if err := os.MkdirAll(paths.Downloads(), 0750); err != nil { - return "", errors.New(err, fmt.Sprintf("failed to create download directory at %s", paths.Downloads())) + return "", fmt.Errorf("failed to create download directory at %s: %w", paths.Downloads(), err) } path, err := downloaderFunc(ctx, factory, parsedVersion, &settings, upgradeDetails) if err != nil { - return "", errors.New(err, "failed download of agent binary") + return "", fmt.Errorf("failed download of agent binary: %w", err) } if skipVerifyOverride { @@ -110,7 +128,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversi } if verifier == nil { - verifier, err = newVerifier(parsedVersion, u.log, &settings) + verifier, err = newVerifier(parsedVersion, a.log, &settings) if err != nil { return "", errors.New(err, "initiating verifier") } @@ -122,7 +140,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversi return path, nil } -func (u *Upgrader) appendFallbackPGP(targetVersion *agtversion.ParsedSemVer, pgpBytes []string) []string { +func (a *artifactDownloader) appendFallbackPGP(targetVersion *agtversion.ParsedSemVer, pgpBytes []string) []string { if pgpBytes == nil { pgpBytes = make([]string, 0, 1) } @@ -131,14 +149,14 @@ func (u *Upgrader) appendFallbackPGP(targetVersion *agtversion.ParsedSemVer, pgp pgpBytes = append(pgpBytes, fallbackPGP) // add a secondary fallback if fleet server is configured - u.log.Debugf("Considering fleet server uri for pgp check fallback %q", u.fleetServerURI) - if u.fleetServerURI != "" { + a.log.Debugf("Considering fleet server uri for pgp check fallback %q", a.fleetServerURI) + if a.fleetServerURI != "" { secondaryPath, err := url.JoinPath( - u.fleetServerURI, + a.fleetServerURI, fmt.Sprintf(fleetUpgradeFallbackPGPFormat, targetVersion.Major(), targetVersion.Minor(), targetVersion.Patch()), ) if err != nil { - u.log.Warnf("failed to compose Fleet Server URI: %v", err) + a.log.Warnf("failed to compose Fleet Server URI: %v", err) } else { secondaryFallback := download.PgpSourceURIPrefix + secondaryPath pgpBytes = append(pgpBytes, secondaryFallback) @@ -194,14 +212,14 @@ func newVerifier(version *agtversion.ParsedSemVer, log *logger.Logger, settings return composed.NewVerifier(log, fsVerifier, snapshotVerifier, remoteVerifier), nil } -func (u *Upgrader) downloadOnce( +func (a *artifactDownloader) downloadOnce( ctx context.Context, factory downloaderFactory, version *agtversion.ParsedSemVer, settings *artifact.Config, upgradeDetails *details.Details, ) (string, error) { - downloader, err := factory(version, u.log, settings, upgradeDetails) + downloader, err := factory(version, a.log, settings, upgradeDetails) if err != nil { return "", fmt.Errorf("unable to create fetcher: %w", err) } @@ -217,7 +235,7 @@ func (u *Upgrader) downloadOnce( return path, nil } -func (u *Upgrader) downloadWithRetries( +func (a *artifactDownloader) downloadWithRetries( ctx context.Context, factory downloaderFactory, version *agtversion.ParsedSemVer, @@ -239,17 +257,21 @@ func (u *Upgrader) downloadWithRetries( opFn := func() error { attempt++ - u.log.Infof("download attempt %d", attempt) + a.log.Infof("download attempt %d", attempt) var err error - path, err = u.downloadOnce(cancelCtx, factory, version, settings, upgradeDetails) + path, err = a.downloadOnce(cancelCtx, factory, version, settings, upgradeDetails) if err != nil { + if downloadErrors.IsDiskSpaceError(err) { + a.log.Infof("insufficient disk space error detected, stopping retries") + return backoff.Permanent(err) + } return err } return nil } opFailureNotificationFn := func(err error, retryAfter time.Duration) { - u.log.Warnf("download attempt %d failed: %s; retrying in %s.", + a.log.Warnf("download attempt %d failed: %s; retrying in %s.", attempt, err.Error(), retryAfter) upgradeDetails.SetRetryableError(err) } diff --git a/internal/pkg/agent/application/upgrade/step_download_test.go b/internal/pkg/agent/application/upgrade/step_download_test.go index f1e20427c25..1cd8cef6f56 100644 --- a/internal/pkg/agent/application/upgrade/step_download_test.go +++ b/internal/pkg/agent/application/upgrade/step_download_test.go @@ -15,9 +15,9 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/transport/httpcommon" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + downloadErrors "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -55,11 +55,11 @@ func TestFallbackIsAppended(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { l, _ := loggertest.New(tc.name) - u := Upgrader{ + a := artifactDownloader{ fleetServerURI: tc.fleetServerURI, log: l, } - res := u.appendFallbackPGP(tc.targetVersion, tc.passedBytes) + res := a.appendFallbackPGP(tc.targetVersion, tc.passedBytes) // check default fallback is passed and is very last require.NotNil(t, res) require.Equal(t, tc.expectedLen, len(res)) @@ -91,8 +91,7 @@ func TestDownloadWithRetries(t *testing.T) { return &mockDownloader{expectedDownloadPath, nil}, nil } - u, err := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) - require.NoError(t, err) + a := newArtifactDownloader(&settings, testLogger) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) @@ -100,7 +99,7 @@ func TestDownloadWithRetries(t *testing.T) { upgradeDetails, upgradeDetailsRetryUntil, upgradeDetailsRetryUntilWasUnset, upgradeDetailsRetryErrorMsg := mockUpgradeDetails(parsedVersion) minRetryDeadline := time.Now().Add(settings.Timeout) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) + path, err := a.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -141,8 +140,7 @@ func TestDownloadWithRetries(t *testing.T) { return nil, nil } - u, err := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) - require.NoError(t, err) + a := newArtifactDownloader(&settings, testLogger) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) @@ -150,7 +148,7 @@ func TestDownloadWithRetries(t *testing.T) { upgradeDetails, upgradeDetailsRetryUntil, upgradeDetailsRetryUntilWasUnset, upgradeDetailsRetryErrorMsg := mockUpgradeDetails(parsedVersion) minRetryDeadline := time.Now().Add(settings.Timeout) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) + path, err := a.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -196,8 +194,7 @@ func TestDownloadWithRetries(t *testing.T) { return nil, nil } - u, err := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) - require.NoError(t, err) + a := newArtifactDownloader(&settings, testLogger) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) @@ -205,7 +202,7 @@ func TestDownloadWithRetries(t *testing.T) { upgradeDetails, upgradeDetailsRetryUntil, upgradeDetailsRetryUntilWasUnset, upgradeDetailsRetryErrorMsg := mockUpgradeDetails(parsedVersion) minRetryDeadline := time.Now().Add(settings.Timeout) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) + path, err := a.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -241,8 +238,7 @@ func TestDownloadWithRetries(t *testing.T) { return &mockDownloader{"", errors.New("download failed")}, nil } - u, err := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) - require.NoError(t, err) + a := newArtifactDownloader(&settings, testLogger) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) @@ -250,7 +246,7 @@ func TestDownloadWithRetries(t *testing.T) { upgradeDetails, upgradeDetailsRetryUntil, upgradeDetailsRetryUntilWasUnset, upgradeDetailsRetryErrorMsg := mockUpgradeDetails(parsedVersion) minRetryDeadline := time.Now().Add(testCaseSettings.Timeout) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings, upgradeDetails) + path, err := a.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings, upgradeDetails) require.Equal(t, "context deadline exceeded", err.Error()) require.Equal(t, "", path) @@ -276,6 +272,35 @@ func TestDownloadWithRetries(t *testing.T) { require.NotEmpty(t, *upgradeDetailsRetryErrorMsg) require.Equal(t, *upgradeDetailsRetryErrorMsg, upgradeDetails.Metadata.RetryErrorMsg) }) + + t.Run("insufficient disk space stops retries", func(t *testing.T) { + numberOfAttempts := 0 + diskSpaceError := downloadErrors.OS_DiskSpaceErrors[0] + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { + numberOfAttempts++ + return &mockDownloader{"", diskSpaceError}, nil + } + + a := newArtifactDownloader(&settings, testLogger) + + parsedVersion, err := agtversion.ParseVersion("8.9.0") + require.NoError(t, err) + + upgradeDetails, upgradeDetailsRetryUntil, upgradeDetailsRetryUntilWasUnset, upgradeDetailsRetryErrorMsg := mockUpgradeDetails(parsedVersion) + + path, err := a.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) + + require.Error(t, err) + require.Equal(t, "", path) + + require.Equal(t, 1, numberOfAttempts) + require.ErrorIs(t, err, diskSpaceError) + + require.NotZero(t, *upgradeDetailsRetryUntil) + require.False(t, *upgradeDetailsRetryUntilWasUnset) + + require.Empty(t, *upgradeDetailsRetryErrorMsg) + }) } // mockUpgradeDetails returns a *details.Details value that has an observer registered on it for inspecting @@ -307,3 +332,9 @@ func mockUpgradeDetails(parsedVersion *agtversion.ParsedSemVer) (*details.Detail &upgradeDetailsRetryUntil, &upgradeDetailsRetryUntilWasUnset, &upgradeDetailsRetryErrorMsg } + +func TestWithFleetServerURI(t *testing.T) { + a := &artifactDownloader{} + a.withFleetServerURI("mockURI") + require.Equal(t, "mockURI", a.fleetServerURI) +} diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index 04c91c3a7d0..d933d069ddc 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + upgradeErrors "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" @@ -68,6 +69,11 @@ func init() { } } +type artifactDownloadHandler interface { + downloadArtifact(ctx context.Context, parsedVersion *agtversion.ParsedSemVer, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) + withFleetServerURI(fleetServerURI string) +} + // Upgrader performs an upgrade type Upgrader struct { log *logger.Logger @@ -76,6 +82,10 @@ type Upgrader struct { upgradeable bool fleetServerURI string markerWatcher MarkerWatcher + + // The following are abstractions for testability + artifactDownloader artifactDownloadHandler + isDiskSpaceErrorFunc func(err error) bool } // IsUpgradeable when agent is installed and running as a service or flag was provided. @@ -88,11 +98,13 @@ func IsUpgradeable() bool { // NewUpgrader creates an upgrader which is capable of performing upgrade operation func NewUpgrader(log *logger.Logger, settings *artifact.Config, agentInfo info.Agent) (*Upgrader, error) { return &Upgrader{ - log: log, - settings: settings, - agentInfo: agentInfo, - upgradeable: IsUpgradeable(), - markerWatcher: newMarkerFileWatcher(markerFilePath(paths.Data()), log), + log: log, + settings: settings, + agentInfo: agentInfo, + upgradeable: IsUpgradeable(), + markerWatcher: newMarkerFileWatcher(markerFilePath(paths.Data()), log), + artifactDownloader: newArtifactDownloader(settings, log), + isDiskSpaceErrorFunc: upgradeErrors.IsDiskSpaceError, }, nil } @@ -101,10 +113,12 @@ func (u *Upgrader) SetClient(c fleetclient.Sender) { if c == nil { u.log.Debug("client nil, resetting Fleet Server URI") u.fleetServerURI = "" + u.artifactDownloader.withFleetServerURI("") } u.fleetServerURI = c.URI() u.log.Debugf("Set client changed URI to %s", u.fleetServerURI) + u.artifactDownloader.withFleetServerURI(u.fleetServerURI) } // Reload reloads the artifact configuration for the upgrader. @@ -197,6 +211,16 @@ func checkUpgrade(log *logger.Logger, currentVersion, newVersion agentVersion, m func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, det *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { u.log.Infow("Upgrading agent", "version", version, "source_uri", sourceURI) + defer func() { + if err != nil { + // Add the disk space error to the error chain if it is a disk space error + // so that we can use errors.Is to check for it + if u.isDiskSpaceErrorFunc(err) { + err = goerrors.Join(err, upgradeErrors.ErrInsufficientDiskSpace) + } + } + }() + currentVersion := agentVersion{ version: release.Version(), snapshot: release.Snapshot(), @@ -237,7 +261,7 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, fmt.Errorf("error parsing version %q: %w", version, err) } - archivePath, err := u.downloadArtifact(ctx, parsedVersion, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) + archivePath, err := u.artifactDownloader.downloadArtifact(ctx, parsedVersion, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { // Run the same pre-upgrade cleanup task to get rid of any newly downloaded files // This may have an issue if users are upgrading to the same version number. diff --git a/internal/pkg/agent/application/upgrade/upgrade_test.go b/internal/pkg/agent/application/upgrade/upgrade_test.go index 17d19252f6e..4af120d211b 100644 --- a/internal/pkg/agent/application/upgrade/upgrade_test.go +++ b/internal/pkg/agent/application/upgrade/upgrade_test.go @@ -8,6 +8,9 @@ import ( "context" "crypto/tls" "fmt" + "io" + "net/http" + "net/url" "os" "path/filepath" "runtime" @@ -25,6 +28,7 @@ import ( "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + upgradeErrors "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/config" @@ -37,6 +41,7 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" agtversion "github.com/elastic/elastic-agent/pkg/version" + "github.com/elastic/elastic-agent/testing/mocks/internal_/pkg/agent/application/info" mocks "github.com/elastic/elastic-agent/testing/mocks/pkg/control/v2/client" ) @@ -1292,3 +1297,79 @@ func (f *fakeAcker) Commit(ctx context.Context) error { args := f.Called(ctx) return args.Error(0) } + +type mockArtifactDownloader struct { + returnError error + fleetServerURI string +} + +func (m *mockArtifactDownloader) downloadArtifact(ctx context.Context, parsedVersion *agtversion.ParsedSemVer, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { + return "", m.returnError +} + +func (m *mockArtifactDownloader) withFleetServerURI(fleetServerURI string) { + m.fleetServerURI = fleetServerURI +} + +func TestUpgradeErrorHandling(t *testing.T) { + log, _ := loggertest.New("test") + testError := errors.New("test error") + + type testCase struct { + isDiskSpaceErrorResult bool + expectedError error + } + + testCases := map[string]testCase{ + "should return error if downloadArtifact fails": { + isDiskSpaceErrorResult: false, + expectedError: testError, + }, + "should add disk space error to the error chain if downloadArtifact fails with disk space error": { + isDiskSpaceErrorResult: true, + expectedError: upgradeErrors.ErrInsufficientDiskSpace, + }, + } + + mockAgentInfo := info.NewAgent(t) + mockAgentInfo.On("Version").Return("9.0.0") + + upgrader, err := NewUpgrader(log, &artifact.Config{}, mockAgentInfo) + require.NoError(t, err) + + upgrader.artifactDownloader = &mockArtifactDownloader{ + returnError: testError, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + upgrader.isDiskSpaceErrorFunc = func(err error) bool { + return tc.isDiskSpaceErrorResult + } + + _, err = upgrader.Upgrade(context.Background(), "9.0.0", "", nil, details.NewDetails("9.0.0", details.StateRequested, "test"), true, true) + require.ErrorIs(t, err, tc.expectedError) + }) + } +} + +type mockSender struct{} + +func (m *mockSender) Send(ctx context.Context, method, path string, params url.Values, headers http.Header, body io.Reader) (*http.Response, error) { + return nil, nil +} + +func (m *mockSender) URI() string { + return "mockURI" +} + +func TestSetClient(t *testing.T) { + log, _ := loggertest.New("test") + upgrader := &Upgrader{ + log: log, + artifactDownloader: &mockArtifactDownloader{}, + } + + upgrader.SetClient(&mockSender{}) + require.Equal(t, "mockURI", upgrader.artifactDownloader.(*mockArtifactDownloader).fleetServerURI) +}