2 Copyright 2018 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
23 "k8s.io/apimachinery/pkg/api/meta"
24 "k8s.io/apimachinery/pkg/runtime"
25 "k8s.io/client-go/util/workqueue"
26 "sigs.k8s.io/controller-runtime/pkg/event"
27 "sigs.k8s.io/controller-runtime/pkg/handler"
28 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
29 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
30 "sigs.k8s.io/controller-runtime/pkg/source/internal"
32 toolscache "k8s.io/client-go/tools/cache"
33 "sigs.k8s.io/controller-runtime/pkg/cache"
34 "sigs.k8s.io/controller-runtime/pkg/predicate"
// log is the package-level logger, obtained from logf.KBLog under the name "source".
37 var log = logf.KBLog.WithName("source")
40 // defaultBufferSize is the default number of event notifications that can be buffered.
// Channel.Start falls back to this value when DestBufferSize is left at zero.
41 defaultBufferSize = 1024
44 // Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
45 // which should be processed by handler.EventHandlers to enqueue reconcile.Requests.
47 // * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
49 // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external URLs).
51 // Users may build their own Source implementations. If their implementations implement any of the inject package
52 // interfaces, the dependencies will be injected by the Controller when Watch is called.
53 type Source interface {
54 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
55 // to enqueue reconcile.Requests.
// It takes the handler that receives events, the work queue the handler enqueues into,
// and zero or more predicates, and returns an error if the source cannot be started.
56 Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
59 // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
61 // Type is the type of object to watch. e.g. &v1.Pod{}
64 // cache used to watch APIs
// Compile-time assertion that *Kind satisfies the Source interface.
68 var _ Source = &Kind{}
70 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
71 // to enqueue reconcile.Requests.
// The handler, queue and predicates are bundled into an internal.EventHandler that is
// registered on the informer obtained from the injected cache for ks.Type.
// NOTE(review): the guard conditions around the two early returns, the error check after
// GetInformer, and the closing braces/returns are not visible in this listing; confirm
// against the complete file before changing any logic here.
72 func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
73 prct ...predicate.Predicate) error {
75 // Type should have been specified by the user.
77 return fmt.Errorf("must specify Kind.Type")
80 // cache should have been injected before Start was called
// NOTE(review): the message names CacheInto, but the injection method declared in this
// file is InjectCache — confirm which name should be reported to the caller.
82 return fmt.Errorf("must call CacheInto on Kind before calling Start")
85 // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
86 i, err := ks.cache.GetInformer(ks.Type)
// A NoKindMatchError gets an extra log entry hinting that the kind may be a CRD that has
// not been installed yet; the GroupKind from the error is attached to the log entry.
88 if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
89 log.Error(err, "if kind is a CRD, it should be installed before calling Start",
90 "kind", kindMatchErr.GroupKind)
// Register the adapter that carries the queue, the user handler and the predicates.
94 i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
98 func (ks *Kind) String() string {
99 if ks.Type != nil && ks.Type.GetObjectKind() != nil {
100 return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String())
102 return fmt.Sprintf("kind source: unknown GVK")
// Compile-time assertion that *Kind satisfies inject.Cache.
105 var _ inject.Cache = &Kind{}
107 // InjectCache is internal and should be called only by the Controller. InjectCache is used to inject
108 // the Cache dependency initialized by the ControllerManager.
// NOTE(review): the body of this method is not visible in this listing; Kind.Start reads
// ks.cache, so presumably the cache is stored there — confirm against the complete file.
109 func (ks *Kind) InjectCache(c cache.Cache) error {
// Compile-time assertion that *Channel satisfies the Source interface.
116 var _ Source = &Channel{}
118 // Channel is used to provide a source of events originating outside the cluster
119 // (e.g. GitHub Webhook callback). Channel requires the user to wire the external
120 // source (e.g. http handler) to write GenericEvents to the underlying channel.
// NOTE(review): the declarations of the once, stop, DestBufferSize and destLock fields are
// not visible in this listing; only their comments remain.
121 type Channel struct {
122 // once ensures the event distribution goroutine is started only once
125 // Source is the source channel to fetch GenericEvents
126 Source <-chan event.GenericEvent
128 // stop is to end ongoing goroutine, and close the channels
131 // dest is the destination channels of the added event handlers;
// Start appends one channel per call.
132 dest []chan event.GenericEvent
134 // DestBufferSize is the specified buffer size of dest channels.
135 // Defaults to 1024 (defaultBufferSize) if not specified.
138 // destLock is to ensure the destination channels are safely added/removed
142 func (cs *Channel) String() string {
143 return fmt.Sprintf("channel source: %p", cs)
// Compile-time assertion that *Channel satisfies inject.Stoppable.
146 var _ inject.Stoppable = &Channel{}
148 // InjectStopChannel is internal and should be called only by the Controller.
149 // It is used to inject the stop channel initialized by the ControllerManager.
// NOTE(review): the body of this method is not visible in this listing.
150 func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
158 // Start implements Source and should only be called by the Controller.
// Each call creates a buffered destination channel, reads events from it to feed
// handler.Generic, and appends the channel to cs.dest.
// NOTE(review): the stop guard, the goroutine launches, the predicate filtering body, the
// lock acquisition and the final return are not visible in this listing; confirm against
// the complete file before changing any logic here.
159 func (cs *Channel) Start(
160 handler handler.EventHandler,
161 queue workqueue.RateLimitingInterface,
162 prct ...predicate.Predicate) error {
163 // Source should have been specified by the user.
164 if cs.Source == nil {
165 return fmt.Errorf("must specify Channel.Source")
168 // stop should have been injected before Start was called
// NOTE(review): the message names InjectStop, but the injection method declared in this
// file is InjectStopChannel — confirm which name should be reported to the caller.
170 return fmt.Errorf("must call InjectStop on Channel before calling Start")
173 // use default value if DestBufferSize not specified
// NOTE(review): only zero is replaced; if DestBufferSize is a signed integer, a negative
// value would reach make below — confirm the field type.
174 if cs.DestBufferSize == 0 {
175 cs.DestBufferSize = defaultBufferSize
179 // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
// dst is this handler's private buffered channel.
183 dst := make(chan event.GenericEvent, cs.DestBufferSize)
// Drain dst until it is closed; presumably each event is checked against the predicates
// before being handed to the handler (the check itself is not visible here).
185 for evt := range dst {
187 for _, p := range prct {
195 handler.Generic(evt, queue)
201 defer cs.destLock.Unlock()
// Register dst so distribute, which ranges over cs.dest, includes this handler.
203 cs.dest = append(cs.dest, dst)
// doStop walks every destination channel in cs.dest; destLock is released on return.
// NOTE(review): the lock acquisition and the loop body are not visible in this listing;
// presumably each channel is closed here — confirm against the complete file.
208 func (cs *Channel) doStop() {
210 defer cs.destLock.Unlock()
212 for _, dst := range cs.dest {
// distribute visits every destination channel in cs.dest for the given event; destLock
// is released on return.
// NOTE(review): the lock acquisition and the statement inside the loop are not visible in
// this listing; per the comment below it is a blocking write to dst — confirm.
217 func (cs *Channel) distribute(evt event.GenericEvent) {
219 defer cs.destLock.Unlock()
221 for _, dst := range cs.dest {
222 // We cannot make it under goroutine here, or we'll meet the
223 // race condition of writing message to closed channels.
224 // To avoid blocking, the dest channels are expected to be of
225 // proper buffer size. If we still see it blocked, then
226 // the controller is thought to be in an abnormal state.
// syncLoop receives GenericEvents from cs.Source.
// NOTE(review): the enclosing loop/select, the stop case and the calls made in each case
// are not visible in this listing; confirm against the complete file.
231 func (cs *Channel) syncLoop() {
235 // Close destination channels
238 case evt := <-cs.Source:
244 // Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
// Unlike Kind, which looks its informer up in a cache, the informer is supplied directly
// by the user; Start returns an error when it is nil.
245 type Informer struct {
246 // Informer is the generated client-go Informer
247 Informer toolscache.SharedIndexInformer
// Compile-time assertion that *Informer satisfies the Source interface.
250 var _ Source = &Informer{}
252 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
253 // to enqueue reconcile.Requests.
// The handler, queue and predicates are bundled into an internal.EventHandler that is
// registered on is.Informer.
// NOTE(review): the closing braces and the final return are not visible in this listing.
254 func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
255 prct ...predicate.Predicate) error {
257 // Informer should have been specified by the user.
258 if is.Informer == nil {
259 return fmt.Errorf("must specify Informer.Informer")
// Register the adapter that carries the queue, the user handler and the predicates.
262 is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
266 func (is *Informer) String() string {
267 return fmt.Sprintf("informer source: %p", is.Informer)
270 // Func is a function that implements Source
// Its Start method calls the function itself with the same arguments, which gives a
// plain function of this signature the Start method that Source requires.
271 type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
273 // Start implements Source
274 func (f Func) Start(evt handler.EventHandler, queue workqueue.RateLimitingInterface,
275 pr ...predicate.Predicate) error {
276 return f(evt, queue, pr...)
279 func (f Func) String() string {
280 return fmt.Sprintf("func source: %p", f)