2 Copyright 2018 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
27 "github.com/prometheus/client_golang/prometheus/promhttp"
28 "k8s.io/apimachinery/pkg/api/meta"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/client-go/rest"
31 "k8s.io/client-go/tools/leaderelection"
32 "k8s.io/client-go/tools/leaderelection/resourcelock"
33 "k8s.io/client-go/tools/record"
34 "sigs.k8s.io/controller-runtime/pkg/cache"
35 "sigs.k8s.io/controller-runtime/pkg/client"
36 "sigs.k8s.io/controller-runtime/pkg/metrics"
37 "sigs.k8s.io/controller-runtime/pkg/recorder"
38 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
39 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
40 "sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
43 var log = logf.KBLog.WithName("manager")
45 type controllerManager struct {
46 // config is the rest.config used to talk to the apiserver. Required.
49 // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults
51 scheme *runtime.Scheme
52 // admissionDecoder is used to decode an admission.Request.
53 admissionDecoder types.Decoder
55 // runnables is the set of Controllers that the controllerManager injects deps into and Starts.
60 // TODO(directxman12): Provide an escape hatch to get individual indexers
61 // client is the client injected into Controllers (and EventHandlers, Sources and Predicates).
64 // fieldIndexes knows how to add field indexes over the Cache used by this controller,
65 // which can later be consumed via field selectors from the injected client.
66 fieldIndexes client.FieldIndexer
68 // recorderProvider is used to generate event recorders that will be injected into Controllers
69 // (and EventHandlers, Sources and Predicates).
70 recorderProvider recorder.Provider
72 // resourceLock forms the basis for leader election
73 resourceLock resourcelock.Interface
75 // mapper is used to map resources to kind, and map kind and version.
76 mapper meta.RESTMapper
78 // metricsListener is used to serve prometheus metrics
79 metricsListener net.Listener
85 // internalStop is the stop channel *actually* used by everything involved
86 // with the manager as a stop channel, so that we can pass a stop channel
87 // to things that need it off the bat (like the Channel source). It can
88 // be closed via `internalStopper` (by being the same underlying channel).
89 internalStop <-chan struct{}
91 // internalStopper is the write side of the internal stop channel, allowing us to close it.
92 // It and `internalStop` should point to the same channel.
93 internalStopper chan<- struct{}
95 startCache func(stop <-chan struct{}) error
98 // Add sets dependencies on i, and adds it to the list of runnables to start.
99 func (cm *controllerManager) Add(r Runnable) error {
103 // Set dependencies on the object
104 if err := cm.SetFields(r); err != nil {
108 // Add the runnable to the list
109 cm.runnables = append(cm.runnables, r)
111 // If already started, start the controller
113 cm.errChan <- r.Start(cm.internalStop)
120 func (cm *controllerManager) SetFields(i interface{}) error {
121 if _, err := inject.ConfigInto(cm.config, i); err != nil {
124 if _, err := inject.ClientInto(cm.client, i); err != nil {
127 if _, err := inject.SchemeInto(cm.scheme, i); err != nil {
130 if _, err := inject.CacheInto(cm.cache, i); err != nil {
133 if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
136 if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
139 if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
145 func (cm *controllerManager) GetConfig() *rest.Config {
149 func (cm *controllerManager) GetClient() client.Client {
153 func (cm *controllerManager) GetScheme() *runtime.Scheme {
157 func (cm *controllerManager) GetAdmissionDecoder() types.Decoder {
158 return cm.admissionDecoder
161 func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
162 return cm.fieldIndexes
165 func (cm *controllerManager) GetCache() cache.Cache {
169 func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
170 return cm.recorderProvider.GetEventRecorderFor(name)
173 func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
177 func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
178 handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
179 ErrorHandling: promhttp.HTTPErrorOnError,
181 // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
182 mux := http.NewServeMux()
183 mux.Handle("/metrics", handler)
184 server := http.Server{
189 if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
194 // Shutdown the server when stop is closed
197 if err := server.Shutdown(context.Background()); err != nil {
203 func (cm *controllerManager) Start(stop <-chan struct{}) error {
204 // join the passed-in stop channel as an upstream feeding into cm.internalStopper
205 defer close(cm.internalStopper)
207 // Metrics should be served whether the controller is leader or not.
208 // (If we don't serve metrics for non-leaders, prometheus will still scrape
209 // the pod but will get a connection refused)
210 if cm.metricsListener != nil {
211 go cm.serveMetrics(cm.internalStop)
214 if cm.resourceLock != nil {
215 err := cm.startLeaderElection()
227 case err := <-cm.errChan:
228 // Error starting a controller
233 func (cm *controllerManager) start() {
237 // Start the Cache. Allow the function to start the cache to be mocked out for testing
238 if cm.startCache == nil {
239 cm.startCache = cm.cache.Start
242 if err := cm.startCache(cm.internalStop); err != nil {
247 // Wait for the caches to sync.
248 // TODO(community): Check the return value and write a test
249 cm.cache.WaitForCacheSync(cm.internalStop)
251 // Start the runnables after the cache has synced
252 for _, c := range cm.runnables {
253 // Controllers block, but we want to return an error if any have an error starting.
254 // Write any Start errors to a channel so we can return them
257 cm.errChan <- ctrl.Start(cm.internalStop)
264 func (cm *controllerManager) startLeaderElection() (err error) {
265 l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
266 Lock: cm.resourceLock,
267 // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
268 // TODO(joelspeed): These timings should be configurable
269 LeaseDuration: 15 * time.Second,
270 RenewDeadline: 10 * time.Second,
271 RetryPeriod: 2 * time.Second,
272 Callbacks: leaderelection.LeaderCallbacks{
273 OnStartedLeading: func(_ context.Context) {
276 OnStoppedLeading: func() {
277 // Most implementations of leader election log.Fatal() here.
278 // Since Start is wrapped in log.Fatal when called, we can just return
279 // an error here which will cause the program to exit.
280 cm.errChan <- fmt.Errorf("leader election lost")
288 // Start the leader elector process
289 go l.Run(context.Background())