Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / sigs.k8s.io / controller-runtime / pkg / source / source.go
1 /*
2 Copyright 2018 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package source
18
19 import (
20         "fmt"
21         "sync"
22
23         "k8s.io/apimachinery/pkg/api/meta"
24         "k8s.io/apimachinery/pkg/runtime"
25         "k8s.io/client-go/util/workqueue"
26         "sigs.k8s.io/controller-runtime/pkg/event"
27         "sigs.k8s.io/controller-runtime/pkg/handler"
28         "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
29         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
30         "sigs.k8s.io/controller-runtime/pkg/source/internal"
31
32         toolscache "k8s.io/client-go/tools/cache"
33         "sigs.k8s.io/controller-runtime/pkg/cache"
34         "sigs.k8s.io/controller-runtime/pkg/predicate"
35 )
36
37 var log = logf.KBLog.WithName("source")
38
39 const (
40         // defaultBufferSize is the default number of event notifications that can be buffered.
41         defaultBufferSize = 1024
42 )
43
44 // Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
45 // which should be processed by event.EventHandlers to enqueue reconcile.Requests.
46 //
47 // * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
48 //
49 // * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
50 //
51 // Users may build their own Source implementations.  If their implementations implement any of the inject package
52 // interfaces, the dependencies will be injected by the Controller when Watch is called.
53 type Source interface {
54         // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
55         // to enqueue reconcile.Requests.
56         Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
57 }
58
59 // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
60 type Kind struct {
61         // Type is the type of object to watch.  e.g. &v1.Pod{}
62         Type runtime.Object
63
64         // cache used to watch APIs
65         cache cache.Cache
66 }
67
68 var _ Source = &Kind{}
69
70 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
71 // to enqueue reconcile.Requests.
72 func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
73         prct ...predicate.Predicate) error {
74
75         // Type should have been specified by the user.
76         if ks.Type == nil {
77                 return fmt.Errorf("must specify Kind.Type")
78         }
79
80         // cache should have been injected before Start was called
81         if ks.cache == nil {
82                 return fmt.Errorf("must call CacheInto on Kind before calling Start")
83         }
84
85         // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
86         i, err := ks.cache.GetInformer(ks.Type)
87         if err != nil {
88                 if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
89                         log.Error(err, "if kind is a CRD, it should be installed before calling Start",
90                                 "kind", kindMatchErr.GroupKind)
91                 }
92                 return err
93         }
94         i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
95         return nil
96 }
97
98 func (ks *Kind) String() string {
99         if ks.Type != nil && ks.Type.GetObjectKind() != nil {
100                 return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String())
101         }
102         return fmt.Sprintf("kind source: unknown GVK")
103 }
104
105 var _ inject.Cache = &Kind{}
106
107 // InjectCache is internal should be called only by the Controller.  InjectCache is used to inject
108 // the Cache dependency initialized by the ControllerManager.
109 func (ks *Kind) InjectCache(c cache.Cache) error {
110         if ks.cache == nil {
111                 ks.cache = c
112         }
113         return nil
114 }
115
116 var _ Source = &Channel{}
117
118 // Channel is used to provide a source of events originating outside the cluster
119 // (e.g. GitHub Webhook callback).  Channel requires the user to wire the external
120 // source (eh.g. http handler) to write GenericEvents to the underlying channel.
121 type Channel struct {
122         // once ensures the event distribution goroutine will be performed only once
123         once sync.Once
124
125         // Source is the source channel to fetch GenericEvents
126         Source <-chan event.GenericEvent
127
128         // stop is to end ongoing goroutine, and close the channels
129         stop <-chan struct{}
130
131         // dest is the destination channels of the added event handlers
132         dest []chan event.GenericEvent
133
134         // DestBufferSize is the specified buffer size of dest channels.
135         // Default to 1024 if not specified.
136         DestBufferSize int
137
138         // destLock is to ensure the destination channels are safely added/removed
139         destLock sync.Mutex
140 }
141
142 func (cs *Channel) String() string {
143         return fmt.Sprintf("channel source: %p", cs)
144 }
145
146 var _ inject.Stoppable = &Channel{}
147
148 // InjectStopChannel is internal should be called only by the Controller.
149 // It is used to inject the stop channel initialized by the ControllerManager.
150 func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
151         if cs.stop == nil {
152                 cs.stop = stop
153         }
154
155         return nil
156 }
157
158 // Start implements Source and should only be called by the Controller.
159 func (cs *Channel) Start(
160         handler handler.EventHandler,
161         queue workqueue.RateLimitingInterface,
162         prct ...predicate.Predicate) error {
163         // Source should have been specified by the user.
164         if cs.Source == nil {
165                 return fmt.Errorf("must specify Channel.Source")
166         }
167
168         // stop should have been injected before Start was called
169         if cs.stop == nil {
170                 return fmt.Errorf("must call InjectStop on Channel before calling Start")
171         }
172
173         // use default value if DestBufferSize not specified
174         if cs.DestBufferSize == 0 {
175                 cs.DestBufferSize = defaultBufferSize
176         }
177
178         cs.once.Do(func() {
179                 // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
180                 go cs.syncLoop()
181         })
182
183         dst := make(chan event.GenericEvent, cs.DestBufferSize)
184         go func() {
185                 for evt := range dst {
186                         shouldHandle := true
187                         for _, p := range prct {
188                                 if !p.Generic(evt) {
189                                         shouldHandle = false
190                                         break
191                                 }
192                         }
193
194                         if shouldHandle {
195                                 handler.Generic(evt, queue)
196                         }
197                 }
198         }()
199
200         cs.destLock.Lock()
201         defer cs.destLock.Unlock()
202
203         cs.dest = append(cs.dest, dst)
204
205         return nil
206 }
207
208 func (cs *Channel) doStop() {
209         cs.destLock.Lock()
210         defer cs.destLock.Unlock()
211
212         for _, dst := range cs.dest {
213                 close(dst)
214         }
215 }
216
217 func (cs *Channel) distribute(evt event.GenericEvent) {
218         cs.destLock.Lock()
219         defer cs.destLock.Unlock()
220
221         for _, dst := range cs.dest {
222                 // We cannot make it under goroutine here, or we'll meet the
223                 // race condition of writing message to closed channels.
224                 // To avoid blocking, the dest channels are expected to be of
225                 // proper buffer size. If we still see it blocked, then
226                 // the controller is thought to be in an abnormal state.
227                 dst <- evt
228         }
229 }
230
231 func (cs *Channel) syncLoop() {
232         for {
233                 select {
234                 case <-cs.stop:
235                         // Close destination channels
236                         cs.doStop()
237                         return
238                 case evt := <-cs.Source:
239                         cs.distribute(evt)
240                 }
241         }
242 }
243
244 // Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
245 type Informer struct {
246         // Informer is the generated client-go Informer
247         Informer toolscache.SharedIndexInformer
248 }
249
250 var _ Source = &Informer{}
251
252 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
253 // to enqueue reconcile.Requests.
254 func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
255         prct ...predicate.Predicate) error {
256
257         // Informer should have been specified by the user.
258         if is.Informer == nil {
259                 return fmt.Errorf("must specify Informer.Informer")
260         }
261
262         is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
263         return nil
264 }
265
266 func (is *Informer) String() string {
267         return fmt.Sprintf("informer source: %p", is.Informer)
268 }
269
270 // Func is a function that implements Source
271 type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
272
273 // Start implements Source
274 func (f Func) Start(evt handler.EventHandler, queue workqueue.RateLimitingInterface,
275         pr ...predicate.Predicate) error {
276         return f(evt, queue, pr...)
277 }
278
279 func (f Func) String() string {
280         return fmt.Sprintf("func source: %p", f)
281 }