Skip to content

*: generate controller struct #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions pkg/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"os"
"path/filepath"
"strings"
"text/template"

"github.com/coreos/operator-sdk/pkg/templates"
)

const defaultFileMode = 0750
Expand Down Expand Up @@ -41,7 +44,6 @@ func (g *Generator) Render() error {
return err
}

// pkg/apis/
groupName, apiVersion := func() (string, string) {
splits := strings.Split(apiGroup, "/")
return strings.Split(splits[0], ".")[0], splits[1]
Expand All @@ -51,11 +53,34 @@ func (g *Generator) Render() error {
return err
}

// pkg/controller/
err = os.MkdirAll(filepath.Join(projDir, "pkg/controller"), defaultFileMode)
controllerDir := filepath.Join(projDir, "pkg/controller")
err = os.MkdirAll(controllerDir, defaultFileMode)
if err != nil {
return err
}

err = g.genWorkqueue(controllerDir)
if err != nil {
return err
}

return nil
}

func (g *Generator) genWorkqueue(controllerDir string) error {
f, err := os.OpenFile(filepath.Join(controllerDir, "controller.go"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, defaultFileMode)
if err != nil {
return err
}
defer f.Close()

t, err := template.New("controller").Parse(templates.ControllerTemplate)
if err != nil {
return err
}
err = t.Execute(f, nil)
if err != nil {
return err
}
return nil
}
118 changes: 118 additions & 0 deletions pkg/templates/workqueue-tmpl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package templates

// TODO: fix imports

const ControllerTemplate = `
package controller

type Controller struct {
indexer cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
}

func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
informer: informer,
indexer: indexer,
queue: queue,
}
}

// sync is the business logic of the controller.
// In case an error happened, it has to simply return the error and will be retried after a backoff.
func (c *Controller) sync(key string) error {
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
return fmt.Errorf("Fetching object with key %s from store failed with %v", key, err)
}

if !exists {
// We warmed up the cache, so this could only imply the object was deleted
return nil
}

return nil
}
`

const WorkqueueTemplate = `
package controller

const (
// maxRetries is the number of times an event will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times of requeues:
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)

func (c *Controller) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two pods with the same key are never processed in
// parallel.
defer c.queue.Done(key)

// Invoke the method containing the business logic
err := c.sync(key.(string))

// Handle the error if something went wrong during the execution of the business logic
c.handleErr(err, key)
return true
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
c.queue.Forget(key)
return
}

// This controller retries maxRetries times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < maxRetries {
glog.Infof("Error syncing pod %v: %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}

c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
runtime.HandleError(err)
glog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// numWorkers is the number of goroutine workers to process events concurrently.
func (c *Controller) Run(ctx context.Context, numWorkers int) {
// Let the workers stop when we are done
defer c.queue.ShutDown()

go c.informer.Run(stopCh)

// Wait for the cache to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}

for i := 0; i < numWorkers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

<-ctx.Done()
}

func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
`