Skip to content

pkg/sdk: Make number of workers configurable #418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 24, 2018
6 changes: 4 additions & 2 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
// Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/
// namespace is the Namespace to watch for the resource
// TODO: support opts for specifying label selector
func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...WatchOption) {
resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace)
// TODO: Better error handling, e.g retry
if err != nil {
Expand All @@ -51,7 +51,9 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
collector = metrics.New()
metrics.RegisterCollector(collector)
}
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector)
o := NewWatchOp()
o.applyOpts(opts)
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.NumWorkers)
informers = append(informers, informer)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/sdk/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ type informer struct {
context context.Context
deletedObjects map[string]interface{}
collector *metrics.Collector
numWorkers int
}

func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector) Informer {
func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector, n int) Informer {
i := &informer{
resourcePluralName: resourcePluralName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName),
namespace: namespace,
deletedObjects: map[string]interface{}{},
collector: c,
numWorkers: n,
}

resyncDuration := time.Duration(resyncPeriod) * time.Second
Expand Down Expand Up @@ -87,8 +89,7 @@ func (i *informer) Run(ctx context.Context) {
panic("Timed out waiting for caches to sync")
}

const numWorkers = 1
for n := 0; n < numWorkers; n++ {
for n := 0; n < i.numWorkers; n++ {
go wait.Until(i.runWorker, time.Second, ctx.Done())
}
<-ctx.Done()
Expand Down
49 changes: 49 additions & 0 deletions pkg/sdk/watch-opt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2018 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sdk

// WatchOp wraps all the options for Watch().
type WatchOp struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past we've needlessly exported structs and utility functions when they aren't meant to be exposed outside the sdk pkg, e.g we do this with the query and action options with NewQueryOps() etc.

But for this PR let's keep that to a minimum. We only need to export WithNumWorkers(). Everything else is internal to the pkg. So keep this unexported:

type watchOp struct {

NumWorkers int
}

// NewWatchOp create a new deafult WatchOp
func NewWatchOp() *WatchOp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unexport this as well:

func newWatchOp() *watchOp {

op := &WatchOp{}
op.setDefaults()
return op
}

func (op *WatchOp) applyOpts(opts []WatchOption) {
for _, opt := range opts {
opt(op)
}
}

func (op *WatchOp) setDefaults() {
if op.NumWorkers == 0 {
op.NumWorkers = 1
}
}

// WatchOption configures WatchOp.
type WatchOption func(*WatchOp)

// WithWatchOptions sets the number of workers for the Watch() operation.
func WithWatchOptions(numWorkers int) WatchOption {
Copy link
Contributor

@fanminshi fanminshi Aug 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can name this function to match the meaning of the wanted option for readability. for example,
func WithNumOfWorks(numWorkers int) WatchOption is probably more intuitive.

return func(op *WatchOp) {
op.NumWorkers = numWorkers
}
}