From 2b8880073bb3733c8671df886de4bb39356dc5dc Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 19 Jun 2023 23:31:23 -0700 Subject: [PATCH 1/4] fix ooo blocks shipping Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + pkg/ingester/ingester.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2085470f507..daafc3b85e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 * [BUGFIX] Store-Gateway and AlertManager: Add a `wait_instance_time_out` to WaitInstanceState context to avoid waiting forever. #5581 +* [BUGFIX] Ingester: Allow shipper to upload compacted blocks when out of order samples is enabled. #5416 ## 1.15.1 2023-04-26 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 96d8df52f7b..0918973243a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1967,7 +1967,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { if maxExemplarsForUser > 0 { enableExemplars = true } - oooTimeWindow := i.limits.OutOfOrderTimeWindow(userID) + oooTimeWindow := time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds() walCompressType := wlog.CompressionNone // TODO(yeya24): expose zstd compression for WAL. if i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled { @@ -2044,8 +2044,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { func() labels.Labels { return l }, metadata.ReceiveSource, func() bool { - return oooTimeWindow > 0 // Upload compacted blocks when OOO is enabled. - }, + return time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds() > 0 + }, // No need to upload compacted blocks unless out of order samples is enabled. true, // Allow out of order uploads. It's fine in Cortex's context. metadata.NoneFunc, ) From 0f0a0cff03016e0f4eebec38b2cf766930f5b18e Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 27 Oct 2023 20:08:25 -0700 Subject: [PATCH 2/4] fix ooo time window Signed-off-by: Ben Ye --- pkg/ingester/ingester.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0918973243a..a814e60540f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1990,7 +1990,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { MaxExemplars: maxExemplarsForUser, HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize, EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown, - OutOfOrderTimeWindow: time.Duration(oooTimeWindow).Milliseconds(), + OutOfOrderTimeWindow: oooTimeWindow, OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax, }, nil) if err != nil { @@ -2044,7 +2044,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { func() labels.Labels { return l }, metadata.ReceiveSource, func() bool { - return time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds() > 0 + return i.limits.OutOfOrderTimeWindow(userID) > 0 }, // No need to upload compacted blocks unless out of order samples is enabled. true, // Allow out of order uploads. It's fine in Cortex's context. metadata.NoneFunc, From 3d9f22d8a1d556cd08d6ebb848c5a6107409bb8b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 Oct 2023 23:24:11 -0700 Subject: [PATCH 3/4] add unit test Signed-off-by: Ben Ye --- pkg/ingester/ingester.go | 3 +- pkg/ingester/ingester_test.go | 52 +++++++++++++++++++++++++++++++++++ pkg/util/metrics_helper.go | 14 ++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index a814e60540f..7c96aa683ee 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1967,7 +1967,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { if maxExemplarsForUser > 0 { enableExemplars = true } - oooTimeWindow := time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds() walCompressType := wlog.CompressionNone // TODO(yeya24): expose zstd compression for WAL. if i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled { @@ -1990,7 +1989,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { MaxExemplars: maxExemplarsForUser, HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize, EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown, - OutOfOrderTimeWindow: oooTimeWindow, + OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(), OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax, }, nil) if err != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 132003f4ff9..78e43c9c8be 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -4247,3 +4247,55 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest return cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) } + +func TestIngesterShipperUploadCompactedHotReload(t *testing.T) { + // Create ingester. + cfg := defaultIngesterTestConfig(t) + dir := t.TempDir() + blocksDir := filepath.Join(dir, "blocks") + limits := defaultLimitsTestConfig() + reg := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, blocksDir, reg) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + uid := "user-1" + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", 1)), 1) + ctx := user.InjectOrgID(context.Background(), uid) + _, err = i.Push(ctx, req) + require.NoError(t, err) + db := i.getTSDB(uid) + _, err = db.shipper.Sync(ctx) + require.NoError(t, err) + + promReg := i.TSDBState.tsdbMetrics.regs.GetPromRegistryByUser(uid) + require.NotNil(t, promReg) + // Out of order samples disabled. Expected metric value to be 0. + expectedMetrics := ` + # HELP thanos_shipper_upload_compacted_done If 1 it means shipper uploaded all compacted blocks from the filesystem. + # TYPE thanos_shipper_upload_compacted_done gauge + thanos_shipper_upload_compacted_done 0 +` + assert.NoError(t, testutil.GatherAndCompare(promReg, strings.NewReader(expectedMetrics), "thanos_shipper_upload_compacted_done")) + + // set ooo time window in limits, and re-initialize the ingester + limits.OutOfOrderTimeWindow = model.Duration(time.Minute) + i.limits, _ = validation.NewOverrides(limits, nil) + require.NoError(t, err) + _, err = db.shipper.Sync(ctx) + require.NoError(t, err) + // Expected metric value to be 1 now as OOO time window has been set. + expectedMetrics = ` + # HELP thanos_shipper_upload_compacted_done If 1 it means shipper uploaded all compacted blocks from the filesystem. + # TYPE thanos_shipper_upload_compacted_done gauge + thanos_shipper_upload_compacted_done 1 +` + assert.NoError(t, testutil.GatherAndCompare(promReg, strings.NewReader(expectedMetrics), "thanos_shipper_upload_compacted_done")) +} diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index 532912dd945..5479b492502 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -678,6 +678,20 @@ func (r *UserRegistries) Registries() []UserRegistry { return out } +// GetPromRegistryByUser returns the Prometheus metrics registry by userID. +func (r *UserRegistries) GetPromRegistryByUser(user string) *prometheus.Registry { + r.regsMu.Lock() + defer r.regsMu.Unlock() + + for _, reg := range r.regs { + if reg.user == user { + return reg.reg + } + } + + return nil +} + func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser { data := MetricFamiliesPerUser{} for _, entry := range r.Registries() { From 8da37ffb425a882826e53781909af8b7c1c2d63a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 Oct 2023 23:27:18 -0700 Subject: [PATCH 4/4] update changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index daafc3b85e6..1ef983c07fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,7 +91,7 @@ * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 * [BUGFIX] Store-Gateway and AlertManager: Add a `wait_instance_time_out` to WaitInstanceState context to avoid waiting forever. #5581 -* [BUGFIX] Ingester: Allow shipper to upload compacted blocks when out of order samples is enabled. #5416 +* [BUGFIX] Ingester: Allow shipper to hot reload upload compacted blocks when out of order samples is enabled. #5416 ## 1.15.1 2023-04-26