2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,8 @@
* [ENHANCEMENT] Query-frontend / Querier / Ruler: added `-querier.max-query-lookback` to limit how far back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis and is enforced in the query-frontend, querier and ruler. #3452 #3458
* [ENHANCEMENT] Querier: added `-querier.query-store-for-labels-enabled` to query store for series API. Only works with blocks storage engine. #3461
* [ENHANCEMENT] Ingester: exposed `-blocks-storage.tsdb.wal-segment-size-bytes` config option to customise the TSDB WAL segment max size. #3476
* [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483
* [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
5 changes: 5 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -114,6 +114,11 @@ compactor:
# CLI flag: -compactor.compaction-concurrency
[compaction_concurrency: <int> | default = 1]

# Max number of tenants for which blocks should be cleaned up concurrently
# (deletion of blocks previously marked for deletion).
# CLI flag: -compactor.cleanup-concurrency
[cleanup_concurrency: <int> | default = 20]

# Time before a block marked for deletion is deleted from bucket. If not 0,
# blocks will be marked for deletion and compactor component will delete
# blocks marked for deletion from the bucket. If delete-delay is 0, blocks
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3865,6 +3865,11 @@ The `compactor_config` configures the compactor for the blocks storage.
# CLI flag: -compactor.compaction-concurrency
[compaction_concurrency: <int> | default = 1]

# Max number of tenants for which blocks should be cleaned up concurrently
# (deletion of blocks previously marked for deletion).
# CLI flag: -compactor.cleanup-concurrency
[cleanup_concurrency: <int> | default = 20]

# Time before a block marked for deletion is deleted from bucket. If not 0,
# blocks will be marked for deletion and compactor component will delete blocks
# marked for deletion from the bucket. If delete-delay is 0, blocks will be
20 changes: 5 additions & 15 deletions pkg/compactor/blocks_cleaner.go
@@ -11,14 +11,14 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"

cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/services"
)

@@ -27,6 +27,7 @@ type BlocksCleanerConfig struct {
MetaSyncConcurrency int
DeletionDelay time.Duration
CleanupInterval time.Duration
CleanupConcurrency int
}

type BlocksCleaner struct {
@@ -120,20 +121,9 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context) error {
return errors.Wrap(err, "failed to discover users from bucket")
}

errs := tsdb_errors.NewMulti()
for _, userID := range users {
// Ensure the context has not been canceled (ie. shutdown has been triggered).
if ctx.Err() != nil {
return ctx.Err()
}

if err = c.cleanUser(ctx, userID); err != nil {
errs.Add(errors.Wrapf(err, "failed to delete user blocks (user: %s)", userID))
continue
}
}

return errs.Err()
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
return errors.Wrapf(c.cleanUser(ctx, userID), "failed to delete user blocks (user: %s)", userID)
})
}

func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error {
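The change above replaces the sequential per-tenant loop with `concurrency.ForEachUser`, bounding the number of tenants cleaned up in parallel by `CleanupConcurrency`. A minimal sketch of how such a helper can look (a plain worker pool; this is an assumption, not necessarily the exact implementation behind `pkg/util/concurrency`):

package concurrency

import (
	"context"
	"sync"
)

// ForEachUser is an illustrative sketch of the helper used above: it invokes
// userFunc for each user ID with at most `concurrency` calls in flight, stops
// dispatching new users once the context is canceled, and returns the first
// error seen (the real helper may aggregate all errors instead).
func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error {
	if concurrency < 1 {
		// Guard against a misconfigured value, otherwise no worker would consume the channel.
		concurrency = 1
	}

	var (
		wg       sync.WaitGroup
		errMx    sync.Mutex
		firstErr error
	)

	ch := make(chan string)

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for userID := range ch {
				if err := userFunc(ctx, userID); err != nil {
					errMx.Lock()
					if firstErr == nil {
						firstErr = err
					}
					errMx.Unlock()
				}
			}
		}()
	}

	for _, userID := range userIDs {
		// Stop feeding the workers once shutdown has been triggered.
		if ctx.Err() != nil {
			break
		}
		ch <- userID
	}

	close(ch)
	wg.Wait()

	// Surface context cancellation, mirroring the behaviour of the removed sequential loop.
	if ctx.Err() != nil {
		return ctx.Err()
	}
	return firstErr
}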
27 changes: 26 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
@@ -3,6 +3,7 @@ package compactor
import (
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"os"
"path"
@@ -22,6 +23,18 @@ import (
)

func TestBlocksCleaner(t *testing.T) {
for _, concurrency := range []int{1, 2, 10} {
concurrency := concurrency

t.Run(fmt.Sprintf("concurrency=%d", concurrency), func(t *testing.T) {
t.Parallel()

testBlocksCleanerWithConcurrency(t, concurrency)
})
}
}

func testBlocksCleanerWithConcurrency(t *testing.T, concurrency int) {
// Create a temporary directory for local storage.
storageDir, err := ioutil.TempDir(os.TempDir(), "storage")
require.NoError(t, err)
@@ -46,17 +59,21 @@ func TestBlocksCleaner(t *testing.T) {
block4 := ulid.MustNew(4, rand.Reader)
block5 := ulid.MustNew(5, rand.Reader)
block6 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 40, 50, nil)
block7 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 10, 20, nil)
block8 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 40, 50, nil)
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet.
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet.
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold.
require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark.
createDeletionMark(t, filepath.Join(storageDir, "user-2"), block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.

cfg := BlocksCleanerConfig{
DataDir: dataDir,
MetaSyncConcurrency: 10,
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: concurrency,
}

logger := log.NewNopLogger()
@@ -80,6 +97,14 @@ func TestBlocksCleaner(t *testing.T) {
require.NoError(t, err)
assert.False(t, exists)

exists, err = bucketClient.Exists(ctx, path.Join("user-2", block7.String(), metadata.MetaFilename))
require.NoError(t, err)
assert.False(t, exists)

exists, err = bucketClient.Exists(ctx, path.Join("user-2", block8.String(), metadata.MetaFilename))
require.NoError(t, err)
assert.True(t, exists)

// Should delete a partial block with deletion mark that hasn't reached the deletion threshold yet.
exists, err = bucketClient.Exists(ctx, path.Join("user-1", block4.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
@@ -98,6 +123,6 @@
assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted))
assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted))
assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed))
assert.Equal(t, float64(3), testutil.ToFloat64(cleaner.blocksCleanedTotal))
assert.Equal(t, float64(4), testutil.ToFloat64(cleaner.blocksCleanedTotal))
assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.blocksFailedTotal))
}
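The deletion marks in the fixtures above are timestamped relative to `deletionDelay`, so that some blocks have already crossed the hard-deletion threshold (block3, block5, block7) and some have not (block2, block4). As a rough sketch of the rule those timestamps exercise (the actual check lives in the cleanup code and may differ in detail):

package compactor

import "time"

// eligibleForHardDeletion is illustrative only: a block marked for deletion is
// hard-deleted once its deletion mark is older than the configured delay.
// block7 is marked at now-deletionDelay-1h (eligible), block2 at
// now-deletionDelay+1h (not yet eligible).
func eligibleForHardDeletion(now, markedAt time.Time, deletionDelay time.Duration) bool {
	return now.Sub(markedAt) >= deletionDelay
}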
13 changes: 12 additions & 1 deletion pkg/compactor/compactor.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"hash/fnv"
"math/rand"
"path"
"strings"
"time"
@@ -38,6 +39,7 @@ type Config struct {
CompactionInterval time.Duration `yaml:"compaction_interval"`
CompactionRetries int `yaml:"compaction_retries"`
CompactionConcurrency int `yaml:"compaction_concurrency"`
CleanupConcurrency int `yaml:"cleanup_concurrency"`
DeletionDelay time.Duration `yaml:"deletion_delay"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
@@ -70,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
f.IntVar(&cfg.CompactionConcurrency, "compactor.compaction-concurrency", 1, "Max number of concurrent compactions running.")
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks should be cleaned up concurrently (deletion of blocks previously marked for deletion).")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
@@ -270,7 +273,8 @@ func (c *Compactor) starting(ctx context.Context) error {
DataDir: c.compactorCfg.DataDir,
MetaSyncConcurrency: c.compactorCfg.MetaSyncConcurrency,
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.05),
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.1),
Review comment (PR author): Increased the jitter a bit.
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
}, c.bucketClient, c.usersScanner, c.parentLogger, c.registerer)

// Ensure an initial cleanup occurred before starting the compactor.
@@ -344,6 +348,13 @@ func (c *Compactor) compactUsers(ctx context.Context) error {
}
level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users))

// When starting multiple compactor replicas nearly at the same time, running in a cluster with
// a large number of tenants, we may end up in a situation where the 1st user is compacted by
// multiple replicas at the same time. Shuffling users helps reduce the likelihood this will happen.
rand.Shuffle(len(users), func(i, j int) {
users[i], users[j] = users[j], users[i]
})

errs := tsdb_errors.NewMulti()

for _, userID := range users {
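Two of the changes above aim to de-synchronise replicas that start at roughly the same time: tenants are shuffled before each compaction pass, and the blocks-cleaner interval now carries 10% jitter (up to about 6 minutes either way for the default 1h compaction interval). A sketch of what a jitter helper of this kind usually does (an assumption; the actual `util.DurationWithJitter` in Cortex may differ in signature and details):

package util

import (
	"math/rand"
	"time"
)

// durationWithJitter is illustrative only: it returns a duration picked
// uniformly at random within ±(variancePerc * input) of input, so replicas
// configured with the same interval drift apart over time.
func durationWithJitter(input time.Duration, variancePerc float64) time.Duration {
	variance := int64(float64(input) * variancePerc)
	if variance <= 0 {
		return input
	}

	jitter := rand.Int63n(2*variance+1) - variance // uniform in [-variance, +variance]
	return input + time.Duration(jitter)
}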
20 changes: 10 additions & 10 deletions pkg/compactor/compactor_test.go
@@ -1,7 +1,6 @@
package compactor

import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -32,6 +31,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
@@ -406,7 +406,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
// Ensure a plan has been executed for the blocks of each user.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2)

assert.Equal(t, []string{
assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
`level=info component=cleaner org_id=user-1 msg="started cleaning of blocks marked for deletion"`,
`level=info component=cleaner org_id=user-1 msg="cleaning of blocks marked for deletion done"`,
@@ -517,7 +517,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
// Only one user's block is compacted.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 1)

assert.Equal(t, []string{
assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
`level=info component=cleaner org_id=user-1 msg="started cleaning of blocks marked for deletion"`,
`level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json bucket=mock`,
@@ -621,7 +621,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
// Ensure a plan has been executed for the blocks of each user.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2)

assert.Equal(t, []string{
assert.ElementsMatch(t, []string{
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=info component=compactor msg="compactor is ACTIVE in the ring"`,
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
@@ -671,8 +671,8 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
kvstore := consul.NewInMemoryClient(ring.GetCodec())

// Create two compactors
compactors := []*Compactor{}
logs := []*bytes.Buffer{}
var compactors []*Compactor
var logs []*concurrency.SyncBuffer

for i := 1; i <= 2; i++ {
cfg := prepareConfig()
@@ -808,9 +808,9 @@ func createDeletionMark(t *testing.T, dir string, blockID ulid.ULID, deletionTim
require.NoError(t, ioutil.WriteFile(markPath, []byte(content), os.ModePerm))
}

func findCompactorByUserID(compactors []*Compactor, logs []*bytes.Buffer, userID string) (*Compactor, *bytes.Buffer, error) {
func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
var compactor *Compactor
var log *bytes.Buffer
var log *concurrency.SyncBuffer

for i, c := range compactors {
owned, err := c.ownUser(userID)
@@ -858,7 +858,7 @@ func prepareConfig() Config {
return compactorCfg
}

func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *bytes.Buffer, prometheus.Gatherer, func()) {
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *concurrency.SyncBuffer, prometheus.Gatherer, func()) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)

@@ -872,7 +872,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*
}

tsdbCompactor := &tsdbCompactorMock{}
logs := &bytes.Buffer{}
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)
registry := prometheus.NewRegistry()

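Because the cleaner now walks tenants concurrently, the per-tenant log lines captured by these tests can interleave in any order, so strict `assert.Equal` comparisons on the log slices are replaced with `assert.ElementsMatch`, which only requires both slices to contain the same elements. A minimal illustration (hypothetical log values, same testify API):

package compactor

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestElementsMatchIgnoresOrder(t *testing.T) {
	// With concurrent cleanup the order of per-tenant log lines is not deterministic.
	got := []string{"cleaned user-2", "cleaned user-1"}

	assert.ElementsMatch(t, []string{"cleaned user-1", "cleaned user-2"}, got) // passes regardless of order
	// assert.Equal(t, []string{"cleaned user-1", "cleaned user-2"}, got)     // would be flaky against concurrent output
}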
24 changes: 2 additions & 22 deletions pkg/frontend/frontend_test.go
@@ -1,7 +1,6 @@
package frontend

import (
"bytes"
"context"
"fmt"
"io/ioutil"
@@ -10,7 +9,6 @@ import (
"net/url"
strconv "strconv"
"strings"
"sync"
"testing"
"time"

@@ -29,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)
@@ -94,25 +93,6 @@ func TestFrontend_RequestHostHeaderWhenDownstreamURLIsConfigured(t *testing.T) {
testFrontend(t, config, nil, test, true, nil)
}

type syncBuf struct {
mu sync.Mutex
buf bytes.Buffer
}

func (sb *syncBuf) Write(p []byte) (n int, err error) {
sb.mu.Lock()
defer sb.mu.Unlock()

return sb.buf.Write(p)
}

func (sb *syncBuf) String() string {
sb.mu.Lock()
defer sb.mu.Unlock()

return sb.buf.String()
}

func TestFrontend_LogsSlowQueriesFormValues(t *testing.T) {
// Create an HTTP server listening locally. This server mocks the downstream
// Prometheus API-compatible server.
@@ -134,7 +114,7 @@ func TestFrontend_LogsSlowQueriesFormValues(t *testing.T) {
config.Handler.LogQueriesLongerThan = 1 * time.Microsecond
config.DownstreamURL = fmt.Sprintf("http://%s", downstreamListen.Addr())

var buf syncBuf
var buf concurrency.SyncBuffer
l := log.NewLogfmtLogger(&buf)

test := func(addr string) {
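The `syncBuf` helper removed above is superseded by a shared `concurrency.SyncBuffer`, which the concurrent compactor tests also use to capture log output safely from multiple goroutines. Presumably it keeps the same shape as the deleted code, i.e. a `bytes.Buffer` guarded by a mutex (sketch below; the shared helper may expose additional methods):

package concurrency

import (
	"bytes"
	"sync"
)

// SyncBuffer is a goroutine-safe wrapper around bytes.Buffer, mirroring the
// syncBuf type deleted from frontend_test.go.
type SyncBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

func (sb *SyncBuffer) Write(p []byte) (n int, err error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	return sb.buf.Write(p)
}

func (sb *SyncBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	return sb.buf.String()
}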