Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / sigs.k8s.io / controller-runtime / pkg / internal / controller / controller.go
1 /*
2 Copyright 2018 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package controller
18
19 import (
20         "fmt"
21         "sync"
22         "time"
23
24         "k8s.io/apimachinery/pkg/runtime"
25         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26         "k8s.io/apimachinery/pkg/util/wait"
27         "k8s.io/client-go/rest"
28         "k8s.io/client-go/tools/record"
29         "k8s.io/client-go/util/workqueue"
30         "sigs.k8s.io/controller-runtime/pkg/cache"
31         "sigs.k8s.io/controller-runtime/pkg/client"
32         "sigs.k8s.io/controller-runtime/pkg/handler"
33         ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
34         "sigs.k8s.io/controller-runtime/pkg/predicate"
35         "sigs.k8s.io/controller-runtime/pkg/reconcile"
36         "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
37         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
38         "sigs.k8s.io/controller-runtime/pkg/source"
39 )
40
41 var log = logf.KBLog.WithName("controller")
42
43 var _ inject.Injector = &Controller{}
44
45 // Controller implements controller.Controller
46 type Controller struct {
47         // Name is used to uniquely identify a Controller in tracing, logging and monitoring.  Name is required.
48         Name string
49
50         // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
51         MaxConcurrentReconciles int
52
53         // Reconciler is a function that can be called at any time with the Name / Namespace of an object and
54         // ensures that the state of the system matches the state specified in the object.
55         // Defaults to the DefaultReconcileFunc.
56         Do reconcile.Reconciler
57
58         // Client is a lazily initialized Client.  The controllerManager will initialize this when Start is called.
59         Client client.Client
60
61         // Scheme is injected by the controllerManager when controllerManager.Start is called
62         Scheme *runtime.Scheme
63
64         // informers are injected by the controllerManager when controllerManager.Start is called
65         Cache cache.Cache
66
67         // Config is the rest.Config used to talk to the apiserver.  Defaults to one of in-cluster, environment variable
68         // specified, or the ~/.kube/Config.
69         Config *rest.Config
70
71         // Queue is an listeningQueue that listens for events from Informers and adds object keys to
72         // the Queue for processing
73         Queue workqueue.RateLimitingInterface
74
75         // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
76         SetFields func(i interface{}) error
77
78         // mu is used to synchronize Controller setup
79         mu sync.Mutex
80
81         // JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
82         JitterPeriod time.Duration
83
84         // WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error
85         // defaults to Cache.WaitForCacheSync
86         WaitForCacheSync func(stopCh <-chan struct{}) bool
87
88         // Started is true if the Controller has been Started
89         Started bool
90
91         // Recorder is an event recorder for recording Event resources to the
92         // Kubernetes API.
93         Recorder record.EventRecorder
94
95         // TODO(community): Consider initializing a logger with the Controller Name as the tag
96 }
97
98 // Reconcile implements reconcile.Reconciler
99 func (c *Controller) Reconcile(r reconcile.Request) (reconcile.Result, error) {
100         return c.Do.Reconcile(r)
101 }
102
103 // Watch implements controller.Controller
104 func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
105         c.mu.Lock()
106         defer c.mu.Unlock()
107
108         // Inject Cache into arguments
109         if err := c.SetFields(src); err != nil {
110                 return err
111         }
112         if err := c.SetFields(evthdler); err != nil {
113                 return err
114         }
115         for _, pr := range prct {
116                 if err := c.SetFields(pr); err != nil {
117                         return err
118                 }
119         }
120
121         log.Info("Starting EventSource", "controller", c.Name, "source", src)
122         return src.Start(evthdler, c.Queue, prct...)
123 }
124
125 // Start implements controller.Controller
126 func (c *Controller) Start(stop <-chan struct{}) error {
127         c.mu.Lock()
128
129         // TODO(pwittrock): Reconsider HandleCrash
130         defer utilruntime.HandleCrash()
131         defer c.Queue.ShutDown()
132
133         // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
134         log.Info("Starting Controller", "controller", c.Name)
135
136         // Wait for the caches to be synced before starting workers
137         if c.WaitForCacheSync == nil {
138                 c.WaitForCacheSync = c.Cache.WaitForCacheSync
139         }
140         if ok := c.WaitForCacheSync(stop); !ok {
141                 // This code is unreachable right now since WaitForCacheSync will never return an error
142                 // Leaving it here because that could happen in the future
143                 err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
144                 log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
145                 c.mu.Unlock()
146                 return err
147         }
148
149         if c.JitterPeriod == 0 {
150                 c.JitterPeriod = 1 * time.Second
151         }
152
153         // Launch workers to process resources
154         log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
155         for i := 0; i < c.MaxConcurrentReconciles; i++ {
156                 // Process work items
157                 go wait.Until(func() {
158                         for c.processNextWorkItem() {
159                         }
160                 }, c.JitterPeriod, stop)
161         }
162
163         c.Started = true
164         c.mu.Unlock()
165
166         <-stop
167         log.Info("Stopping workers", "controller", c.Name)
168         return nil
169 }
170
171 // processNextWorkItem will read a single work item off the workqueue and
172 // attempt to process it, by calling the syncHandler.
173 func (c *Controller) processNextWorkItem() bool {
174         // This code copy-pasted from the sample-Controller.
175
176         // Update metrics after processing each item
177         reconcileStartTS := time.Now()
178         defer func() {
179                 c.updateMetrics(time.Now().Sub(reconcileStartTS))
180         }()
181
182         obj, shutdown := c.Queue.Get()
183         if obj == nil {
184                 // Sometimes the Queue gives us nil items when it starts up
185                 c.Queue.Forget(obj)
186         }
187
188         if shutdown {
189                 // Stop working
190                 return false
191         }
192
193         // We call Done here so the workqueue knows we have finished
194         // processing this item. We also must remember to call Forget if we
195         // do not want this work item being re-queued. For example, we do
196         // not call Forget if a transient error occurs, instead the item is
197         // put back on the workqueue and attempted again after a back-off
198         // period.
199         defer c.Queue.Done(obj)
200         var req reconcile.Request
201         var ok bool
202         if req, ok = obj.(reconcile.Request); !ok {
203                 // As the item in the workqueue is actually invalid, we call
204                 // Forget here else we'd go into a loop of attempting to
205                 // process a work item that is invalid.
206                 c.Queue.Forget(obj)
207                 log.Error(nil, "Queue item was not a Request",
208                         "controller", c.Name, "type", fmt.Sprintf("%T", obj), "value", obj)
209                 // Return true, don't take a break
210                 return true
211         }
212
213         // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
214         // resource to be synced.
215         if result, err := c.Do.Reconcile(req); err != nil {
216                 c.Queue.AddRateLimited(req)
217                 log.Error(err, "Reconciler error", "controller", c.Name, "request", req)
218                 ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
219                 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "error").Inc()
220                 return false
221         } else if result.RequeueAfter > 0 {
222                 c.Queue.AddAfter(req, result.RequeueAfter)
223                 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue_after").Inc()
224                 return true
225         } else if result.Requeue {
226                 c.Queue.AddRateLimited(req)
227                 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue").Inc()
228                 return true
229         }
230
231         // Finally, if no error occurs we Forget this item so it does not
232         // get queued again until another change happens.
233         c.Queue.Forget(obj)
234
235         // TODO(directxman12): What does 1 mean?  Do we want level constants?  Do we want levels at all?
236         log.V(1).Info("Successfully Reconciled", "controller", c.Name, "request", req)
237
238         ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "success").Inc()
239         // Return true, don't take a break
240         return true
241 }
242
243 // InjectFunc implement SetFields.Injector
244 func (c *Controller) InjectFunc(f inject.Func) error {
245         c.SetFields = f
246         return nil
247 }
248
249 // updateMetrics updates prometheus metrics within the controller
250 func (c *Controller) updateMetrics(reconcileTime time.Duration) {
251         ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
252 }