Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ instructions below to upgrade your Postgres.
* `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
* [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983
* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996
* [ENHANCEMENT] Experimental TSDB: Improved multi-tenant bucket store. #1991
* Allowed to configure the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval` (0 disables the sync)
* Limited the number of tenants concurrently synched by `-experimental.tsdb.bucket-store.block-sync-concurrency`
* Renamed `cortex_querier_sync_seconds` metric to `cortex_querier_blocks_sync_seconds`
* Track `cortex_querier_blocks_sync_seconds` metric for the initial sync too
* Fixed race condition
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
* [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921
* [BUGFIX] Reduce memory usage when ingester Push() errors. #1922
Expand Down
5 changes: 5 additions & 0 deletions docs/operations/blocks-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ tsdb:
# CLI flag: -experimental.tsdb.bucket-store.sync-dir
[sync_dir: <string> | default = "tsdb-sync"]

# How frequently scan the bucket to look for changes (new blocks shipped by
# ingesters and blocks removed by retention or compaction).
# CLI flag: -experimental.tsdb.bucket-store.sync-interval
[sync_interval: <duration> | default = 5m]

# Size - in bytes - of a per-tenant in-memory index cache used to speed up
# blocks index lookups.
# CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
Expand Down
40 changes: 8 additions & 32 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
Expand All @@ -17,54 +16,31 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/logging"
"google.golang.org/grpc/metadata"
)

// BlockQuerier is a querier of thanos blocks
type BlockQuerier struct {
syncTimes prometheus.Histogram
us *UserStore
us *UserStore
}

// NewBlockQuerier returns a client to query a block store
func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Registerer) (*BlockQuerier, error) {
b := &BlockQuerier{
syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Buckets: prometheus.DefBuckets,
}),
}

us, err := NewUserStore(cfg, logLevel, util.Logger)
func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQuerier, error) {
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-userstore", util.Logger)
if err != nil {
return nil, err
}
b.us = us

if r != nil {
r.MustRegister(b.syncTimes, us.tsdbMetrics)
}

level.Info(util.Logger).Log("msg", "synchronizing TSDB blocks for all users")
if err := us.InitialSync(context.Background()); err != nil {
level.Warn(util.Logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
us, err := NewUserStore(cfg, bucketClient, logLevel, util.Logger, registerer)
if err != nil {
return nil, err
}
level.Info(util.Logger).Log("msg", "successfully synchronized TSDB blocks for all users")

stopc := make(chan struct{})
go runutil.Repeat(30*time.Second, stopc, func() error {
ts := time.Now()
if err := us.SyncStores(context.Background()); err != nil && err != io.EOF {
level.Warn(util.Logger).Log("msg", "sync stores failed", "err", err)
}
b.syncTimes.Observe(time.Since(ts).Seconds())
return nil
})
b := &BlockQuerier{
us: us,
}

return b, nil
}
Expand Down
Loading