Skip to content

Commit 4b8a4c1

Browse files
author
Ganesh Vernekar
committed
Merge remote-tracking branch 'upstream/master' into tokens-file
2 parents 5c33c1c + e3de878 commit 4b8a4c1

File tree

25 files changed

+555
-75
lines changed

25 files changed

+555
-75
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734
66
* [CHANGE] Remove direct DB/API access from the ruler
77
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
8+
* [CHANGE] Setting `-ingester.max-transfer-retries` to 0 now disables hand-over when ingester is shutting down. Previously, zero meant infinite number of attempts. #1771
89
* [FEATURE] Global limit on the max series per user and metric #1760
910
* `-ingester.max-global-series-per-user`
1011
* `-ingester.max-global-series-per-metric`

docs/arguments.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,9 @@ It also talks to a KVStore and has it's own copies of the same flags used by the
228228

229229
How long to wait in PENDING state during the [hand-over process](ingester-handover.md). (default 0s)
230230

231-
- `-ingester.ingester.max-transfer-retries`
231+
- `-ingester.max-transfer-retries`
232232

233-
How many times a LEAVING ingester tries to find a PENDING ingester during the [hand-over process](ingester-handover.md). Each attempt takes a second or so. (default 10)
233+
How many times a LEAVING ingester tries to find a PENDING ingester during the [hand-over process](ingester-handover.md). Each attempt takes a second or so. Negative value or zero disables hand-over process completely. (default 10)
234234

235235
- `-ingester.normalise-tokens`
236236

pkg/chunk/chunk.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,9 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error {
288288
if err != nil {
289289
return errors.Wrap(err, "when decoding chunk metadata")
290290
}
291-
if len(input)-r.Len() != int(metadataLen) {
291+
metadataRead := len(input) - r.Len()
292+
// Older versions of Cortex included the initial length word; newer versions do not.
293+
if !(metadataRead == int(metadataLen) || metadataRead == int(metadataLen)+4) {
292294
return ErrMetadataLength
293295
}
294296

pkg/chunk/schema.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
)
1414

1515
var (
16+
chunkTimeRangeKeyV1a = []byte{1}
1617
chunkTimeRangeKeyV1 = []byte{'1'}
1718
chunkTimeRangeKeyV2 = []byte{'2'}
1819
chunkTimeRangeKeyV3 = []byte{'3'}

pkg/chunk/schema_util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ func parseChunkTimeRangeValue(rangeValue []byte, value []byte) (
187187

188188
// v3 schema had four components - label name, label value, chunk ID and version.
189189
// "version" is 1 and label value is base64 encoded.
190+
// (older code wrote "version" as 1, not '1')
191+
case bytes.Equal(components[3], chunkTimeRangeKeyV1a):
192+
fallthrough
190193
case bytes.Equal(components[3], chunkTimeRangeKeyV1):
191194
chunkID = string(components[2])
192195
labelValue, err = decodeBase64Value(components[1])

pkg/cortex/modules.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/prometheus/common/route"
1313
"github.com/prometheus/prometheus/config"
1414
v1 "github.com/prometheus/prometheus/web/api/v1"
15-
"github.com/thanos-io/thanos/pkg/objstore/s3"
1615
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
1716
"github.com/weaveworks/common/middleware"
1817
"github.com/weaveworks/common/server"
@@ -198,14 +197,7 @@ func (t *Cortex) initQuerier(cfg *Config) (err error) {
198197
var store querier.ChunkStore
199198

200199
if cfg.Storage.Engine == storage.StorageEngineTSDB {
201-
s3cfg := s3.Config{
202-
Bucket: cfg.TSDB.S3.BucketName,
203-
Endpoint: cfg.TSDB.S3.Endpoint,
204-
AccessKey: cfg.TSDB.S3.AccessKeyID,
205-
SecretKey: cfg.TSDB.S3.SecretAccessKey,
206-
Insecure: cfg.TSDB.S3.Insecure,
207-
}
208-
store, err = querier.NewBlockQuerier(s3cfg, cfg.TSDB.SyncDir, prometheus.DefaultRegisterer)
200+
store, err = querier.NewBlockQuerier(cfg.TSDB, prometheus.DefaultRegisterer)
209201
if err != nil {
210202
return err
211203
}

pkg/ingester/flush.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ var (
6767
Name: "cortex_ingester_dropped_chunks_total",
6868
Help: "Total number of chunks dropped from flushing because they have too few samples.",
6969
})
70+
oldestUnflushedChunkTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
71+
Name: "cortex_oldest_unflushed_chunk_timestamp_seconds",
72+
Help: "Unix timestamp of the oldest unflushed chunk in the memory",
73+
})
7074
)
7175

7276
// Flush triggers a flush of all the chunks and closes the flush queues.
@@ -110,14 +114,23 @@ func (i *Ingester) sweepUsers(immediate bool) {
110114
return
111115
}
112116

117+
oldest := model.Time(0)
118+
113119
for id, state := range i.userStates.cp() {
114120
for pair := range state.fpToSeries.iter() {
115121
state.fpLocker.Lock(pair.fp)
116122
i.sweepSeries(id, pair.fp, pair.series, immediate)
117123
i.removeFlushedChunks(state, pair.fp, pair.series)
124+
first := pair.series.firstUnflushedChunkTime()
118125
state.fpLocker.Unlock(pair.fp)
126+
127+
if first > 0 && (oldest == 0 || first < oldest) {
128+
oldest = first
129+
}
119130
}
120131
}
132+
133+
oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
121134
}
122135

