Skip to content

Commit e810294

Browse files
author
Thor
committed
ingester_v2: open TSDB on startup, limit the number of concurrently
opening Signed-off-by: Thor <[email protected]>
1 parent dd511dd commit e810294

File tree

4 files changed

+177
-4
lines changed

4 files changed

+177
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
* [FEATURE] EXPERIMENTAL: Added `/series` API endpoint support with TSDB blocks storage. #1830
2424
* [FEATURE] Added "multi" KV store that can interact with two other KV stores, primary one for all reads and writes, and secondary one, which only receives writes. Primary/secondary store can be modified in runtime via runtime-config mechanism (previously "overrides"). #1749
2525
* [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service.
26+
* [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
27+
* `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
2628
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
2729
* [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921
2830
* [BUGFIX] TSDB: Fixed handling of out of order/bound samples in ingesters with the experimental TSDB blocks storage. #1864

pkg/ingester/ingester_v2.go

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package ingester
22

33
import (
44
"fmt"
5+
"io"
56
"net/http"
7+
"os"
8+
"path/filepath"
69
"sync"
710
"time"
811

@@ -14,6 +17,7 @@ import (
1417
"github.com/go-kit/kit/log/level"
1518
"github.com/pkg/errors"
1619
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/prometheus/prometheus/pkg/gate"
1721
"github.com/prometheus/prometheus/pkg/labels"
1822
"github.com/prometheus/prometheus/tsdb"
1923
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -73,6 +77,11 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
7377
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
7478
i.userStates = newUserStates(i.limiter, cfg)
7579

80+
// Scan and open TSDB's that already exist on disk
81+
if err := i.openExistingTSDB(context.Background()); err != nil {
82+
return nil, err
83+
}
84+
7685
// Now that user states have been created, we can start the lifecycler
7786
i.lifecycler.Start()
7887

@@ -382,11 +391,23 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
382391
return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState)
383392
}
384393

394+
// Create the database and a shipper for a user
395+
db, err := i.createTSDB(userID)
396+
if err != nil {
397+
return nil, err
398+
}
399+
400+
// Add the db to list of user databases
401+
i.TSDBState.dbs[userID] = db
402+
return db, nil
403+
}
404+
405+
// createTSDB creates a TSDB for a given userID, and returns the created db.
406+
func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
385407
udir := i.cfg.TSDBConfig.BlocksDir(userID)
386408

387409
// Create a new user database
388-
var err error
389-
db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
410+
db, err := tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
390411
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
391412
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
392413
NoLockfile: true,
@@ -422,8 +443,6 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
422443
}()
423444
}
424445

425-
i.TSDBState.dbs[userID] = db
426-
427446
return db, nil
428447
}
429448

@@ -459,3 +478,73 @@ func (i *Ingester) closeAllTSDB() {
459478
i.userStatesMtx.Unlock()
460479
wg.Wait()
461480
}
481+
482+
// openExistingTSDB walks the user tsdb dir, and opens a tsdb for each user. This may start a WAL replay, so we limit the number of
483+
// concurrently opening TSDB.
484+
func (i *Ingester) openExistingTSDB(ctx context.Context) error {
485+
level.Info(util.Logger).Log("msg", "opening existing TSDBs")
486+
wg := &sync.WaitGroup{}
487+
openGate := gate.New(i.cfg.TSDBConfig.MaxTSDBOpeningConcurrencyOnStartup)
488+
489+
err := filepath.Walk(i.cfg.TSDBConfig.Dir, func(path string, info os.FileInfo, err error) error {
490+
if err != nil {
491+
return filepath.SkipDir
492+
}
493+
494+
// Skip root dir and all other files
495+
if path == i.cfg.TSDBConfig.Dir || !info.IsDir() {
496+
return nil
497+
}
498+
499+
// Top level directories are assumed to be user TSDBs
500+
userID := info.Name()
501+
f, err := os.Open(path)
502+
if err != nil {
503+
level.Error(util.Logger).Log("msg", "unable to open user TSDB dir", "err", err, "user", userID, "path", path)
504+
return filepath.SkipDir
505+
}
506+
defer f.Close()
507+
508+
// If the dir is empty skip it
509+
if _, err := f.Readdirnames(1); err != nil {
510+
if err != io.EOF {
511+
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
512+
}
513+
514+
return filepath.SkipDir
515+
}
516+
517+
// Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled.
518+
if err := openGate.Start(ctx); err != nil {
519+
return err
520+
}
521+
522+
wg.Add(1)
523+
go func(userID string) {
524+
defer wg.Done()
525+
defer openGate.Done()
526+
db, err := i.createTSDB(userID)
527+
if err != nil {
528+
level.Error(util.Logger).Log("msg", "unable to open user TSDB", "err", err, "user", userID)
529+
return
530+
}
531+
532+
// Add the database to the map of user databases
533+
i.userStatesMtx.Lock()
534+
i.TSDBState.dbs[userID] = db
535+
i.userStatesMtx.Unlock()
536+
537+
}(userID)
538+
539+
return filepath.SkipDir // Don't descend into directories
540+
})
541+
542+
// Wait for all opening routines to finish
543+
wg.Wait()
544+
if err != nil {
545+
level.Error(util.Logger).Log("msg", "error while opening existing TSDBs")
546+
} else {
547+
level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs")
548+
}
549+
return err
550+
}

pkg/ingester/ingester_v2_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"math"
77
"net/http"
88
"os"
9+
"path/filepath"
910
"strings"
1011
"testing"
1112
"time"
@@ -745,3 +746,80 @@ func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Re
745746

746747
return ingester, cleanup, nil
747748
}
749+
750+
func TestIngester_v2LoadTSDBOnStartup(t *testing.T) {
751+
t.Parallel()
752+
753+
tests := map[string]struct {
754+
setup func(*testing.T, string)
755+
check func(*testing.T, *Ingester)
756+
}{
757+
"empty user dir": {
758+
setup: func(t *testing.T, dir string) {
759+
require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700))
760+
},
761+
check: func(t *testing.T, i *Ingester) {
762+
require.Empty(t, i.getTSDB("user0"), "tsdb created for empty user dir")
763+
},
764+
},
765+
"empty tsdbs": {
766+
setup: func(t *testing.T, dir string) {},
767+
check: func(t *testing.T, i *Ingester) {
768+
require.Zero(t, len(i.TSDBState.dbs), "user tsdb's were created on empty dir")
769+
},
770+
},
771+
"missing tsdb dir": {
772+
setup: func(t *testing.T, dir string) {
773+
require.NoError(t, os.Remove(dir))
774+
},
775+
check: func(t *testing.T, i *Ingester) {
776+
require.Zero(t, len(i.TSDBState.dbs), "user tsdb's were created on missing dir")
777+
},
778+
},
779+
"populated user dirs with unpopulated": {
780+
setup: func(t *testing.T, dir string) {
781+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
782+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
783+
require.NoError(t, os.Mkdir(filepath.Join(dir, "user2"), 0700))
784+
},
785+
check: func(t *testing.T, i *Ingester) {
786+
require.NotNil(t, i.getTSDB("user0"), "tsdb not created for non-empty user dir")
787+
require.NotNil(t, i.getTSDB("user1"), "tsdb not created for non-empty user dir")
788+
require.Empty(t, i.getTSDB("user2"), "tsdb created for empty user dir")
789+
},
790+
},
791+
}
792+
793+
for name, test := range tests {
794+
testName := name
795+
testData := test
796+
t.Run(testName, func(t *testing.T) {
797+
clientCfg := defaultClientTestConfig()
798+
limits := defaultLimitsTestConfig()
799+
800+
overrides, err := validation.NewOverrides(limits, nil)
801+
require.NoError(t, err)
802+
803+
// Create a temporary directory for TSDB
804+
tempDir, err := ioutil.TempDir("", "tsdb")
805+
require.NoError(t, err)
806+
defer os.RemoveAll(tempDir)
807+
808+
ingesterCfg := defaultIngesterTestConfig()
809+
ingesterCfg.TSDBEnabled = true
810+
ingesterCfg.TSDBConfig.Dir = tempDir
811+
ingesterCfg.TSDBConfig.Backend = "s3"
812+
ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"
813+
814+
// setup the tsdbs dir
815+
testData.setup(t, tempDir)
816+
817+
ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil)
818+
require.NoError(t, err)
819+
820+
defer ingester.Shutdown()
821+
822+
testData.check(t, ingester)
823+
})
824+
}
825+
}

pkg/storage/tsdb/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ type Config struct {
3737
Backend string `yaml:"backend"`
3838
BucketStore BucketStoreConfig `yaml:"bucket_store"`
3939

40+
// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup
41+
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
42+
4043
// Backends
4144
S3 s3.Config `yaml:"s3"`
4245
GCS gcs.Config `yaml:"gcs"`
@@ -87,6 +90,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8790
f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB block retention")
8891
f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 30*time.Second, "the frequency at which tsdb blocks are scanned for shipping. 0 means shipping is disabled.")
8992
f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
93+
f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
9094
}
9195

9296
// Validate the config

0 commit comments

Comments
 (0)