package templates

const ControllerTemplate = `
package controller

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}

func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

// sync is the business logic of the controller.
// If an error is returned, the key is re-queued and retried after a backoff.
func (c *Controller) sync(key string) error {
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		return fmt.Errorf("fetching object with key %s from store failed: %v", key, err)
	}

	if !exists {
		// The cache was warmed up before the workers started, so a missing
		// object can only mean it was deleted.
		return nil
	}

	// The controller's business logic for obj goes here.
	_ = obj
	return nil
}
`
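
// A minimal sketch of how a generated sync body might use key and obj; this is
// an assumption about code the user adds, not something the template emits
// (Pod is just an example resource):
//
//	namespace, name, err := cache.SplitMetaNamespaceKey(key)
//	if err != nil {
//		return err
//	}
//	pod := obj.(*v1.Pod)
//	glog.Infof("Syncing pod %s/%s (resourceVersion %s)", namespace, name, pod.ResourceVersion)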

const WorkqueueTemplate = `
package controller

import (
	"context"
	"fmt"
	"time"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

const (
	// maxRetries is the number of times an event is retried before it is dropped out of the queue.
	// With the default rate limiter in use (5ms * 2^(n-1) for the n-th requeue) the requeues happen after:
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15
)

func (c *Controller) processNextItem() bool {
	// Wait until there is a new item in the working queue.
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Tell the queue that we are done processing this key. This unblocks the key
	// for other workers and allows safe parallel processing, because two events
	// with the same key are never processed in parallel.
	defer c.queue.Done(key)

	// Invoke the method containing the business logic.
	err := c.sync(key.(string))

	// Handle the error if something went wrong during the execution of the business logic.
	c.handleErr(err, key)
	return true
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		// Forget the rate-limiting history of the key on every successful
		// synchronization, so that future processing of updates for this key
		// is not delayed by an outdated error history.
		c.queue.Forget(key)
		return
	}

	// This controller retries maxRetries times if something goes wrong. After that, it stops trying.
	if c.queue.NumRequeues(key) < maxRetries {
		glog.Infof("Error syncing object %v: %v", key, err)

		// Re-enqueue the key rate limited. Based on the rate limiter on the
		// queue and the re-enqueue history, the key will be processed later again.
		c.queue.AddRateLimited(key)
		return
	}

	c.queue.Forget(key)
	// Report to an external entity that, even after several retries, we could
	// not successfully process this key.
	runtime.HandleError(err)
	glog.Infof("Dropping object %q out of the queue: %v", key, err)
}

// Run starts the informer and numWorkers goroutines that process events
// concurrently, then blocks until ctx is cancelled.
func (c *Controller) Run(ctx context.Context, numWorkers int) {
	// Let the workers stop when we are done.
	defer c.queue.ShutDown()

	stopCh := ctx.Done()
	go c.informer.Run(stopCh)

	// Wait for the cache to be synced before starting to process items from the queue.
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	for i := 0; i < numWorkers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-ctx.Done()
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}
`
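
// Example wiring for the generated code: a minimal sketch, not emitted by the
// templates. It assumes the watched resource is Pods and that clientset and
// ctx already exist; workqueue.DefaultControllerRateLimiter provides the
// 5ms-base exponential backoff that the maxRetries comment above refers to.
//
//	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
//	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
//	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) {
//			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
//				queue.Add(key)
//			}
//		},
//	}, cache.Indexers{})
//	controller := NewController(queue, indexer, informer)
//	controller.Run(ctx, 2)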