Skip to content

pkg/sdk: Make number of workers configurable #418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 24, 2018
5 changes: 5 additions & 0 deletions doc/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func main() {
}
```

**Note:** The number of concurrent informer workers can be configured with an additional Watch option. The default value is 1 if an argument is not given.
```Go
sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, sdk.WithNumWorkers(n))
```

### Define the Memcached spec and status

Modify the spec and status of the `Memcached` CR at `pkg/apis/cache/v1alpha1/types.go`:
Expand Down
6 changes: 4 additions & 2 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
// Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/
// namespace is the Namespace to watch for the resource
// TODO: support opts for specifying label selector
func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...watchOption) {
resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace)
// TODO: Better error handling, e.g retry
if err != nil {
Expand All @@ -51,7 +51,9 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
collector = metrics.New()
metrics.RegisterCollector(collector)
}
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector)
o := newWatchOp()
o.applyOpts(opts)
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.numWorkers)
informers = append(informers, informer)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/sdk/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ type informer struct {
context context.Context
deletedObjects map[string]interface{}
collector *metrics.Collector
numWorkers int
}

func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector) Informer {
func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector, n int) Informer {
i := &informer{
resourcePluralName: resourcePluralName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName),
namespace: namespace,
deletedObjects: map[string]interface{}{},
collector: c,
numWorkers: n,
}

resyncDuration := time.Duration(resyncPeriod) * time.Second
Expand Down Expand Up @@ -87,8 +89,7 @@ func (i *informer) Run(ctx context.Context) {
panic("Timed out waiting for caches to sync")
}

const numWorkers = 1
for n := 0; n < numWorkers; n++ {
for n := 0; n < i.numWorkers; n++ {
go wait.Until(i.runWorker, time.Second, ctx.Done())
}
<-ctx.Done()
Expand Down
49 changes: 49 additions & 0 deletions pkg/sdk/watch-opt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2018 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sdk

// WatchOp wraps all the options for Watch().
type watchOp struct {
numWorkers int
}

// NewWatchOp create a new deafult WatchOp
func newWatchOp() *watchOp {
op := &watchOp{}
op.setDefaults()
return op
}

func (op *watchOp) applyOpts(opts []watchOption) {
for _, opt := range opts {
opt(op)
}
}

func (op *watchOp) setDefaults() {
if op.numWorkers == 0 {
op.numWorkers = 1
}
}

// WatchOption configures WatchOp.
type watchOption func(*watchOp)

// WithNumWorkers sets the number of workers for the Watch() operation.
func WithNumWorkers(numWorkers int) watchOption {
return func(op *watchOp) {
op.numWorkers = numWorkers
}
}