diff --git a/CHANGELOG.md b/CHANGELOG.md index 57166c18cb6..25b6e90b2ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ * [ENHANCEMENT] Query-frontend / Querier / Ruler: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis and is enforced in the query-frontend, querier and ruler. #3452 #3458 * [ENHANCEMENT] Querier: added `-querier.query-store-for-labels-enabled` to query store for series API. Only works with blocks storage engine. #3461 * [ENHANCEMENT] Ingester: exposed `-blocks-storage.tsdb.wal-segment-size-bytes` config option to customise the TSDB WAL segment max size. #3476 +* [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483 +* [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483 * [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423 * [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422 * [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 2e050fee6a1..f07b1f1c76e 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -114,6 +114,11 @@ compactor: # CLI flag: -compactor.compaction-concurrency [compaction_concurrency: | default = 1] + # Max number of tenants for which blocks should be cleaned up concurrently + # (deletion of blocks previously marked for deletion). + # CLI flag: -compactor.cleanup-concurrency + [cleanup_concurrency: | default = 20] + # Time before a block marked for deletion is deleted from bucket. If not 0, # blocks will be marked for deletion and compactor component will delete # blocks marked for deletion from the bucket. If delete-delay is 0, blocks diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 950d198f111..8e25c448f49 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3865,6 +3865,11 @@ The `compactor_config` configures the compactor for the blocks storage. # CLI flag: -compactor.compaction-concurrency [compaction_concurrency: | default = 1] +# Max number of tenants for which blocks should be cleaned up concurrently +# (deletion of blocks previously marked for deletion). +# CLI flag: -compactor.cleanup-concurrency +[cleanup_concurrency: | default = 20] + # Time before a block marked for deletion is deleted from bucket. If not 0, # blocks will be marked for deletion and compactor component will delete blocks # marked for deletion from the bucket. If delete-delay is 0, blocks will be diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 981a83f22c0..215f8dfe346 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -11,7 +11,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -19,6 +18,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -27,6 +27,7 @@ type BlocksCleanerConfig struct { MetaSyncConcurrency int DeletionDelay time.Duration CleanupInterval time.Duration + CleanupConcurrency int } type BlocksCleaner struct { @@ -120,20 +121,9 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context) error { return errors.Wrap(err, "failed to discover users from bucket") } - errs := tsdb_errors.NewMulti() - for _, userID := range users { - // Ensure the context has not been canceled (ie. shutdown has been triggered). - if ctx.Err() != nil { - return ctx.Err() - } - - if err = c.cleanUser(ctx, userID); err != nil { - errs.Add(errors.Wrapf(err, "failed to delete user blocks (user: %s)", userID)) - continue - } - } - - return errs.Err() + return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { + return errors.Wrapf(c.cleanUser(ctx, userID), "failed to delete user blocks (user: %s)", userID) + }) } func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index cd4137bf20e..13a481217f7 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -3,6 +3,7 @@ package compactor import ( "context" "crypto/rand" + "fmt" "io/ioutil" "os" "path" @@ -22,6 +23,18 @@ import ( ) func TestBlocksCleaner(t *testing.T) { + for _, concurrency := range []int{1, 2, 10} { + concurrency := concurrency + + t.Run(fmt.Sprintf("concurrency=%d", concurrency), func(t *testing.T) { + t.Parallel() + + testBlocksCleanerWithConcurrency(t, concurrency) + }) + } +} + +func testBlocksCleanerWithConcurrency(t *testing.T, concurrency int) { // Create a temporary directory for local storage. storageDir, err := ioutil.TempDir(os.TempDir(), "storage") require.NoError(t, err) @@ -46,17 +59,21 @@ func TestBlocksCleaner(t *testing.T) { block4 := ulid.MustNew(4, rand.Reader) block5 := ulid.MustNew(5, rand.Reader) block6 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 40, 50, nil) + block7 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 10, 20, nil) + block8 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 40, 50, nil) createDeletionMark(t, filepath.Join(storageDir, "user-1"), block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet. createDeletionMark(t, filepath.Join(storageDir, "user-1"), block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold. createDeletionMark(t, filepath.Join(storageDir, "user-1"), block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet. createDeletionMark(t, filepath.Join(storageDir, "user-1"), block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold. require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark. + createDeletionMark(t, filepath.Join(storageDir, "user-2"), block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold. cfg := BlocksCleanerConfig{ DataDir: dataDir, MetaSyncConcurrency: 10, DeletionDelay: deletionDelay, CleanupInterval: time.Minute, + CleanupConcurrency: concurrency, } logger := log.NewNopLogger() @@ -80,6 +97,14 @@ func TestBlocksCleaner(t *testing.T) { require.NoError(t, err) assert.False(t, exists) + exists, err = bucketClient.Exists(ctx, path.Join("user-2", block7.String(), metadata.MetaFilename)) + require.NoError(t, err) + assert.False(t, exists) + + exists, err = bucketClient.Exists(ctx, path.Join("user-2", block8.String(), metadata.MetaFilename)) + require.NoError(t, err) + assert.True(t, exists) + // Should delete a partial block with deletion mark who hasn't reached the deletion threshold yet. exists, err = bucketClient.Exists(ctx, path.Join("user-1", block4.String(), metadata.DeletionMarkFilename)) require.NoError(t, err) @@ -98,6 +123,6 @@ func TestBlocksCleaner(t *testing.T) { assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted)) assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed)) - assert.Equal(t, float64(3), testutil.ToFloat64(cleaner.blocksCleanedTotal)) + assert.Equal(t, float64(4), testutil.ToFloat64(cleaner.blocksCleanedTotal)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.blocksFailedTotal)) } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index c1f6478fc81..89d6c16ef6a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "hash/fnv" + "math/rand" "path" "strings" "time" @@ -38,6 +39,7 @@ type Config struct { CompactionInterval time.Duration `yaml:"compaction_interval"` CompactionRetries int `yaml:"compaction_retries"` CompactionConcurrency int `yaml:"compaction_concurrency"` + CleanupConcurrency int `yaml:"cleanup_concurrency"` DeletionDelay time.Duration `yaml:"deletion_delay"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` @@ -70,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs") f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval") f.IntVar(&cfg.CompactionConcurrency, "compactor.compaction-concurrency", 1, "Max number of concurrent compactions running.") + f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks should be cleaned up concurrently (deletion of blocks previously marked for deletion).") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+ @@ -270,7 +273,8 @@ func (c *Compactor) starting(ctx context.Context) error { DataDir: c.compactorCfg.DataDir, MetaSyncConcurrency: c.compactorCfg.MetaSyncConcurrency, DeletionDelay: c.compactorCfg.DeletionDelay, - CleanupInterval: util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.05), + CleanupInterval: util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.1), + CleanupConcurrency: c.compactorCfg.CleanupConcurrency, }, c.bucketClient, c.usersScanner, c.parentLogger, c.registerer) // Ensure an initial cleanup occurred before starting the compactor. @@ -344,6 +348,13 @@ func (c *Compactor) compactUsers(ctx context.Context) error { } level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users)) + // When starting multiple compactor replicas nearly at the same time, running in a cluster with + // a large number of tenants, we may end up in a situation where the 1st user is compacted by + // multiple replicas at the same time. Shuffling users helps reduce the likelihood this will happen. + rand.Shuffle(len(users), func(i, j int) { + users[i], users[j] = users[j], users[i] + }) + errs := tsdb_errors.NewMulti() for _, userID := range users { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 5dc53c8bb7f..d4090bcaa45 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1,7 +1,6 @@ package compactor import ( - "bytes" "context" "encoding/json" "errors" @@ -32,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" @@ -406,7 +406,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // Ensure a plan has been executed for the blocks of each user. tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2) - assert.Equal(t, []string{ + assert.ElementsMatch(t, []string{ `level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`, `level=info component=cleaner org_id=user-1 msg="started cleaning of blocks marked for deletion"`, `level=info component=cleaner org_id=user-1 msg="cleaning of blocks marked for deletion done"`, @@ -517,7 +517,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { // Only one user's block is compacted. tsdbCompactor.AssertNumberOfCalls(t, "Plan", 1) - assert.Equal(t, []string{ + assert.ElementsMatch(t, []string{ `level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`, `level=info component=cleaner org_id=user-1 msg="started cleaning of blocks marked for deletion"`, `level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json bucket=mock`, @@ -621,7 +621,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // Ensure a plan has been executed for the blocks of each user. tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2) - assert.Equal(t, []string{ + assert.ElementsMatch(t, []string{ `level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`, `level=info component=compactor msg="compactor is ACTIVE in the ring"`, `level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`, @@ -671,8 +671,8 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM kvstore := consul.NewInMemoryClient(ring.GetCodec()) // Create two compactors - compactors := []*Compactor{} - logs := []*bytes.Buffer{} + var compactors []*Compactor + var logs []*concurrency.SyncBuffer for i := 1; i <= 2; i++ { cfg := prepareConfig() @@ -808,9 +808,9 @@ func createDeletionMark(t *testing.T, dir string, blockID ulid.ULID, deletionTim require.NoError(t, ioutil.WriteFile(markPath, []byte(content), os.ModePerm)) } -func findCompactorByUserID(compactors []*Compactor, logs []*bytes.Buffer, userID string) (*Compactor, *bytes.Buffer, error) { +func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) { var compactor *Compactor - var log *bytes.Buffer + var log *concurrency.SyncBuffer for i, c := range compactors { owned, err := c.ownUser(userID) @@ -858,7 +858,7 @@ func prepareConfig() Config { return compactorCfg } -func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *bytes.Buffer, prometheus.Gatherer, func()) { +func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *concurrency.SyncBuffer, prometheus.Gatherer, func()) { storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) @@ -872,7 +872,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* } tsdbCompactor := &tsdbCompactorMock{} - logs := &bytes.Buffer{} + logs := &concurrency.SyncBuffer{} logger := log.NewLogfmtLogger(logs) registry := prometheus.NewRegistry() diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index e3bf45c02dc..dc2ad654db7 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -1,7 +1,6 @@ package frontend import ( - "bytes" "context" "fmt" "io/ioutil" @@ -10,7 +9,6 @@ import ( "net/url" strconv "strconv" "strings" - "sync" "testing" "time" @@ -29,6 +27,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" + "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -94,25 +93,6 @@ func TestFrontend_RequestHostHeaderWhenDownstreamURLIsConfigured(t *testing.T) { testFrontend(t, config, nil, test, true, nil) } -type syncBuf struct { - mu sync.Mutex - buf bytes.Buffer -} - -func (sb *syncBuf) Write(p []byte) (n int, err error) { - sb.mu.Lock() - defer sb.mu.Unlock() - - return sb.buf.Write(p) -} - -func (sb *syncBuf) String() string { - sb.mu.Lock() - defer sb.mu.Unlock() - - return sb.buf.String() -} - func TestFrontend_LogsSlowQueriesFormValues(t *testing.T) { // Create an HTTP server listening locally. This server mocks the downstream // Prometheus API-compatible server. @@ -134,7 +114,7 @@ func TestFrontend_LogsSlowQueriesFormValues(t *testing.T) { config.Handler.LogQueriesLongerThan = 1 * time.Microsecond config.DownstreamURL = fmt.Sprintf("http://%s", downstreamListen.Addr()) - var buf syncBuf + var buf concurrency.SyncBuffer l := log.NewLogfmtLogger(&buf) test := func(addr string) { diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index a5087aa26b6..ce2b1e1cdc5 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -32,6 +32,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" @@ -1339,11 +1340,11 @@ func (i *Ingester) shipBlocks(ctx context.Context) { // Number of concurrent workers is limited in order to avoid to concurrently sync a lot // of tenants in a large cluster. - i.runConcurrentUserWorkers(ctx, i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(userID string) { + _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error { // Get the user's DB. If the user doesn't exist, we skip it. userDB := i.getTSDB(userID) if userDB == nil || userDB.shipper == nil { - return + return nil } // Run the shipper's Sync() to upload unshipped blocks. @@ -1352,6 +1353,8 @@ func (i *Ingester) shipBlocks(ctx context.Context) { } else { level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded) } + + return nil }) } @@ -1391,16 +1394,16 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) { } } - i.runConcurrentUserWorkers(ctx, i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(userID string) { + _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error { userDB := i.getTSDB(userID) if userDB == nil { - return + return nil } // Don't do anything, if there is nothing to compact. h := userDB.Head() if h.NumSeries() == 0 { - return + return nil } var err error @@ -1429,39 +1432,9 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) { } else { level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID, "compactReason", reason) } - }) -} - -func (i *Ingester) runConcurrentUserWorkers(ctx context.Context, concurrency int, userFunc func(userID string)) { - wg := sync.WaitGroup{} - ch := make(chan string) - - for ix := 0; ix < concurrency; ix++ { - wg.Add(1) - go func() { - defer wg.Done() - - for userID := range ch { - userFunc(userID) - } - }() - } - -sendLoop: - for _, userID := range i.getTSDBUsers() { - select { - case ch <- userID: - // ok - case <-ctx.Done(): - // don't start new tasks. - break sendLoop - } - } - - close(ch) - // wait for ongoing workers to finish. - wg.Wait() + return nil + }) } // This method will flush all data. It is called as part of Lifecycler's shutdown (if flush on shutdown is configured), or from the flusher. diff --git a/pkg/util/concurrency/buffer.go b/pkg/util/concurrency/buffer.go new file mode 100644 index 00000000000..7110fb8c4a6 --- /dev/null +++ b/pkg/util/concurrency/buffer.go @@ -0,0 +1,25 @@ +package concurrency + +import ( + "bytes" + "sync" +) + +type SyncBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (sb *SyncBuffer) Write(p []byte) (n int, err error) { + sb.mu.Lock() + defer sb.mu.Unlock() + + return sb.buf.Write(p) +} + +func (sb *SyncBuffer) String() string { + sb.mu.Lock() + defer sb.mu.Unlock() + + return sb.buf.String() +} diff --git a/pkg/util/concurrency/runner.go b/pkg/util/concurrency/runner.go new file mode 100644 index 00000000000..9a18ade8dc0 --- /dev/null +++ b/pkg/util/concurrency/runner.go @@ -0,0 +1,64 @@ +package concurrency + +import ( + "context" + "sync" + + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" +) + +// ForEachUser runs the provided userFunc for each userIDs up to concurrency concurrent workers. +// In case userFunc returns error, it will continue to process remaining users but returns an +// error with all errors userFunc has returned. +func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error { + wg := sync.WaitGroup{} + ch := make(chan string) + + // Keep track of all errors occurred. + errs := tsdb_errors.NewMulti() + errsMx := sync.Mutex{} + + for ix := 0; ix < concurrency; ix++ { + wg.Add(1) + go func() { + defer wg.Done() + + for userID := range ch { + // Ensure the context has not been canceled (ie. shutdown has been triggered). + if ctx.Err() != nil { + break + } + + if err := userFunc(ctx, userID); err != nil { + errsMx.Lock() + errs.Add(err) + errsMx.Unlock() + } + } + }() + } + +sendLoop: + for _, userID := range userIDs { + select { + case ch <- userID: + // ok + case <-ctx.Done(): + // don't start new tasks. + break sendLoop + } + } + + close(ch) + + // wait for ongoing workers to finish. + wg.Wait() + + if ctx.Err() != nil { + return ctx.Err() + } + + errsMx.Lock() + defer errsMx.Unlock() + return errs.Err() +}