diff --git a/doc/user-guide.md b/doc/user-guide.md index 2ec2c36b1a5..aa4377b7197 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -64,6 +64,11 @@ func main() { } ``` +**Note:** The number of concurrent informer workers can be configured with an additional Watch option. The default value is 1 if an argument is not given. +```Go +sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, sdk.WithNumWorkers(n)) +``` + ### Define the Memcached spec and status Modify the spec and status of the `Memcached` CR at `pkg/apis/cache/v1alpha1/types.go`: diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index cdadc4640f5..54970515039 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -40,7 +40,7 @@ var ( // Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ // namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(apiVersion, kind, namespace string, resyncPeriod int) { +func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...watchOption) { resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) // TODO: Better error handling, e.g retry if err != nil { @@ -51,7 +51,9 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int) { collector = metrics.New() metrics.RegisterCollector(collector) } - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector) + o := newWatchOp() + o.applyOpts(opts) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.numWorkers) informers = append(informers, informer) } diff --git a/pkg/sdk/informer.go b/pkg/sdk/informer.go index c576cb99e96..9f199b0d2be 100644 --- a/pkg/sdk/informer.go +++ b/pkg/sdk/informer.go @@ -43,15 +43,17 @@ type informer struct { context context.Context deletedObjects map[string]interface{} collector *metrics.Collector + numWorkers int } -func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector) Informer { +func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector, n int) Informer { i := &informer{ resourcePluralName: resourcePluralName, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName), namespace: namespace, deletedObjects: map[string]interface{}{}, collector: c, + numWorkers: n, } resyncDuration := time.Duration(resyncPeriod) * time.Second @@ -87,8 +89,7 @@ func (i *informer) Run(ctx context.Context) { panic("Timed out waiting for caches to sync") } - const numWorkers = 1 - for n := 0; n < numWorkers; n++ { + for n := 0; n < i.numWorkers; n++ { go wait.Until(i.runWorker, time.Second, ctx.Done()) } <-ctx.Done() diff --git a/pkg/sdk/watch-opt.go b/pkg/sdk/watch-opt.go new file mode 100644 index 00000000000..8c889eac440 --- /dev/null +++ b/pkg/sdk/watch-opt.go @@ -0,0 +1,49 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdk + +// WatchOp wraps all the options for Watch(). +type watchOp struct { + numWorkers int +} + +// NewWatchOp create a new deafult WatchOp +func newWatchOp() *watchOp { + op := &watchOp{} + op.setDefaults() + return op +} + +func (op *watchOp) applyOpts(opts []watchOption) { + for _, opt := range opts { + opt(op) + } +} + +func (op *watchOp) setDefaults() { + if op.numWorkers == 0 { + op.numWorkers = 1 + } +} + +// WatchOption configures WatchOp. +type watchOption func(*watchOp) + +// WithNumWorkers sets the number of workers for the Watch() operation. +func WithNumWorkers(numWorkers int) watchOption { + return func(op *watchOp) { + op.numWorkers = numWorkers + } +}