123136
type flushReason int

pkg/ingester/ingester.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
102102
type Config struct {
103103
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
104104

105-
// Config for transferring chunks.
105+
// Config for transferring chunks. Zero or negative = no retries.
106106
MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`
107107

108108
// Config for chunk flushing.
@@ -134,12 +134,12 @@ type Config struct {
134134
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
135135
cfg.LifecyclerConfig.RegisterFlags(f)
136136

137-
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
137+
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over.")
138138
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
139139
f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
140140
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
141141
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 5*time.Minute, "Maximum chunk idle time before flushing.")
142-
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing. 0 disables it and a stale series is not flushed until the max-chunk-idle timeout is reached.")
142+
f.DurationVar(&cfg.MaxStaleChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing. 0 disables it and a stale series is not flushed until the max-chunk-idle timeout is reached.")
143143
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.")
144144
f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes")
145145
f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge")

pkg/ingester/ingester_v2.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/cortexproject/cortex/pkg/ingester/client"
99
"github.com/cortexproject/cortex/pkg/ring"
10+
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
1011
"github.com/cortexproject/cortex/pkg/util"
1112
"github.com/cortexproject/cortex/pkg/util/validation"
1213
"github.com/go-kit/kit/log/level"
@@ -16,7 +17,6 @@ import (
1617
lbls "github.com/prometheus/prometheus/tsdb/labels"
1718
"github.com/thanos-io/thanos/pkg/block/metadata"
1819
"github.com/thanos-io/thanos/pkg/objstore"
19-
"github.com/thanos-io/thanos/pkg/objstore/s3"
2020
"github.com/thanos-io/thanos/pkg/runutil"
2121
"github.com/thanos-io/thanos/pkg/shipper"
2222
"github.com/weaveworks/common/user"
@@ -32,16 +32,7 @@ type TSDBState struct {
3232

3333
// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
3434
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) {
35-
var bkt *s3.Bucket
36-
s3Cfg := s3.Config{
37-
Bucket: cfg.TSDBConfig.S3.BucketName,
38-
Endpoint: cfg.TSDBConfig.S3.Endpoint,
39-
AccessKey: cfg.TSDBConfig.S3.AccessKeyID,
40-
SecretKey: cfg.TSDBConfig.S3.SecretAccessKey,
41-
Insecure: cfg.TSDBConfig.S3.Insecure,
42-
}
43-
var err error
44-
bkt, err = s3.NewBucketWithConfig(util.Logger, s3Cfg, "cortex")
35+
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger)
4536
if err != nil {
4637
return nil, err
4738
}
@@ -56,7 +47,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
5647

5748
TSDBState: TSDBState{
5849
dbs: make(map[string]*tsdb.DB),
59-
bucket: bkt,
50+
bucket: bucketClient,
6051
},
6152
}
6253

pkg/ingester/lifecycle_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func defaultIngesterTestConfig() Config {
4545
cfg.LifecyclerConfig.Addr = "localhost"
4646
cfg.LifecyclerConfig.ID = "localhost"
4747
cfg.LifecyclerConfig.FinalSleep = 0
48-
cfg.MaxTransferRetries = -1
48+
cfg.MaxTransferRetries = 0
4949
return cfg
5050
}
5151

0 commit comments

Comments
 (0)