From 8a73dffa905822124b705c8039be36fb43b44d8e Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Fri, 17 Aug 2018 09:23:28 -0700 Subject: [PATCH 01/18] add numWorkers to informer struct --- pkg/sdk/api.go | 2 +- pkg/sdk/informer.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index cdadc4640f5..0e3361c5d56 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -51,7 +51,7 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int) { collector = metrics.New() metrics.RegisterCollector(collector) } - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, 1) informers = append(informers, informer) } diff --git a/pkg/sdk/informer.go b/pkg/sdk/informer.go index c576cb99e96..9f199b0d2be 100644 --- a/pkg/sdk/informer.go +++ b/pkg/sdk/informer.go @@ -43,15 +43,17 @@ type informer struct { context context.Context deletedObjects map[string]interface{} collector *metrics.Collector + numWorkers int } -func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector) Informer { +func NewInformer(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface, resyncPeriod int, c *metrics.Collector, n int) Informer { i := &informer{ resourcePluralName: resourcePluralName, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName), namespace: namespace, deletedObjects: map[string]interface{}{}, collector: c, + numWorkers: n, } resyncDuration := time.Duration(resyncPeriod) * time.Second @@ -87,8 +89,7 @@ func (i *informer) Run(ctx context.Context) { panic("Timed out waiting for caches to sync") } - const numWorkers = 1 - for n := 0; n < numWorkers; n++ { + for n := 0; n < i.numWorkers; n++ { go wait.Until(i.runWorker, time.Second, ctx.Done()) } <-ctx.Done() From 8f96da27d8132548fe4d32d415de72ece77a40ce Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Fri, 17 Aug 2018 09:26:18 -0700 Subject: [PATCH 02/18] added place to configure numworkers --- pkg/generator/templates.go | 5 +++-- pkg/sdk/api.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 3b818c67421..edb06aed8b3 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -159,9 +159,10 @@ func main() { if err != nil { logrus.Fatalf("failed to get watch namespace: %v", err) } - resyncPeriod := 5 + resyncPeriod := 5 + numWorkers := 1 logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - sdk.Watch(resource, kind, namespace, resyncPeriod) + sdk.Watch(resource, kind, namespace, resyncPeriod, numWorkers) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 0e3361c5d56..04887221d65 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -40,7 +40,7 @@ var ( // Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ // namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(apiVersion, kind, namespace string, resyncPeriod int) { +func Watch(apiVersion, kind, namespace string, resyncPeriod int, numWorkers int) { resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) // TODO: Better error handling, e.g retry if err != nil { @@ -51,7 +51,7 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int) { collector = metrics.New() metrics.RegisterCollector(collector) } - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, 1) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, numWorkers) informers = append(informers, informer) } From 1df790252a1796728390a7f429f35e15e297b15b Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Fri, 17 Aug 2018 09:37:09 -0700 Subject: [PATCH 03/18] fix unit tests --- pkg/generator/generator_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index d93152d704a..b84c097378b 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -419,10 +419,10 @@ import ( "context" "runtime" - stub "github.com/example-inc/app-operator/pkg/stub" - sdk "github.com/operator-framework/operator-sdk/pkg/sdk" - k8sutil "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" - sdkVersion "github.com/operator-framework/operator-sdk/version" + stub "{{.StubImport}}" + sdk "{{.OperatorSDKImport}}" + k8sutil "{{.K8sutilImport}}" + sdkVersion "{{.SDKVersionImport}}" "github.com/sirupsen/logrus" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -439,15 +439,16 @@ func main() { sdk.ExposeMetricsPort() - resource := "app.example.com/v1alpha1" - kind := "AppService" + resource := "{{.APIVersion}}" + kind := "{{.Kind}}" namespace, err := k8sutil.GetWatchNamespace() if err != nil { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 + numWorkers := 1 logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - sdk.Watch(resource, kind, namespace, resyncPeriod) + sdk.Watch(resource, kind, namespace, resyncPeriod, numWorkers) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } From b9603215058f9b54750f5ead6b9bfe6359f3851e Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Fri, 17 Aug 2018 09:46:09 -0700 Subject: [PATCH 04/18] diffs are messed up --- pkg/generator/templates.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index edb06aed8b3..1be1fc5aa6b 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -159,8 +159,8 @@ func main() { if err != nil { logrus.Fatalf("failed to get watch namespace: %v", err) } - resyncPeriod := 5 - numWorkers := 1 + resyncPeriod := 5 + numWorkers := 1 logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, numWorkers) sdk.Handle(stub.NewHandler()) From e7db0d90095e10b468853d6f2d25ba8cd1d56ade Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Fri, 17 Aug 2018 09:48:17 -0700 Subject: [PATCH 05/18] fixed template --- pkg/generator/generator_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index b84c097378b..09d8c9e2303 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -419,10 +419,10 @@ import ( "context" "runtime" - stub "{{.StubImport}}" - sdk "{{.OperatorSDKImport}}" - k8sutil "{{.K8sutilImport}}" - sdkVersion "{{.SDKVersionImport}}" + stub "github.com/example-inc/app-operator/pkg/stub" + sdk "github.com/operator-framework/operator-sdk/pkg/sdk" + k8sutil "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" + sdkVersion "github.com/operator-framework/operator-sdk/version" "github.com/sirupsen/logrus" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -439,8 +439,8 @@ func main() { sdk.ExposeMetricsPort() - resource := "{{.APIVersion}}" - kind := "{{.Kind}}" + resource := "app.example.com/v1alpha1" + kind := "AppService" namespace, err := k8sutil.GetWatchNamespace() if err != nil { logrus.Fatalf("failed to get watch namespace: %v", err) From e0cbf5d9ca5801f80d9d34a10884dcd7ce05d5fd Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 09:42:25 -0700 Subject: [PATCH 06/18] create watchOpts struct --- pkg/generator/generator_test.go | 6 ++++-- pkg/generator/templates.go | 6 ++++-- pkg/sdk/api.go | 8 ++++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index 09d8c9e2303..c5ada81291b 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -446,9 +446,11 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - numWorkers := 1 + opts := sdk.watchOpts { + 1 + } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - sdk.Watch(resource, kind, namespace, resyncPeriod, numWorkers) + sdk.Watch(resource, kind, namespace, resyncPeriod, opts) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 1be1fc5aa6b..169f12b88c1 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -160,9 +160,11 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - numWorkers := 1 + opts = sdk.watchOpts { + 1 + } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - sdk.Watch(resource, kind, namespace, resyncPeriod, numWorkers) + sdk.Watch(resource, kind, namespace, resyncPeriod, opts) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 04887221d65..ba6fabce4eb 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -29,6 +29,10 @@ var ( collector *metrics.Collector ) +type watchOpts struct { + numWorkers int +} + // Watch watches for changes on the given resource. // apiVersion for a resource is of the format "Group/Version" except for the "Core" group whose APIVersion is just "v1". For e.g: // - Deployments have Group "apps" and Version "v1beta2" giving the APIVersion "apps/v1beta2" @@ -40,7 +44,7 @@ var ( // Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ // namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(apiVersion, kind, namespace string, resyncPeriod int, numWorkers int) { +func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts watchOpts) { resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) // TODO: Better error handling, e.g retry if err != nil { @@ -51,7 +55,7 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int, numWorkers int) collector = metrics.New() metrics.RegisterCollector(collector) } - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, numWorkers) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, opts.numWorkers) informers = append(informers, informer) } From 109e5e6b186dcce3005ada2d8ceb1a7e9ebbe7fc Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 09:43:30 -0700 Subject: [PATCH 07/18] clarify struct vars --- pkg/generator/generator_test.go | 2 +- pkg/generator/templates.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index c5ada81291b..b963440d9f4 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -447,7 +447,7 @@ func main() { } resyncPeriod := 5 opts := sdk.watchOpts { - 1 + numWorkers: 1 } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 169f12b88c1..a5b24dd4aae 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -160,8 +160,8 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - opts = sdk.watchOpts { - 1 + opts := sdk.watchOpts { + numWorkers: 1 } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) From 21e62fa48c2cd0df99de0828444fec292ecfa79b Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 09:57:24 -0700 Subject: [PATCH 08/18] exported watchopts --- pkg/generator/generator_test.go | 2 +- pkg/generator/templates.go | 2 +- pkg/sdk/api.go | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index b963440d9f4..2a28e3dc581 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -446,7 +446,7 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - opts := sdk.watchOpts { + opts := sdk.WatchOpts { numWorkers: 1 } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index a5b24dd4aae..b88d0247849 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -160,7 +160,7 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - opts := sdk.watchOpts { + opts := sdk.WatchOpts { numWorkers: 1 } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index ba6fabce4eb..45fa8b77810 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -29,7 +29,8 @@ var ( collector *metrics.Collector ) -type watchOpts struct { +// WatchOpts allows for future extensions to Watch without damaging interface compatibility in the future +type WatchOpts struct { numWorkers int } @@ -44,7 +45,7 @@ type watchOpts struct { // Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ // namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts watchOpts) { +func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts WatchOpts) { resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) // TODO: Better error handling, e.g retry if err != nil { From 240e151ee47493550cbdf3696ba2c3609a93d532 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 10:06:01 -0700 Subject: [PATCH 09/18] add missing comma --- pkg/generator/generator_test.go | 2 +- pkg/generator/templates.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index 2a28e3dc581..90d30e64f4d 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -447,7 +447,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - numWorkers: 1 + numWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index b88d0247849..005b0b3808a 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -161,7 +161,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - numWorkers: 1 + numWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) From 47e97be3035e2bff052751e59fb1d9205461abfb Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 10:14:09 -0700 Subject: [PATCH 10/18] add package to vars --- pkg/generator/generator_test.go | 2 +- pkg/generator/templates.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index 90d30e64f4d..32d44f367be 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -447,7 +447,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - numWorkers: 1, + sdk.numWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 005b0b3808a..36c2ca93472 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -161,8 +161,8 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - numWorkers: 1, - } + sdk.numWorkers: 1, + } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) sdk.Handle(stub.NewHandler()) From d865b0994d1c17bc079be1c7ad189b73c09b0f40 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 10:22:05 -0700 Subject: [PATCH 11/18] export numworkers --- pkg/generator/generator_test.go | 2 +- pkg/generator/templates.go | 2 +- pkg/sdk/api.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index 32d44f367be..33a15dd88c2 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -447,7 +447,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - sdk.numWorkers: 1, + sdk.NumWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 36c2ca93472..1bb23ca0436 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -161,7 +161,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - sdk.numWorkers: 1, + sdk.NumWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 45fa8b77810..424301bb6eb 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -31,7 +31,7 @@ var ( // WatchOpts allows for future extensions to Watch without damaging interface compatibility in the future type WatchOpts struct { - numWorkers int + NumWorkers int } // Watch watches for changes on the given resource. @@ -56,7 +56,7 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts WatchOpts) collector = metrics.New() metrics.RegisterCollector(collector) } - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, opts.numWorkers) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, opts.NumWorkers) informers = append(informers, informer) } From 35ff87091d47ced901d650d503d30f4c8d39b9f0 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Mon, 20 Aug 2018 10:28:56 -0700 Subject: [PATCH 12/18] proper struct access --- pkg/generator/generator_test.go | 2 +- pkg/generator/templates.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index 33a15dd88c2..c0752a18a41 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -447,7 +447,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - sdk.NumWorkers: 1, + NumWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 1bb23ca0436..841ea3952ce 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -161,7 +161,7 @@ func main() { } resyncPeriod := 5 opts := sdk.WatchOpts { - sdk.NumWorkers: 1, + NumWorkers: 1, } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) sdk.Watch(resource, kind, namespace, resyncPeriod, opts) From fc01926870bb85d63f25c9704d3f8baceace0443 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Tue, 21 Aug 2018 09:06:11 -0700 Subject: [PATCH 13/18] opts --- pkg/generator/generator_test.go | 5 +--- pkg/generator/templates.go | 5 +--- pkg/sdk/api.go | 11 +++----- pkg/sdk/watch-opt.go | 49 +++++++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 15 deletions(-) create mode 100644 pkg/sdk/watch-opt.go diff --git a/pkg/generator/generator_test.go b/pkg/generator/generator_test.go index c0752a18a41..d93152d704a 100644 --- a/pkg/generator/generator_test.go +++ b/pkg/generator/generator_test.go @@ -446,11 +446,8 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - opts := sdk.WatchOpts { - NumWorkers: 1, - } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - sdk.Watch(resource, kind, namespace, resyncPeriod, opts) + sdk.Watch(resource, kind, namespace, resyncPeriod) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } diff --git a/pkg/generator/templates.go b/pkg/generator/templates.go index 841ea3952ce..3b818c67421 100644 --- a/pkg/generator/templates.go +++ b/pkg/generator/templates.go @@ -160,11 +160,8 @@ func main() { logrus.Fatalf("failed to get watch namespace: %v", err) } resyncPeriod := 5 - opts := sdk.WatchOpts { - NumWorkers: 1, - } logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - sdk.Watch(resource, kind, namespace, resyncPeriod, opts) + sdk.Watch(resource, kind, namespace, resyncPeriod) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 424301bb6eb..26e4ef019f5 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -29,11 +29,6 @@ var ( collector *metrics.Collector ) -// WatchOpts allows for future extensions to Watch without damaging interface compatibility in the future -type WatchOpts struct { - NumWorkers int -} - // Watch watches for changes on the given resource. // apiVersion for a resource is of the format "Group/Version" except for the "Core" group whose APIVersion is just "v1". For e.g: // - Deployments have Group "apps" and Version "v1beta2" giving the APIVersion "apps/v1beta2" @@ -45,7 +40,7 @@ type WatchOpts struct { // Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ // namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts WatchOpts) { +func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...WatchOption) { resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) // TODO: Better error handling, e.g retry if err != nil { @@ -56,7 +51,9 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts WatchOpts) collector = metrics.New() metrics.RegisterCollector(collector) } - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, opts.NumWorkers) + o := NewWatchOp() + o.applyOpts(opts) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.NumWorkers) informers = append(informers, informer) } diff --git a/pkg/sdk/watch-opt.go b/pkg/sdk/watch-opt.go new file mode 100644 index 00000000000..f52a25a1245 --- /dev/null +++ b/pkg/sdk/watch-opt.go @@ -0,0 +1,49 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdk + +// WatchOp wraps all the options for Watch(). +type WatchOp struct { + NumWorkers int +} + +// NewWatchOp create a new deafult WatchOp +func NewWatchOp() *WatchOp { + op := &WatchOp{} + op.setDefaults() + return op +} + +func (op *WatchOp) applyOpts(opts []WatchOption) { + for _, opt := range opts { + opt(op) + } +} + +func (op *WatchOp) setDefaults() { + if op.NumWorkers == 0 { + op.NumWorkers = 1 + } +} + +// WatchOption configures WatchOp. +type WatchOption func(*WatchOp) + +// WithWatchOptions sets the number of workers for the Watch() operation. +func WithWatchOptions(numWorkers int) WatchOption { + return func(op *WatchOp) { + op.NumWorkers = numWorkers + } +} From 979b2bdfe697a0e80c8cdf8bdc6d49a8365c428e Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Tue, 21 Aug 2018 10:13:34 -0700 Subject: [PATCH 14/18] style nit --- pkg/sdk/watch-opt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sdk/watch-opt.go b/pkg/sdk/watch-opt.go index f52a25a1245..835c056702d 100644 --- a/pkg/sdk/watch-opt.go +++ b/pkg/sdk/watch-opt.go @@ -41,8 +41,8 @@ func (op *WatchOp) setDefaults() { // WatchOption configures WatchOp. type WatchOption func(*WatchOp) -// WithWatchOptions sets the number of workers for the Watch() operation. -func WithWatchOptions(numWorkers int) WatchOption { +// WithNumWorkers sets the number of workers for the Watch() operation. +func WithNumWorkers(numWorkers int) WatchOption { return func(op *WatchOp) { op.NumWorkers = numWorkers } From b861b5470adda10e244ff4c21b5ab5cc3ef3c966 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Tue, 21 Aug 2018 16:38:37 -0700 Subject: [PATCH 15/18] Update docs for new watch behavior --- doc/user-guide.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/doc/user-guide.md b/doc/user-guide.md index 2ec2c36b1a5..8974cbd79a4 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -58,11 +58,20 @@ By default, the memcached-operator watches `Memcached` resource events as shown ```Go func main() { + // Default number of worker to 1 sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } ``` +```Go +func main() { + // Allow for flexible number of workers (N) >= 1 + sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, N) + sdk.Handle(stub.NewHandler()) + sdk.Run(context.TODO()) +} +``` ### Define the Memcached spec and status From 9de65d05214ba40e6ed6badb078a094d009c21d8 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Wed, 22 Aug 2018 09:52:26 -0700 Subject: [PATCH 16/18] update readme --- doc/user-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/user-guide.md b/doc/user-guide.md index 8974cbd79a4..79cb0d78e6e 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -67,7 +67,7 @@ func main() { ```Go func main() { // Allow for flexible number of workers (N) >= 1 - sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, N) + sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, sdk.WithNumWorkers(N)) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } From 6c1110bef9df8189d6ad4c0facf45f6cd27ac523 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Wed, 22 Aug 2018 16:34:52 -0700 Subject: [PATCH 17/18] Update user guide --- doc/user-guide.md | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/doc/user-guide.md b/doc/user-guide.md index 79cb0d78e6e..aa4377b7197 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -58,19 +58,15 @@ By default, the memcached-operator watches `Memcached` resource events as shown ```Go func main() { - // Default number of worker to 1 sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5) sdk.Handle(stub.NewHandler()) sdk.Run(context.TODO()) } ``` + +**Note:** The number of concurrent informer workers can be configured with an additional Watch option. The default value is 1 if an argument is not given. ```Go -func main() { - // Allow for flexible number of workers (N) >= 1 - sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, sdk.WithNumWorkers(N)) - sdk.Handle(stub.NewHandler()) - sdk.Run(context.TODO()) -} +sdk.Watch("cache.example.com/v1alpha1", "Memcached", "default", 5, sdk.WithNumWorkers(n)) ``` ### Define the Memcached spec and status From 0e175a6c1c59b8fac7d2c1889121bb3a43d5c002 Mon Sep 17 00:00:00 2001 From: Ish Shah Date: Fri, 24 Aug 2018 10:50:18 -0700 Subject: [PATCH 18/18] un-export funcs --- pkg/sdk/api.go | 6 +++--- pkg/sdk/watch-opt.go | 24 ++++++++++++------------ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 26e4ef019f5..54970515039 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -40,7 +40,7 @@ var ( // Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ // namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...WatchOption) { +func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...watchOption) { resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) // TODO: Better error handling, e.g retry if err != nil { @@ -51,9 +51,9 @@ func Watch(apiVersion, kind, namespace string, resyncPeriod int, opts ...WatchOp collector = metrics.New() metrics.RegisterCollector(collector) } - o := NewWatchOp() + o := newWatchOp() o.applyOpts(opts) - informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.NumWorkers) + informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod, collector, o.numWorkers) informers = append(informers, informer) } diff --git a/pkg/sdk/watch-opt.go b/pkg/sdk/watch-opt.go index 835c056702d..8c889eac440 100644 --- a/pkg/sdk/watch-opt.go +++ b/pkg/sdk/watch-opt.go @@ -15,35 +15,35 @@ package sdk // WatchOp wraps all the options for Watch(). -type WatchOp struct { - NumWorkers int +type watchOp struct { + numWorkers int } // NewWatchOp create a new deafult WatchOp -func NewWatchOp() *WatchOp { - op := &WatchOp{} +func newWatchOp() *watchOp { + op := &watchOp{} op.setDefaults() return op } -func (op *WatchOp) applyOpts(opts []WatchOption) { +func (op *watchOp) applyOpts(opts []watchOption) { for _, opt := range opts { opt(op) } } -func (op *WatchOp) setDefaults() { - if op.NumWorkers == 0 { - op.NumWorkers = 1 +func (op *watchOp) setDefaults() { + if op.numWorkers == 0 { + op.numWorkers = 1 } } // WatchOption configures WatchOp. -type WatchOption func(*WatchOp) +type watchOption func(*watchOp) // WithNumWorkers sets the number of workers for the Watch() operation. -func WithNumWorkers(numWorkers int) WatchOption { - return func(op *WatchOp) { - op.NumWorkers = numWorkers +func WithNumWorkers(numWorkers int) watchOption { + return func(op *watchOp) { + op.numWorkers = numWorkers } }