diff --git a/CHANGELOG.md b/CHANGELOG.md index 0960ef7b..ec50ff13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `JobDeleteMany` operations that remove many jobs in a single operation according to input criteria. [PR #962](https://github.com/riverqueue/river/pull/962) - Added `Client.Schema()` method to return a client's configured schema. [PR #983](https://github.com/riverqueue/river/pull/983). - Integrated riverui queries into the driver system to pave the way for multi-driver UI support. [PR #983](https://github.com/riverqueue/river/pull/983). +- Added `QueueConfig` level `FetchCooldown` and `FetchPollInterval` settings to enable queue-specific job fetch intervals. For example, a queue of high-priority jobs could be checked more often to improve responsiveness, while one with slow or time-insensitive tasks could be checked infrequently to reduce database load. [PR #994](https://github.com/riverqueue/river/pull/994). ### Changed diff --git a/client.go b/client.go index d861c56d..814712a7 100644 --- a/client.go +++ b/client.go @@ -130,6 +130,8 @@ type Config struct { // // Throughput is limited by this value. // + // Individual QueueConfig structs may override this for a specific queue. + // // Defaults to 100 ms. FetchCooldown time.Duration @@ -137,6 +139,8 @@ type Config struct { // jobs. Typically new jobs will be picked up ~immediately after insert via // LISTEN/NOTIFY, but this provides a fallback. // + // Individual QueueConfig structs may override this for a specific queue. + // // Defaults to 1 second. FetchPollInterval time.Duration @@ -484,7 +488,7 @@ func (c *Config) validate() error { } for queue, queueConfig := range c.Queues { - if err := queueConfig.validate(queue); err != nil { + if err := queueConfig.validate(queue, c.FetchCooldown, c.FetchPollInterval); err != nil { return err } } @@ -521,6 +525,25 @@ func (c *Config) willExecuteJobs() bool { // QueueConfig contains queue-specific configuration. type QueueConfig struct { + // FetchCooldown is the minimum amount of time to wait between fetches of new + // jobs. Jobs will only be fetched *at most* this often, but if no new jobs + // are coming in via LISTEN/NOTIFY then fetches may be delayed as long as + // FetchPollInterval. + // + // Throughput is limited by this value. + // + // If non-zero, this overrides the FetchCooldown setting in the Client's + // Config. + FetchCooldown time.Duration + + // FetchPollInterval is the amount of time between periodic fetches for new + // jobs. Typically new jobs will be picked up ~immediately after insert via + // LISTEN/NOTIFY, but this provides a fallback. + // + // If non-zero, this overrides the FetchCooldown setting in the Client's + // Config. + FetchPollInterval time.Duration + // MaxWorkers is the maximum number of workers to run for the queue, or put // otherwise, the maximum parallelism to run. // @@ -534,7 +557,20 @@ type QueueConfig struct { MaxWorkers int } -func (c QueueConfig) validate(queueName string) error { +func (c QueueConfig) validate(queueName string, clientFetchCooldown time.Duration, clientFetchPollInterval time.Duration) error { + if c.FetchCooldown < 0 { + return fmt.Errorf("FetchCooldown cannot be less than zero") + } + if c.FetchPollInterval < 0 { + return fmt.Errorf("FetchPollInterval cannot be less than zero") + } + + resolvedFetchCooldown := cmp.Or(c.FetchCooldown, clientFetchCooldown) + resolvedFetchPollInterval := cmp.Or(c.FetchPollInterval, clientFetchPollInterval) + if resolvedFetchPollInterval < resolvedFetchCooldown { + return fmt.Errorf("FetchPollInterval cannot be less than FetchCooldown") + } + if c.MaxWorkers < 1 || c.MaxWorkers > QueueNumWorkersMax { return fmt.Errorf("invalid number of workers for queue %q: %d", queueName, c.MaxWorkers) } @@ -691,8 +727,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } client.queues = &QueueBundle{ - addProducer: client.addProducer, - clientWillExecuteJobs: config.willExecuteJobs(), + addProducer: client.addProducer, + clientFetchCooldown: config.FetchCooldown, + clientFetchPollInterval: config.FetchPollInterval, + clientWillExecuteJobs: config.willExecuteJobs(), } baseservice.Init(archetype, &client.baseService) @@ -2022,8 +2060,8 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p ClientID: c.config.ID, Completer: c.completer, ErrorHandler: c.config.ErrorHandler, - FetchCooldown: c.config.FetchCooldown, - FetchPollInterval: c.config.FetchPollInterval, + FetchCooldown: cmp.Or(queueConfig.FetchCooldown, c.config.FetchCooldown), + FetchPollInterval: cmp.Or(queueConfig.FetchPollInterval, c.config.FetchPollInterval), HookLookupByJob: c.hookLookupByJob, HookLookupGlobal: c.hookLookupGlobal, JobTimeout: c.config.JobTimeout, @@ -2582,6 +2620,9 @@ type QueueBundle struct { // Function that adds a producer to the associated client. addProducer func(queueName string, queueConfig QueueConfig) (*producer, error) + clientFetchCooldown time.Duration + clientFetchPollInterval time.Duration + clientWillExecuteJobs bool fetchCtx context.Context //nolint:containedctx @@ -2602,7 +2643,7 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { return errors.New("client is not configured to execute jobs, cannot add queue") } - if err := queueConfig.validate(queueName); err != nil { + if err := queueConfig.validate(queueName, b.clientFetchCooldown, b.clientFetchPollInterval); err != nil { return err } diff --git a/client_test.go b/client_test.go index 6123a542..74d2add0 100644 --- a/client_test.go +++ b/client_test.go @@ -7183,6 +7183,32 @@ func Test_NewClient_Validations(t *testing.T) { name: "Queues can be empty", configFunc: func(config *Config) { config.Queues = make(map[string]QueueConfig) }, }, + { + name: "Queues FetchCooldown can be overridden", + configFunc: func(config *Config) { + config.Queues = map[string]QueueConfig{QueueDefault: {FetchCooldown: 9 * time.Millisecond, MaxWorkers: 1}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Equal(t, 9*time.Millisecond, client.producersByQueueName[QueueDefault].config.FetchCooldown) + }, + }, + { + name: "Queues FetchCooldown can't be greater than Client FetchPollInterval", + configFunc: func(config *Config) { + config.Queues = map[string]QueueConfig{QueueDefault: {FetchCooldown: 10 * time.Millisecond, MaxWorkers: 1}} + config.FetchPollInterval = 9 * time.Millisecond + }, + wantErr: fmt.Errorf("FetchPollInterval cannot be shorter than FetchCooldown (%s)", FetchCooldownDefault), + }, + { + name: "Queues FetchPollInterval can be overridden", + configFunc: func(config *Config) { + config.Queues = map[string]QueueConfig{QueueDefault: {FetchPollInterval: 9 * time.Second, MaxWorkers: 1}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Equal(t, 9*time.Second, client.producersByQueueName[QueueDefault].config.FetchPollInterval) + }, + }, { name: "Queues MaxWorkers can't be negative", configFunc: func(config *Config) {