2 Copyright 2018 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
24 "k8s.io/apimachinery/pkg/runtime"
25 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/client-go/rest"
28 "k8s.io/client-go/tools/record"
29 "k8s.io/client-go/util/workqueue"
30 "sigs.k8s.io/controller-runtime/pkg/cache"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32 "sigs.k8s.io/controller-runtime/pkg/handler"
33 ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
34 "sigs.k8s.io/controller-runtime/pkg/predicate"
35 "sigs.k8s.io/controller-runtime/pkg/reconcile"
36 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
37 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
38 "sigs.k8s.io/controller-runtime/pkg/source"
// log is the package-level logger, scoped with the name "controller".
41 var log = logf.KBLog.WithName("controller")
// Compile-time check that *Controller satisfies inject.Injector.
43 var _ inject.Injector = &Controller{}
45 // Controller implements controller.Controller.
// It pulls reconcile.Requests off Queue and hands them to the Reconciler in Do.
46 type Controller struct {
47 // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
50 // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
51 MaxConcurrentReconciles int
53 // Reconciler is a function that can be called at any time with the Name / Namespace of an object and
54 // ensures that the state of the system matches the state specified in the object.
55 // Defaults to the DefaultReconcileFunc.
56 Do reconcile.Reconciler
58 // Client is a lazily initialized Client. The controllerManager will initialize this when Start is called.
61 // Scheme is injected by the controllerManager when controllerManager.Start is called
62 Scheme *runtime.Scheme
64 // informers are injected by the controllerManager when controllerManager.Start is called
67 // Config is the rest.Config used to talk to the apiserver. Defaults to one of in-cluster, environment variable
68 // specified, or the ~/.kube/Config.
71 // Queue is a listeningQueue that listens for events from Informers and adds object keys to
72 // the Queue for processing
73 Queue workqueue.RateLimitingInterface
75 // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
76 SetFields func(i interface{}) error
78 // mu is used to synchronize Controller setup
81 // JitterPeriod allows tests to reduce the JitterPeriod so they complete faster.
// Start sets it to 1 second when it is zero.
82 JitterPeriod time.Duration
84 // WaitForCacheSync allows tests to mock out the WaitForCacheSync function to simulate a failed sync (by returning false).
85 // Start defaults it to Cache.WaitForCacheSync when nil.
86 WaitForCacheSync func(stopCh <-chan struct{}) bool
88 // Started is true if the Controller has been Started
91 // Recorder is an event recorder for recording Event resources to the
93 Recorder record.EventRecorder
95 // TODO(community): Consider initializing a logger with the Controller Name as the tag
98 // Reconcile implements reconcile.Reconciler by delegating the request to the Reconciler stored in Do.
99 func (c *Controller) Reconcile(r reconcile.Request) (reconcile.Result, error) {
100 return c.Do.Reconcile(r)
103 // Watch implements controller.Controller.
// It passes src, evthdler and every predicate in prct through SetFields, then starts src
// with evthdler, the Controller's Queue and the predicates, returning the result of src.Start.
104 func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
108 // Inject dependencies into the source, the event handler and each predicate
109 if err := c.SetFields(src); err != nil {
112 if err := c.SetFields(evthdler); err != nil {
115 for _, pr := range prct {
116 if err := c.SetFields(pr); err != nil {
121 log.Info("Starting EventSource", "controller", c.Name, "source", src)
122 return src.Start(evthdler, c.Queue, prct...)
125 // Start implements controller.Controller.
// It waits for the caches to sync, then launches MaxConcurrentReconciles worker goroutines
// that drain the Queue via processNextWorkItem. The Queue is shut down when Start returns.
126 func (c *Controller) Start(stop <-chan struct{}) error {
129 // TODO(pwittrock): Reconsider HandleCrash
130 defer utilruntime.HandleCrash()
131 defer c.Queue.ShutDown()
133 // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
// NOTE(review): no informer factory is started in the visible code below; this comment may be stale — confirm.
134 log.Info("Starting Controller", "controller", c.Name)
136 // Wait for the caches to be synced before starting workers
137 if c.WaitForCacheSync == nil {
138 c.WaitForCacheSync = c.Cache.WaitForCacheSync
140 if ok := c.WaitForCacheSync(stop); !ok {
141 // This code is unreachable right now since WaitForCacheSync will never return an error
142 // Leaving it here because that could happen in the future
143 err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
144 log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
// Default the period between worker loop restarts to one second
149 if c.JitterPeriod == 0 {
150 c.JitterPeriod = 1 * time.Second
153 // Launch workers to process resources
154 log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
155 for i := 0; i < c.MaxConcurrentReconciles; i++ {
156 // Process work items until processNextWorkItem returns false; wait.Until then re-runs the loop
// after JitterPeriod, until stop is closed.
157 go wait.Until(func() {
158 for c.processNextWorkItem() {
160 }, c.JitterPeriod, stop)
167 log.Info("Stopping workers", "controller", c.Name)
171 // processNextWorkItem will read a single work item off the workqueue and
172 // attempt to process it, by calling the Reconciler (c.Do).
// The bool result tells the worker loop in Start whether to keep going.
173 func (c *Controller) processNextWorkItem() bool {
174 // This code copy-pasted from the sample-Controller.
176 // Update metrics after processing each item
177 reconcileStartTS := time.Now()
179 c.updateMetrics(time.Now().Sub(reconcileStartTS))
182 obj, shutdown := c.Queue.Get()
184 // Sometimes the Queue gives us nil items when it starts up
193 // We call Done here so the workqueue knows we have finished
194 // processing this item. We also must remember to call Forget if we
195 // do not want this work item being re-queued. For example, we do
196 // not call Forget if a transient error occurs, instead the item is
197 // put back on the workqueue and attempted again after a back-off
199 defer c.Queue.Done(obj)
200 var req reconcile.Request
202 if req, ok = obj.(reconcile.Request); !ok {
203 // As the item in the workqueue is actually invalid, we call
204 // Forget here else we'd go into a loop of attempting to
205 // process a work item that is invalid.
207 log.Error(nil, "Queue item was not a Request",
208 "controller", c.Name, "type", fmt.Sprintf("%T", obj), "value", obj)
209 // Return true, don't take a break
213 // Run the Reconciler, passing it the reconcile.Request that identifies the
214 // resource to be synced.
// Outcomes handled below: an error re-adds the request with rate limiting and counts it in the
// error metrics; RequeueAfter re-adds it after the requested delay; Requeue re-adds it with
// rate limiting.
215 if result, err := c.Do.Reconcile(req); err != nil {
216 c.Queue.AddRateLimited(req)
217 log.Error(err, "Reconciler error", "controller", c.Name, "request", req)
218 ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
219 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "error").Inc()
221 } else if result.RequeueAfter > 0 {
222 c.Queue.AddAfter(req, result.RequeueAfter)
223 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue_after").Inc()
225 } else if result.Requeue {
226 c.Queue.AddRateLimited(req)
227 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue").Inc()
231 // Finally, if no error occurs we Forget this item so it does not
232 // get queued again until another change happens.
235 // TODO(directxman12): What does 1 mean? Do we want level constants? Do we want levels at all?
236 log.V(1).Info("Successfully Reconciled", "controller", c.Name, "request", req)
238 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "success").Inc()
239 // Return true, don't take a break
243 // InjectFunc implements inject.Injector.
244 func (c *Controller) InjectFunc(f inject.Func) error {
249 // updateMetrics records reconcileTime, in seconds, in the ReconcileTime prometheus metric
// labelled with this controller's Name.
250 func (c *Controller) updateMetrics(reconcileTime time.Duration) {
251 ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())