diff --git a/CHANGELOG.md b/CHANGELOG.md index fb01005e0d3..3d9924b9132 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112 +* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 5fb44208ec8..04a08cfa646 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -295,6 +295,17 @@ compactor: # CLI flag: -compactor.block-visit-marker-file-update-interval [block_visit_marker_file_update_interval: | default = 1m] + # How long cleaner visit marker file should be considered as expired and able + # to be picked up by cleaner again. The value should be smaller than + # -compactor.cleanup-interval + # CLI flag: -compactor.cleaner-visit-marker-timeout + [cleaner_visit_marker_timeout: | default = 10m] + + # How frequently cleaner visit marker file should be updated when cleaning + # user. + # CLI flag: -compactor.cleaner-visit-marker-file-update-interval + [cleaner_visit_marker_file_update_interval: | default = 5m] + # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index df036ed2f6b..b83996a4c20 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2232,6 +2232,16 @@ sharding_ring: # CLI flag: -compactor.block-visit-marker-file-update-interval [block_visit_marker_file_update_interval: | default = 1m] +# How long cleaner visit marker file should be considered as expired and able to +# be picked up by cleaner again. The value should be smaller than +# -compactor.cleanup-interval +# CLI flag: -compactor.cleaner-visit-marker-timeout +[cleaner_visit_marker_timeout: | default = 10m] + +# How frequently cleaner visit marker file should be updated when cleaning user. +# CLI flag: -compactor.cleaner-visit-marker-file-update-interval +[cleaner_visit_marker_file_update_interval: | default = 5m] + # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index a447231b64e..3ea46a5f38a 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -50,9 +50,14 @@ type BlocksCleaner struct { bucketClient objstore.InstrumentedBucket usersScanner *cortex_tsdb.UsersScanner + ringLifecyclerID string + // Keep track of the last owned users. lastOwnedUsers []string + cleanerVisitMarkerTimeout time.Duration + cleanerVisitMarkerFileUpdateInterval time.Duration + // Metrics. runsStarted *prometheus.CounterVec runsCompleted *prometheus.CounterVec @@ -76,15 +81,21 @@ func NewBlocksCleaner( usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, + ringLifecyclerID string, reg prometheus.Registerer, + cleanerVisitMarkerTimeout time.Duration, + cleanerVisitMarkerFileUpdateInterval time.Duration, blocksMarkedForDeletion *prometheus.CounterVec, ) *BlocksCleaner { c := &BlocksCleaner{ - cfg: cfg, - bucketClient: bucketClient, - usersScanner: usersScanner, - cfgProvider: cfgProvider, - logger: log.With(logger, "component", "cleaner"), + cfg: cfg, + bucketClient: bucketClient, + usersScanner: usersScanner, + cfgProvider: cfgProvider, + logger: log.With(logger, "component", "cleaner"), + ringLifecyclerID: ringLifecyclerID, + cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout, + cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval, runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_started_total", Help: "Total number of blocks cleanup runs started.", @@ -246,7 +257,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string, return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) + visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) + if err != nil { + return err + } + if isVisited { + return nil + } errChan := make(chan error, 1) + go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) defer func() { errChan <- nil }() @@ -273,7 +292,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) + visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket) + if err != nil { + return err + } + if isVisited { + return nil + } errChan := make(chan error, 1) + go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) defer func() { errChan <- nil }() @@ -307,6 +334,19 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro return users, deleted, nil } +func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) { + cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) + visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker) + + existingCleanerVisitMarker := &CleanerVisitMarker{} + err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) + if err != nil && !errors.Is(err, errorVisitMarkerNotFound) { + return nil, false, errors.Wrapf(err, "failed to read cleaner visit marker") + } + isVisited = !errors.Is(err, errorVisitMarkerNotFound) && existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) + return visitMarkerManager, isVisited, nil +} + // Remove blocks and remaining data for tenant marked for deletion. func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 584b3984dcc..d3c7aa6da9e 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) // Clean User with no error cleaner.bucketClient = bkt @@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) require.NoError(t, err) require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) @@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) @@ -628,6 +628,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Existing behaviour - retention period disabled. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-1"] = 0 cfgProvider.userRetentionPeriods["user-2"] = 0 @@ -662,6 +666,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Retention enabled only for a single user, but does nothing. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) @@ -677,6 +685,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Retention enabled only for a single user, marking a single block. // Note the block won't be deleted yet due to deletion delay. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) @@ -710,6 +722,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Marking the block again, before the deletion occurs, should not cause an error. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) require.NoError(t, err) require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) @@ -722,6 +738,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Reduce the deletion delay. Now the block will be deleted. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cleaner.cfg.DeletionDelay = 0 activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) @@ -755,6 +775,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Retention enabled for other user; test deleting multiple blocks. { + // clean up cleaner visit marker before running test + bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck + cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) diff --git a/pkg/compactor/cleaner_visit_marker.go b/pkg/compactor/cleaner_visit_marker.go new file mode 100644 index 00000000000..b31e8810666 --- /dev/null +++ b/pkg/compactor/cleaner_visit_marker.go @@ -0,0 +1,66 @@ +package compactor + +import ( + "fmt" + "path" + "time" + + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" +) + +const ( + // CleanerVisitMarkerName is the name of cleaner visit marker file. + CleanerVisitMarkerName = "cleaner-visit-marker.json" + // CleanerVisitMarkerVersion1 is the current supported version of cleaner visit mark file. + CleanerVisitMarkerVersion1 = 1 +) + +type CleanerVisitMarker struct { + CompactorID string `json:"compactorID"` + Status VisitStatus `json:"status"` + // VisitTime is a unix timestamp of when the partition was visited (mark updated). + VisitTime int64 `json:"visitTime"` + // Version of the file. + Version int `json:"version"` +} + +func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker { + return &CleanerVisitMarker{ + CompactorID: compactorID, + Version: CleanerVisitMarkerVersion1, + } +} + +func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool { + return !time.Now().Before(time.Unix(b.VisitTime, 0).Add(cleanerVisitMarkerTimeout)) +} + +func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool { + return !(b.GetStatus() == Completed) && !(b.GetStatus() == Failed) && !b.IsExpired(cleanerVisitMarkerTimeout) +} + +func (b *CleanerVisitMarker) GetStatus() VisitStatus { + return b.Status +} + +func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string { + return GetCleanerVisitMarkerFilePath() +} + +func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) { + b.CompactorID = ownerIdentifier + b.Status = status + b.VisitTime = time.Now().Unix() +} + +func (b *CleanerVisitMarker) String() string { + return fmt.Sprintf("compactor_id=%s status=%s visit_time=%s", + b.CompactorID, + b.Status, + time.Unix(b.VisitTime, 0).String(), + ) +} + +func GetCleanerVisitMarkerFilePath() string { + return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName) +} diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 0d03348d6be..ff7907fe2c5 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -216,6 +216,10 @@ type Config struct { BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` + // Cleaner visit marker file config + CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"` + CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"` + AcceptMalformedIndex bool `yaml:"accept_malformed_index"` CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` } @@ -255,6 +259,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.") f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") + f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval") + f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.") + f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status") } @@ -522,15 +529,7 @@ func (c *Compactor) starting(ctx context.Context) error { // Create the users scanner. c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger) - // Create the blocks cleaner (service). - c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ - DeletionDelay: c.compactorCfg.DeletionDelay, - CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1), - CleanupConcurrency: c.compactorCfg.CleanupConcurrency, - BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, - TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion) - + var cleanerRingLifecyclerID = "default-cleaner" // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig() @@ -539,6 +538,8 @@ func (c *Compactor) starting(ctx context.Context) error { return errors.Wrap(err, "unable to initialize compactor ring lifecycler") } + cleanerRingLifecyclerID = c.ringLifecycler.ID + c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) if err != nil { return errors.Wrap(err, "unable to initialize compactor ring") @@ -588,6 +589,16 @@ func (c *Compactor) starting(ctx context.Context) error { } } + // Create the blocks cleaner (service). + c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ + DeletionDelay: c.compactorCfg.DeletionDelay, + CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1), + CleanupConcurrency: c.compactorCfg.CleanupConcurrency, + BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, + TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, + }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, + c.compactorMetrics.syncerBlocksMarkedForDeletion) + // Ensure an initial cleanup occurred before starting the compactor. if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil { c.ringSubservices.StopAsync() diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 6f283bdcf7d..908f962cf29 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -173,6 +173,9 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) @@ -239,7 +242,6 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { # HELP cortex_compactor_blocks_cleaned_total Total number of blocks deleted. # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 @@ -332,7 +334,6 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # HELP cortex_compactor_blocks_cleaned_total Total number of blocks deleted. # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 @@ -381,6 +382,9 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -436,6 +440,9 @@ func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -485,7 +492,13 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-2/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-2/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-2/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -641,6 +654,10 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { "user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", }, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", nil) bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", nil) bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil) @@ -740,7 +757,13 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-2/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-2/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-2/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -816,6 +839,10 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil) bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil) @@ -979,7 +1006,13 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockGet("user-2/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-2/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-2/markers/cleaner-visit-marker.json", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -1078,6 +1111,9 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -1212,6 +1248,9 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit bucketClient.MockIter(userID+"/", blockFiles, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete(userID+"/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) @@ -2005,6 +2044,9 @@ func TestCompactor_FailedWithRetriableError(t *testing.T) { bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, errors.New("test retriable error")) @@ -2056,6 +2098,9 @@ func TestCompactor_FailedWithHaltError(t *testing.T) { bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, compact.HaltError{}) diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go new file mode 100644 index 00000000000..ebe675556d8 --- /dev/null +++ b/pkg/compactor/visit_marker.go @@ -0,0 +1,155 @@ +package compactor + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/compact" + + "github.com/cortexproject/cortex/pkg/util/runutil" +) + +var ( + errorVisitMarkerNotFound = errors.New("visit marker not found") + errorUnmarshalVisitMarker = errors.New("unmarshal visit marker JSON") +) + +type VisitStatus string + +const ( + Pending VisitStatus = "pending" + InProgress VisitStatus = "inProgress" + Completed VisitStatus = "completed" + Failed VisitStatus = "failed" +) + +type VisitMarker interface { + GetVisitMarkerFilePath() string + UpdateStatus(ownerIdentifier string, status VisitStatus) + GetStatus() VisitStatus + IsExpired(visitMarkerTimeout time.Duration) bool + String() string +} + +type VisitMarkerManager struct { + bkt objstore.InstrumentedBucket + logger log.Logger + ownerIdentifier string + visitMarker VisitMarker +} + +func NewVisitMarkerManager( + bkt objstore.InstrumentedBucket, + logger log.Logger, + ownerIdentifier string, + visitMarker VisitMarker, +) *VisitMarkerManager { + return &VisitMarkerManager{ + bkt: bkt, + logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)), + ownerIdentifier: ownerIdentifier, + visitMarker: visitMarker, + } +} + +func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool) { + level.Info(v.getLogger()).Log("msg", "start visit marker heart beat") + ticker := time.NewTicker(visitMarkerFileUpdateInterval) + defer ticker.Stop() +heartBeat: + for { + v.MarkWithStatus(ctx, InProgress) + + select { + case <-ctx.Done(): + level.Warn(v.getLogger()).Log("msg", "visit marker heart beat got cancelled") + v.MarkWithStatus(context.Background(), Pending) + break heartBeat + case <-ticker.C: + continue + case err := <-errChan: + if err == nil { + level.Info(v.getLogger()).Log("msg", "update visit marker to completed status") + v.MarkWithStatus(ctx, Completed) + } else { + level.Warn(v.getLogger()).Log("msg", "stop visit marker heart beat due to error", "err", err) + if compact.IsHaltError(err) { + level.Info(v.getLogger()).Log("msg", "update visit marker to failed status", "err", err) + v.MarkWithStatus(ctx, Failed) + } else { + level.Info(v.getLogger()).Log("msg", "update visit marker to pending status", "err", err) + v.MarkWithStatus(ctx, Pending) + } + } + break heartBeat + } + } + level.Info(v.getLogger()).Log("msg", "stop visit marker heart beat") + if deleteOnExit { + level.Info(v.getLogger()).Log("msg", "delete visit marker when exiting heart beat") + v.DeleteVisitMarker(context.Background()) + } +} + +func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus) { + v.visitMarker.UpdateStatus(v.ownerIdentifier, status) + if err := v.updateVisitMarker(ctx); err != nil { + level.Error(v.getLogger()).Log("msg", "unable to upsert visit marker file content", "new_status", status, "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "marked with new status", "new_status", status) +} + +func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context) { + if err := v.bkt.Delete(ctx, v.visitMarker.GetVisitMarkerFilePath()); err != nil { + level.Error(v.getLogger()).Log("msg", "failed to delete visit marker", "err", err) + return + } + level.Debug(v.getLogger()).Log("msg", "visit marker deleted") +} + +func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error { + visitMarkerFile := v.visitMarker.GetVisitMarkerFilePath() + visitMarkerFileReader, err := v.bkt.ReaderWithExpectedErrs(v.bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) + if err != nil { + if v.bkt.IsObjNotFoundErr(err) { + return errors.Wrapf(errorVisitMarkerNotFound, "visit marker file: %s", visitMarkerFile) + } + return errors.Wrapf(err, "get visit marker file: %s", visitMarkerFile) + } + defer runutil.CloseWithLogOnErr(v.getLogger(), visitMarkerFileReader, "close visit marker reader") + b, err := io.ReadAll(visitMarkerFileReader) + if err != nil { + return errors.Wrapf(err, "read visit marker file: %s", visitMarkerFile) + } + if err = json.Unmarshal(b, visitMarker); err != nil { + return errors.Wrapf(errorUnmarshalVisitMarker, "visit marker file: %s, content: %s, error: %v", visitMarkerFile, string(b), err.Error()) + } + level.Debug(v.getLogger()).Log("msg", "visit marker read from file", "visit_marker_file", visitMarkerFile) + return nil +} + +func (v *VisitMarkerManager) updateVisitMarker(ctx context.Context) error { + visitMarkerFileContent, err := json.Marshal(v.visitMarker) + if err != nil { + return err + } + + reader := bytes.NewReader(visitMarkerFileContent) + if err := v.bkt.Upload(ctx, v.visitMarker.GetVisitMarkerFilePath(), reader); err != nil { + return err + } + return nil +} + +func (v *VisitMarkerManager) getLogger() log.Logger { + return log.With(v.logger, "visit_marker", v.visitMarker.String()) +} diff --git a/pkg/compactor/visit_marker_test.go b/pkg/compactor/visit_marker_test.go new file mode 100644 index 00000000000..2e3eae5b606 --- /dev/null +++ b/pkg/compactor/visit_marker_test.go @@ -0,0 +1,238 @@ +package compactor + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/compact" + + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" +) + +func TestMarkPending(t *testing.T) { + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) + visitMarkerManager.MarkWithStatus(ctx, Pending) + + require.Equal(t, Pending, testVisitMarker.Status) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, Pending, visitMarkerFromFile.Status) +} + +func TestMarkInProgress(t *testing.T) { + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) + visitMarkerManager.MarkWithStatus(ctx, InProgress) + + require.Equal(t, InProgress, testVisitMarker.Status) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, InProgress, visitMarkerFromFile.Status) +} + +func TestMarkCompleted(t *testing.T) { + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) + visitMarkerManager.MarkWithStatus(ctx, Completed) + + require.Equal(t, Completed, testVisitMarker.Status) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, Completed, visitMarkerFromFile.Status) +} + +func TestUpdateExistingVisitMarker(t *testing.T) { + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + + ownerIdentifier1 := "test-owner-1" + testVisitMarker1 := NewTestVisitMarker(ownerIdentifier1) + visitMarkerManager1 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier1, testVisitMarker1) + visitMarkerManager1.MarkWithStatus(ctx, InProgress) + + ownerIdentifier2 := "test-owner-2" + testVisitMarker2 := &TestVisitMarker{ + OwnerIdentifier: ownerIdentifier2, + markerID: testVisitMarker1.markerID, + StoredValue: testVisitMarker1.StoredValue, + } + visitMarkerManager2 := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier2, testVisitMarker2) + visitMarkerManager2.MarkWithStatus(ctx, Completed) + + visitMarkerFromFile := &TestVisitMarker{} + err := visitMarkerManager2.ReadVisitMarker(ctx, visitMarkerFromFile) + require.NoError(t, err) + require.Equal(t, ownerIdentifier2, visitMarkerFromFile.OwnerIdentifier) + require.Equal(t, Completed, visitMarkerFromFile.Status) +} + +func TestHeartBeat(t *testing.T) { + for _, tcase := range []struct { + name string + isCancelled bool + callerErr error + expectedStatus VisitStatus + deleteOnExit bool + }{ + { + name: "heart beat got cancelled", + isCancelled: true, + callerErr: nil, + expectedStatus: Pending, + deleteOnExit: false, + }, + { + name: "heart beat complete without error", + isCancelled: false, + callerErr: nil, + expectedStatus: Completed, + deleteOnExit: false, + }, + { + name: "heart beat stopped due to halt error", + isCancelled: false, + callerErr: compact.HaltError{}, + expectedStatus: Failed, + deleteOnExit: false, + }, + { + name: "heart beat stopped due to non halt error", + isCancelled: false, + callerErr: fmt.Errorf("some error"), + expectedStatus: Pending, + deleteOnExit: false, + }, + { + name: "heart beat got cancelled and delete visit marker on exit", + isCancelled: true, + callerErr: nil, + expectedStatus: Pending, + deleteOnExit: true, + }, + { + name: "heart beat complete without error and delete visit marker on exit", + isCancelled: false, + callerErr: nil, + expectedStatus: Completed, + deleteOnExit: true, + }, + { + name: "heart beat stopped due to caller error and delete visit marker on exit", + isCancelled: false, + callerErr: fmt.Errorf("some error"), + expectedStatus: Failed, + deleteOnExit: true, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + logger := log.NewNopLogger() + errChan := make(chan error, 1) + + ownerIdentifier := "test-owner" + testVisitMarker := NewTestVisitMarker(ownerIdentifier) + resultTestVisitMarker := CopyTestVisitMarker(testVisitMarker) + visitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, testVisitMarker) + go visitMarkerManager.HeartBeat(ctx, errChan, time.Second, tcase.deleteOnExit) + + time.Sleep(2 * time.Second) + if tcase.isCancelled { + cancel() + } else { + errChan <- tcase.callerErr + defer cancel() + } + time.Sleep(2 * time.Second) + + if tcase.deleteOnExit { + exists, err := bkt.Exists(context.Background(), testVisitMarker.GetVisitMarkerFilePath()) + require.NoError(t, err) + require.False(t, exists) + } else { + resultVisitMarkerManager := NewVisitMarkerManager(objstore.WithNoopInstr(bkt), logger, ownerIdentifier, resultTestVisitMarker) + err := resultVisitMarkerManager.ReadVisitMarker(context.Background(), resultTestVisitMarker) + require.NoError(t, err) + require.Equal(t, tcase.expectedStatus, resultTestVisitMarker.Status) + } + }) + } +} + +type TestVisitMarker struct { + OwnerIdentifier string `json:"ownerIdentifier"` + Status VisitStatus `json:"status"` + StoredValue string `json:"storedValue"` + + markerID ulid.ULID +} + +func (t *TestVisitMarker) IsExpired(visitMarkerTimeout time.Duration) bool { + return true +} + +func (t *TestVisitMarker) GetStatus() VisitStatus { + return t.Status +} + +func NewTestVisitMarker(ownerIdentifier string) *TestVisitMarker { + return &TestVisitMarker{ + OwnerIdentifier: ownerIdentifier, + markerID: ulid.MustNew(uint64(time.Now().UnixMilli()), rand.Reader), + StoredValue: "initial value", + } +} + +func CopyTestVisitMarker(sourceVisitMarker *TestVisitMarker) *TestVisitMarker { + return &TestVisitMarker{ + OwnerIdentifier: sourceVisitMarker.OwnerIdentifier, + markerID: sourceVisitMarker.markerID, + StoredValue: sourceVisitMarker.StoredValue, + } +} + +func (t *TestVisitMarker) GetVisitMarkerFilePath() string { + return fmt.Sprintf("test-visit-marker-%s.json", t.markerID.String()) +} + +func (t *TestVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) { + t.OwnerIdentifier = ownerIdentifier + t.Status = status +} + +func (t *TestVisitMarker) String() string { + return fmt.Sprintf("id=%s ownerIdentifier=%s status=%s storedValue=%s", t.markerID.String(), t.OwnerIdentifier, t.Status, t.StoredValue) +}