Skip to content

Commit 56adef2

Browse files
committed
Refactoring after review
Signed-off-by: Marco Pracucci <[email protected]>
1 parent 7bb171e commit 56adef2

File tree

4 files changed

+17
-45
lines changed

4 files changed

+17
-45
lines changed

pkg/compactor/compactor.go

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,40 +44,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4444
cfg.retryMinBackoff = 10 * time.Second
4545
cfg.retryMaxBackoff = time.Minute
4646

47-
f.Var(
48-
&cfg.BlockRanges,
49-
"compactor.block-ranges",
50-
"Comma separated list of compaction ranges expressed in the time duration format")
51-
52-
f.DurationVar(
53-
&cfg.ConsistencyDelay,
54-
"compactor.consistency-delay",
55-
30*time.Minute,
56-
fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval))
57-
58-
f.IntVar(
59-
&cfg.BlockSyncConcurrency,
60-
"compactor.block-sync-concurrency",
61-
20,
62-
"Number of goroutines to use when syncing block metadata from object storage")
63-
64-
f.StringVar(
65-
&cfg.DataDir,
66-
"compactor.data-dir",
67-
"./data",
68-
"Data directory in which to cache blocks and process compactions")
69-
70-
f.DurationVar(
71-
&cfg.CompactionInterval,
72-
"compactor.compaction-interval",
73-
time.Hour,
74-
"The frequency at which the compaction runs")
75-
76-
f.IntVar(
77-
&cfg.CompactionRetries,
78-
"compactor.compaction-retries",
79-
3,
80-
"How many times to retry a failed compaction during a single compaction interval")
47+
f.Var(&cfg.BlockRanges, "compactor.block-ranges", "Comma separated list of compaction ranges expressed in the time duration format")
48+
f.DurationVar(&cfg.ConsistencyDelay, "compactor.consistency-delay", 30*time.Minute, fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval))
49+
f.IntVar(&cfg.BlockSyncConcurrency, "compactor.block-sync-concurrency", 20, "Number of goroutines to use when syncing block metadata from object storage")
50+
f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions")
51+
f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
52+
f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
8153
}
8254

8355
// Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
@@ -116,7 +88,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.Config, logger log
11688
return nil, errors.Wrap(err, "failed to create the bucket client")
11789
}
11890

119-
tsdbCompactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMillisecond(), downsample.NewPool())
91+
tsdbCompactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool())
12092
if err != nil {
12193
cancelCtx()
12294
return nil, errors.Wrap(err, "failed to create TSDB compactor")
@@ -205,7 +177,7 @@ func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
205177
c.compactionRunsStarted.Inc()
206178

207179
for retries.Ongoing() {
208-
if err := c.compactUsers(ctx); err == nil {
180+
if success := c.compactUsers(ctx); success {
209181
c.compactionRunsCompleted.Inc()
210182
return
211183
}
@@ -216,20 +188,20 @@ func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
216188
c.compactionRunsFailed.Inc()
217189
}
218190

219-
func (c *Compactor) compactUsers(ctx context.Context) error {
191+
func (c *Compactor) compactUsers(ctx context.Context) bool {
220192
level.Info(c.logger).Log("msg", "discovering users from bucket")
221193
users, err := c.discoverUsers(ctx)
222194
if err != nil {
223195
level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
224-
return err
196+
return false
225197
}
226198
level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users))
227199

228200
for _, userID := range users {
229201
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
230202
if ctx.Err() != nil {
231203
level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err)
232-
return ctx.Err()
204+
return false
233205
}
234206

235207
level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)
@@ -242,7 +214,7 @@ func (c *Compactor) compactUsers(ctx context.Context) error {
242214
level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID)
243215
}
244216

245-
return nil
217+
return true
246218
}
247219

248220
func (c *Compactor) compactUser(ctx context.Context, userID string) error {

pkg/ingester/ingester_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
426426
// Create a new user database
427427
db, err := tsdb.Open(udir, util.Logger, tsdbPromReg, &tsdb.Options{
428428
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
429-
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecond(),
429+
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMilliseconds(),
430430
NoLockfile: true,
431431
})
432432
if err != nil {

pkg/storage/tsdb/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ func (d *DurationList) Set(s string) error {
7272
return nil
7373
}
7474

75-
// ToMillisecond returns the duration list in milliseconds
76-
func (d *DurationList) ToMillisecond() []int64 {
75+
// ToMilliseconds returns the duration list in milliseconds
76+
func (d *DurationList) ToMilliseconds() []int64 {
7777
values := make([]int64, 0, len(*d))
7878
for _, t := range *d {
79-
values = append(values, int64(t/time.Millisecond))
79+
values = append(values, t.Milliseconds())
8080
}
8181

8282
return values

pkg/storage/tsdb/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestConfig_DurationList(t *testing.T) {
8686

8787
t.Run(name, func(t *testing.T) {
8888
testdata.f(&testdata.cfg)
89-
assert.Equal(t, testdata.expectedRanges, testdata.cfg.BlockRanges.ToMillisecond())
89+
assert.Equal(t, testdata.expectedRanges, testdata.cfg.BlockRanges.ToMilliseconds())
9090
})
9191
}
9292
}

0 commit comments

Comments
 (0)