diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go index 58ebcec46fb..b8c6cf1e115 100644 --- a/pkg/generator/generator.go +++ b/pkg/generator/generator.go @@ -5,6 +5,9 @@ import ( "os" "path/filepath" "strings" + "text/template" + + "github.com/coreos/operator-sdk/pkg/templates" ) const defaultFileMode = 0750 @@ -41,7 +44,6 @@ func (g *Generator) Render() error { return err } - // pkg/apis/ groupName, apiVersion := func() (string, string) { splits := strings.Split(apiGroup, "/") return strings.Split(splits[0], ".")[0], splits[1] @@ -51,11 +53,34 @@ func (g *Generator) Render() error { return err } - // pkg/controller/ - err = os.MkdirAll(filepath.Join(projDir, "pkg/controller"), defaultFileMode) + controllerDir := filepath.Join(projDir, "pkg/controller") + err = os.MkdirAll(controllerDir, defaultFileMode) + if err != nil { + return err + } + + err = g.genWorkqueue(controllerDir) + if err != nil { + return err + } + + return nil +} + +func (g *Generator) genWorkqueue(controllerDir string) error { + f, err := os.OpenFile(filepath.Join(controllerDir, "controller.go"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, defaultFileMode) if err != nil { return err } + defer f.Close() + t, err := template.New("controller").Parse(templates.ControllerTemplate) + if err != nil { + return err + } + err = t.Execute(f, nil) + if err != nil { + return err + } return nil } diff --git a/pkg/templates/workqueue-tmpl.go b/pkg/templates/workqueue-tmpl.go new file mode 100644 index 00000000000..9bec1d84462 --- /dev/null +++ b/pkg/templates/workqueue-tmpl.go @@ -0,0 +1,118 @@ +package templates + +// TODO: fix imports + +const ControllerTemplate = ` +package controller + +type Controller struct { + indexer cache.Indexer + queue workqueue.RateLimitingInterface + informer cache.Controller +} + +func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller { + return &Controller{ + informer: informer, + indexer: indexer, + queue: queue, + } +} + +// sync is the business logic of the controller. +// In case an error happened, it has to simply return the error and will be retried after a backoff. +func (c *Controller) sync(key string) error { + obj, exists, err := c.indexer.GetByKey(key) + if err != nil { + return fmt.Errorf("Fetching object with key %s from store failed with %v", key, err) + } + + if !exists { + // We warmed up the cache, so this could only imply the object was deleted + return nil + } + + return nil +} +` + +const WorkqueueTemplate = ` +package controller + +const ( + // maxRetries is the number of times an event will be retried before it is dropped out of the queue. + // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times of requeues: + // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s + maxRetries = 15 +) + +func (c *Controller) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.queue.Get() + if quit { + return false + } + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. + defer c.queue.Done(key) + + // Invoke the method containing the business logic + err := c.sync(key.(string)) + + // Handle the error if something went wrong during the execution of the business logic + c.handleErr(err, key) + return true +} + +// handleErr checks if an error happened and makes sure we will retry later. +func (c *Controller) handleErr(err error, key interface{}) { + if err == nil { + // Forget about the #AddRateLimited history of the key on every successful synchronization. + // This ensures that future processing of updates for this key is not delayed because of + // an outdated error history. + c.queue.Forget(key) + return + } + + // This controller retries maxRetries times if something goes wrong. After that, it stops trying. + if c.queue.NumRequeues(key) < maxRetries { + glog.Infof("Error syncing pod %v: %v", key, err) + + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + c.queue.AddRateLimited(key) + return + } + + c.queue.Forget(key) + // Report to an external entity that, even after several retries, we could not successfully process this key + runtime.HandleError(err) + glog.Infof("Dropping pod %q out of the queue: %v", key, err) +} + +// numWorkers is the number of goroutine workers to process events concurrently. +func (c *Controller) Run(ctx context.Context, numWorkers int) { + // Let the workers stop when we are done + defer c.queue.ShutDown() + + go c.informer.Run(stopCh) + + // Wait for the cache to be synced, before processing items from the queue is started + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + return + } + + for i := 0; i < numWorkers; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-ctx.Done() +} + +func (c *Controller) runWorker() { + for c.processNextItem() { + } +} +`