diff --git a/CHANGELOG.md b/CHANGELOG.md index 57166c18cb6..2f51315ae11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ * [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422 * [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452 +## Blocksconvert + +* [ENHANCEMENT] Scheduler: ability to ignore users based on regexp, using `-scheduler.ignore-users-regex` flag. #3477 + ## 1.5.0 / 2020-11-09 ### Cortex diff --git a/tools/blocksconvert/scanner/scanner.go b/tools/blocksconvert/scanner/scanner.go index e0b06414c2f..3653c792a66 100644 --- a/tools/blocksconvert/scanner/scanner.go +++ b/tools/blocksconvert/scanner/scanner.go @@ -56,7 +56,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.KeepFiles, "scanner.keep-files", false, "Keep plan files locally after uploading.") f.IntVar(&cfg.TablesLimit, "scanner.tables-limit", 0, "Number of tables to convert. 0 = all.") f.StringVar(&cfg.AllowedUsers, "scanner.allowed-users", "", "Allowed users that can be converted, comma-separated. If set, only these users have plan files generated.") - f.StringVar(&cfg.IgnoredUserPattern, "scanner.ignore-users-regex", "", "If set and user ID matches this regex pattern, it will be ignored. Only used if all -scanner.allowed-users is not set (i.e. all users are allowed by default).") + f.StringVar(&cfg.IgnoredUserPattern, "scanner.ignore-users-regex", "", "If set and user ID matches this regex pattern, it will be ignored. Checked after applying -scanner.allowed-users, if set.") f.BoolVar(&cfg.VerifyPlans, "scanner.verify-plans", true, "Verify plans before uploading to bucket. Enabled by default for extra check. Requires extra memory for large plans.") f.Var(&cfg.PeriodStart, "scanner.scan-period-start", "If specified, this is lower end of time period to scan. Specified date is included in the range. (format: \"2006-01-02\")") f.Var(&cfg.PeriodEnd, "scanner.scan-period-end", "If specified, this is upper end of time period to scan. Specified date is not included in the range. (format: \"2006-01-02\")") @@ -115,7 +115,7 @@ func NewScanner(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg p } var ignoredUserRegex *regexp.Regexp = nil - if users.AllUsersAllowed() && cfg.IgnoredUserPattern != "" { + if cfg.IgnoredUserPattern != "" { re, err := regexp.Compile(cfg.IgnoredUserPattern) if err != nil { return nil, errors.Wrap(err, "failed to compile ignored user regex") diff --git a/tools/blocksconvert/scheduler/scheduler.go b/tools/blocksconvert/scheduler/scheduler.go index 77094f8739d..844e856527e 100644 --- a/tools/blocksconvert/scheduler/scheduler.go +++ b/tools/blocksconvert/scheduler/scheduler.go @@ -6,6 +6,7 @@ import ( "html/template" "net/http" "path" + "regexp" "sort" "strconv" "strings" @@ -31,6 +32,7 @@ type Config struct { PlanScanConcurrency int MaxProgressFileAge time.Duration AllowedUsers string + IgnoredUserPattern string } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -38,6 +40,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.PlanScanConcurrency, "scheduler.plan-scan-concurrency", 5, "Limit of concurrent plan scans.") f.DurationVar(&cfg.MaxProgressFileAge, "scheduler.max-progress-file-age", 30*time.Minute, "Progress files older than this duration are deleted.") f.StringVar(&cfg.AllowedUsers, "scheduler.allowed-users", "", "Allowed users that can be converted, comma-separated") + f.StringVar(&cfg.IgnoredUserPattern, "scheduler.ignore-users-regex", "", "If set and user ID matches this regex pattern, it will be ignored. Checked after applying -scheduler.allowed-users, if set.") } func NewScheduler(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg prometheus.Registerer, http *mux.Router, grpcServ *grpc.Server) (*Scheduler, error) { @@ -51,19 +54,29 @@ func NewScheduler(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg users = blocksconvert.ParseAllowedUsers(cfg.AllowedUsers) } - s := newSchedulerWithBucket(l, b, scfg.BucketPrefix, users, cfg, reg) + var ignoredUserRegex *regexp.Regexp = nil + if cfg.IgnoredUserPattern != "" { + re, err := regexp.Compile(cfg.IgnoredUserPattern) + if err != nil { + return nil, errors.Wrap(err, "failed to compile ignored user regex") + } + ignoredUserRegex = re + } + + s := newSchedulerWithBucket(l, b, scfg.BucketPrefix, users, ignoredUserRegex, cfg, reg) blocksconvert.RegisterSchedulerServer(grpcServ, s) http.HandleFunc("/plans", s.httpPlans) return s, nil } -func newSchedulerWithBucket(l log.Logger, b objstore.Bucket, bucketPrefix string, users blocksconvert.AllowedUsers, cfg Config, reg prometheus.Registerer) *Scheduler { +func newSchedulerWithBucket(l log.Logger, b objstore.Bucket, bucketPrefix string, users blocksconvert.AllowedUsers, ignoredUsers *regexp.Regexp, cfg Config, reg prometheus.Registerer) *Scheduler { s := &Scheduler{ log: l, cfg: cfg, bucket: b, bucketPrefix: bucketPrefix, allowedUsers: users, + ignoredUsers: ignoredUsers, planStatus: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_blocksconvert_scheduler_scanned_plans", @@ -93,6 +106,7 @@ type Scheduler struct { log log.Logger allowedUsers blocksconvert.AllowedUsers + ignoredUsers *regexp.Regexp // Can be nil. bucket objstore.Bucket bucketPrefix string @@ -141,6 +155,7 @@ func (s *Scheduler) scanBucketForPlans(ctx context.Context) error { allUsers := len(users) users = s.allowedUsers.GetAllowedUsers(users) + users = s.ignoreUsers(users) level.Info(s.log).Log("msg", "found users", "all", allUsers, "allowed", len(users)) @@ -477,3 +492,17 @@ func (s *Scheduler) updateQueuedPlansMetrics() { s.newestPlanTimestamp.Set(0) } } + +func (s *Scheduler) ignoreUsers(users []string) []string { + if s.ignoredUsers == nil { + return users + } + + result := make([]string, 0, len(users)) + for _, u := range users { + if !s.ignoredUsers.MatchString(u) { + result = append(result, u) + } + } + return result +} diff --git a/tools/blocksconvert/scheduler/scheduler_test.go b/tools/blocksconvert/scheduler/scheduler_test.go index 43f6aaa23b6..e98014e32a4 100644 --- a/tools/blocksconvert/scheduler/scheduler_test.go +++ b/tools/blocksconvert/scheduler/scheduler_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "regexp" "strings" "testing" "time" @@ -89,7 +90,11 @@ func TestSchedulerScan(t *testing.T) { require.NoError(t, bucket.Upload(context.Background(), "migration/user4/5.error", strings.NewReader(""))) require.NoError(t, bucket.Upload(context.Background(), "migration/user4/6.finished.01E8GCW9J0HV0992HSZ0N6RAMN", strings.NewReader(""))) - s := newSchedulerWithBucket(log.NewLogfmtLogger(os.Stdout), bucket, "migration", blocksconvert.AllowAllUsers, Config{ + require.NoError(t, bucket.Upload(context.Background(), "migration/ignoredUser/7.plan", strings.NewReader(""))) + + ignoredUsers := regexp.MustCompile("ignored.*") + + s := newSchedulerWithBucket(log.NewLogfmtLogger(os.Stdout), bucket, "migration", blocksconvert.AllowAllUsers, ignoredUsers, Config{ ScanInterval: 10 * time.Second, PlanScanConcurrency: 5, MaxProgressFileAge: 5 * time.Minute,