Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / sigs.k8s.io / controller-runtime / pkg / manager / internal.go
1 /*
2 Copyright 2018 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package manager
18
19 import (
20         "context"
21         "fmt"
22         "net"
23         "net/http"
24         "sync"
25         "time"
26
27         "github.com/prometheus/client_golang/prometheus/promhttp"
28         "k8s.io/apimachinery/pkg/api/meta"
29         "k8s.io/apimachinery/pkg/runtime"
30         "k8s.io/client-go/rest"
31         "k8s.io/client-go/tools/leaderelection"
32         "k8s.io/client-go/tools/leaderelection/resourcelock"
33         "k8s.io/client-go/tools/record"
34         "sigs.k8s.io/controller-runtime/pkg/cache"
35         "sigs.k8s.io/controller-runtime/pkg/client"
36         "sigs.k8s.io/controller-runtime/pkg/metrics"
37         "sigs.k8s.io/controller-runtime/pkg/recorder"
38         "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
39         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
40         "sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
41 )
42
43 var log = logf.KBLog.WithName("manager")
44
45 type controllerManager struct {
46         // config is the rest.config used to talk to the apiserver.  Required.
47         config *rest.Config
48
49         // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates.  Defaults
50         // to scheme.scheme.
51         scheme *runtime.Scheme
52         // admissionDecoder is used to decode an admission.Request.
53         admissionDecoder types.Decoder
54
55         // runnables is the set of Controllers that the controllerManager injects deps into and Starts.
56         runnables []Runnable
57
58         cache cache.Cache
59
60         // TODO(directxman12): Provide an escape hatch to get individual indexers
61         // client is the client injected into Controllers (and EventHandlers, Sources and Predicates).
62         client client.Client
63
64         // fieldIndexes knows how to add field indexes over the Cache used by this controller,
65         // which can later be consumed via field selectors from the injected client.
66         fieldIndexes client.FieldIndexer
67
68         // recorderProvider is used to generate event recorders that will be injected into Controllers
69         // (and EventHandlers, Sources and Predicates).
70         recorderProvider recorder.Provider
71
72         // resourceLock forms the basis for leader election
73         resourceLock resourcelock.Interface
74
75         // mapper is used to map resources to kind, and map kind and version.
76         mapper meta.RESTMapper
77
78         // metricsListener is used to serve prometheus metrics
79         metricsListener net.Listener
80
81         mu      sync.Mutex
82         started bool
83         errChan chan error
84
85         // internalStop is the stop channel *actually* used by everything involved
86         // with the manager as a stop channel, so that we can pass a stop channel
87         // to things that need it off the bat (like the Channel source).  It can
88         // be closed via `internalStopper` (by being the same underlying channel).
89         internalStop <-chan struct{}
90
91         // internalStopper is the write side of the internal stop channel, allowing us to close it.
92         // It and `internalStop` should point to the same channel.
93         internalStopper chan<- struct{}
94
95         startCache func(stop <-chan struct{}) error
96 }
97
98 // Add sets dependencies on i, and adds it to the list of runnables to start.
99 func (cm *controllerManager) Add(r Runnable) error {
100         cm.mu.Lock()
101         defer cm.mu.Unlock()
102
103         // Set dependencies on the object
104         if err := cm.SetFields(r); err != nil {
105                 return err
106         }
107
108         // Add the runnable to the list
109         cm.runnables = append(cm.runnables, r)
110         if cm.started {
111                 // If already started, start the controller
112                 go func() {
113                         cm.errChan <- r.Start(cm.internalStop)
114                 }()
115         }
116
117         return nil
118 }
119
120 func (cm *controllerManager) SetFields(i interface{}) error {
121         if _, err := inject.ConfigInto(cm.config, i); err != nil {
122                 return err
123         }
124         if _, err := inject.ClientInto(cm.client, i); err != nil {
125                 return err
126         }
127         if _, err := inject.SchemeInto(cm.scheme, i); err != nil {
128                 return err
129         }
130         if _, err := inject.CacheInto(cm.cache, i); err != nil {
131                 return err
132         }
133         if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
134                 return err
135         }
136         if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
137                 return err
138         }
139         if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
140                 return err
141         }
142         return nil
143 }
144
145 func (cm *controllerManager) GetConfig() *rest.Config {
146         return cm.config
147 }
148
149 func (cm *controllerManager) GetClient() client.Client {
150         return cm.client
151 }
152
153 func (cm *controllerManager) GetScheme() *runtime.Scheme {
154         return cm.scheme
155 }
156
157 func (cm *controllerManager) GetAdmissionDecoder() types.Decoder {
158         return cm.admissionDecoder
159 }
160
161 func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
162         return cm.fieldIndexes
163 }
164
165 func (cm *controllerManager) GetCache() cache.Cache {
166         return cm.cache
167 }
168
169 func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
170         return cm.recorderProvider.GetEventRecorderFor(name)
171 }
172
173 func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
174         return cm.mapper
175 }
176
177 func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
178         handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
179                 ErrorHandling: promhttp.HTTPErrorOnError,
180         })
181         // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
182         mux := http.NewServeMux()
183         mux.Handle("/metrics", handler)
184         server := http.Server{
185                 Handler: mux,
186         }
187         // Run the server
188         go func() {
189                 if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
190                         cm.errChan <- err
191                 }
192         }()
193
194         // Shutdown the server when stop is closed
195         select {
196         case <-stop:
197                 if err := server.Shutdown(context.Background()); err != nil {
198                         cm.errChan <- err
199                 }
200         }
201 }
202
203 func (cm *controllerManager) Start(stop <-chan struct{}) error {
204         // join the passed-in stop channel as an upstream feeding into cm.internalStopper
205         defer close(cm.internalStopper)
206
207         // Metrics should be served whether the controller is leader or not.
208         // (If we don't serve metrics for non-leaders, prometheus will still scrape
209         // the pod but will get a connection refused)
210         if cm.metricsListener != nil {
211                 go cm.serveMetrics(cm.internalStop)
212         }
213
214         if cm.resourceLock != nil {
215                 err := cm.startLeaderElection()
216                 if err != nil {
217                         return err
218                 }
219         } else {
220                 go cm.start()
221         }
222
223         select {
224         case <-stop:
225                 // We are done
226                 return nil
227         case err := <-cm.errChan:
228                 // Error starting a controller
229                 return err
230         }
231 }
232
233 func (cm *controllerManager) start() {
234         cm.mu.Lock()
235         defer cm.mu.Unlock()
236
237         // Start the Cache. Allow the function to start the cache to be mocked out for testing
238         if cm.startCache == nil {
239                 cm.startCache = cm.cache.Start
240         }
241         go func() {
242                 if err := cm.startCache(cm.internalStop); err != nil {
243                         cm.errChan <- err
244                 }
245         }()
246
247         // Wait for the caches to sync.
248         // TODO(community): Check the return value and write a test
249         cm.cache.WaitForCacheSync(cm.internalStop)
250
251         // Start the runnables after the cache has synced
252         for _, c := range cm.runnables {
253                 // Controllers block, but we want to return an error if any have an error starting.
254                 // Write any Start errors to a channel so we can return them
255                 ctrl := c
256                 go func() {
257                         cm.errChan <- ctrl.Start(cm.internalStop)
258                 }()
259         }
260
261         cm.started = true
262 }
263
264 func (cm *controllerManager) startLeaderElection() (err error) {
265         l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
266                 Lock: cm.resourceLock,
267                 // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
268                 // TODO(joelspeed): These timings should be configurable
269                 LeaseDuration: 15 * time.Second,
270                 RenewDeadline: 10 * time.Second,
271                 RetryPeriod:   2 * time.Second,
272                 Callbacks: leaderelection.LeaderCallbacks{
273                         OnStartedLeading: func(_ context.Context) {
274                                 cm.start()
275                         },
276                         OnStoppedLeading: func() {
277                                 // Most implementations of leader election log.Fatal() here.
278                                 // Since Start is wrapped in log.Fatal when called, we can just return
279                                 // an error here which will cause the program to exit.
280                                 cm.errChan <- fmt.Errorf("leader election lost")
281                         },
282                 },
283         })
284         if err != nil {
285                 return err
286         }
287
288         // Start the leader elector process
289         go l.Run(context.Background())
290         return nil
291 }