Skip to content

Commit b3413a6

Browse files
authored
pkg/sdk: Make number of workers configurable (#418)
* add numWorkers to informer struct * added place to configure numworkers * fix unit tests * diffs are messed up * fixed template * create watchOpts struct * clarify struct vars * exported watchopts * add missing comma * add package to vars * export numworkers * proper struct access * opts * style nit * Update docs for new watch behavior * update readme * Update user guide * un-export funcs
1 parent d314923 commit b3413a6

File tree

4 files changed

+62
-5
lines changed

4 files changed

+62
-5
lines changed

doc/user-guide.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func main() {
6565
}
6666
```
6767

68+
**Note:** The number of concurrent informer workers can be configured with an additional Watch option. The default value is 1 if an argument is not given.
69+
```Go
70+
sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, sdk.WithNumWorkers(n))
71+
```
72+
6873
### Define the Memcached spec and status
6974

7075
Modify the spec and status of the `Memcached` CR at `pkg/apis/cache/v1alpha1/types.go`:

pkg/sdk/api.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ var (
4040
// Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/
4141
// namespace is the Namespace to watch for the resource
4242
// TODO: support opts for specifying label selector
43-
func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
43+
func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...watchOption) {
4444
resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace)
4545
// TODO: Better error handling, e.g retry
4646
if err != nil {
@@ -51,7 +51,9 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
5151
collector = metrics.New()
5252
metrics.RegisterCollector(collector)
5353
}
54-
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector)
54+
o := newWatchOp()
55+
o.applyOpts(opts)
56+
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.numWorkers)
5557
informers = append(informers, informer)
5658
}
5759

pkg/sdk/informer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,17 @@ type informer struct {
4343
context context.Context
4444
deletedObjects map[string]interface{}
4545
collector *metrics.Collector
46+
numWorkers int
4647
}
4748

48-
func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector) Informer {
49+
func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector, n int) Informer {
4950
i := &informer{
5051
resourcePluralName: resourcePluralName,
5152
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName),
5253
namespace: namespace,
5354
deletedObjects: map[string]interface{}{},
5455
collector: c,
56+
numWorkers: n,
5557
}
5658

5759
resyncDuration := time.Duration(resyncPeriod) * time.Second
@@ -87,8 +89,7 @@ func (i *informer) Run(ctx context.Context) {
8789
panic("Timed out waiting for caches to sync")
8890
}
8991

90-
const numWorkers = 1
91-
for n := 0; n < numWorkers; n++ {
92+
for n := 0; n < i.numWorkers; n++ {
9293
go wait.Until(i.runWorker, time.Second, ctx.Done())
9394
}
9495
<-ctx.Done()

pkg/sdk/watch-opt.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2018 The Operator-SDK Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package sdk
16+
17+
// WatchOp wraps all the options for Watch().
18+
type watchOp struct {
19+
numWorkers int
20+
}
21+
22+
// NewWatchOp create a new deafult WatchOp
23+
func newWatchOp() *watchOp {
24+
op := &watchOp{}
25+
op.setDefaults()
26+
return op
27+
}
28+
29+
func (op *watchOp) applyOpts(opts []watchOption) {
30+
for _, opt := range opts {
31+
opt(op)
32+
}
33+
}
34+
35+
func (op *watchOp) setDefaults() {
36+
if op.numWorkers == 0 {
37+
op.numWorkers = 1
38+
}
39+
}
40+
41+
// WatchOption configures WatchOp.
42+
type watchOption func(*watchOp)
43+
44+
// WithNumWorkers sets the number of workers for the Watch() operation.
45+
func WithNumWorkers(numWorkers int) watchOption {
46+
return func(op *watchOp) {
47+
op.numWorkers = numWorkers
48+
}
49+
}

0 commit comments

Comments
 (0)