Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / k8s.io / apimachinery / pkg / watch / mux.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package watch
18
19 import (
20         "sync"
21
22         "k8s.io/apimachinery/pkg/runtime"
23         "k8s.io/apimachinery/pkg/runtime/schema"
24 )
25
26 // FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
27 // channel is full.
28 type FullChannelBehavior int
29
30 const (
31         WaitIfChannelFull FullChannelBehavior = iota
32         DropIfChannelFull
33 )
34
35 // Buffer the incoming queue a little bit even though it should rarely ever accumulate
36 // anything, just in case a few events are received in such a short window that
37 // Broadcaster can't move them onto the watchers' queues fast enough.
38 const incomingQueueLength = 25
39
40 // Broadcaster distributes event notifications among any number of watchers. Every event
41 // is delivered to every watcher.
42 type Broadcaster struct {
43         // TODO: see if this lock is needed now that new watchers go through
44         // the incoming channel.
45         lock sync.Mutex
46
47         watchers     map[int64]*broadcasterWatcher
48         nextWatcher  int64
49         distributing sync.WaitGroup
50
51         incoming chan Event
52
53         // How large to make watcher's channel.
54         watchQueueLength int
55         // If one of the watch channels is full, don't wait for it to become empty.
56         // Instead just deliver it to the watchers that do have space in their
57         // channels and move on to the next event.
58         // It's more fair to do this on a per-watcher basis than to do it on the
59         // "incoming" channel, which would allow one slow watcher to prevent all
60         // other watchers from getting new events.
61         fullChannelBehavior FullChannelBehavior
62 }
63
64 // NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
65 // It is guaranteed that events will be distributed in the order in which they occur,
66 // but the order in which a single event is distributed among all of the watchers is unspecified.
67 func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
68         m := &Broadcaster{
69                 watchers:            map[int64]*broadcasterWatcher{},
70                 incoming:            make(chan Event, incomingQueueLength),
71                 watchQueueLength:    queueLength,
72                 fullChannelBehavior: fullChannelBehavior,
73         }
74         m.distributing.Add(1)
75         go m.loop()
76         return m
77 }
78
79 const internalRunFunctionMarker = "internal-do-function"
80
81 // a function type we can shoehorn into the queue.
82 type functionFakeRuntimeObject func()
83
84 func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
85         return schema.EmptyObjectKind
86 }
87 func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
88         if obj == nil {
89                 return nil
90         }
91         // funcs are immutable. Hence, just return the original func.
92         return obj
93 }
94
95 // Execute f, blocking the incoming queue (and waiting for it to drain first).
96 // The purpose of this terrible hack is so that watchers added after an event
97 // won't ever see that event, and will always see any event after they are
98 // added.
99 func (b *Broadcaster) blockQueue(f func()) {
100         var wg sync.WaitGroup
101         wg.Add(1)
102         b.incoming <- Event{
103                 Type: internalRunFunctionMarker,
104                 Object: functionFakeRuntimeObject(func() {
105                         defer wg.Done()
106                         f()
107                 }),
108         }
109         wg.Wait()
110 }
111
112 // Watch adds a new watcher to the list and returns an Interface for it.
113 // Note: new watchers will only receive new events. They won't get an entire history
114 // of previous events.
115 func (m *Broadcaster) Watch() Interface {
116         var w *broadcasterWatcher
117         m.blockQueue(func() {
118                 m.lock.Lock()
119                 defer m.lock.Unlock()
120                 id := m.nextWatcher
121                 m.nextWatcher++
122                 w = &broadcasterWatcher{
123                         result:  make(chan Event, m.watchQueueLength),
124                         stopped: make(chan struct{}),
125                         id:      id,
126                         m:       m,
127                 }
128                 m.watchers[id] = w
129         })
130         return w
131 }
132
133 // WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
134 // queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
135 // The returned watch will have a queue length that is at least large enough to accommodate
136 // all of the items in queuedEvents.
137 func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
138         var w *broadcasterWatcher
139         m.blockQueue(func() {
140                 m.lock.Lock()
141                 defer m.lock.Unlock()
142                 id := m.nextWatcher
143                 m.nextWatcher++
144                 length := m.watchQueueLength
145                 if n := len(queuedEvents) + 1; n > length {
146                         length = n
147                 }
148                 w = &broadcasterWatcher{
149                         result:  make(chan Event, length),
150                         stopped: make(chan struct{}),
151                         id:      id,
152                         m:       m,
153                 }
154                 m.watchers[id] = w
155                 for _, e := range queuedEvents {
156                         w.result <- e
157                 }
158         })
159         return w
160 }
161
162 // stopWatching stops the given watcher and removes it from the list.
163 func (m *Broadcaster) stopWatching(id int64) {
164         m.lock.Lock()
165         defer m.lock.Unlock()
166         w, ok := m.watchers[id]
167         if !ok {
168                 // No need to do anything, it's already been removed from the list.
169                 return
170         }
171         delete(m.watchers, id)
172         close(w.result)
173 }
174
175 // closeAll disconnects all watchers (presumably in response to a Shutdown call).
176 func (m *Broadcaster) closeAll() {
177         m.lock.Lock()
178         defer m.lock.Unlock()
179         for _, w := range m.watchers {
180                 close(w.result)
181         }
182         // Delete everything from the map, since presence/absence in the map is used
183         // by stopWatching to avoid double-closing the channel.
184         m.watchers = map[int64]*broadcasterWatcher{}
185 }
186
187 // Action distributes the given event among all watchers.
188 func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
189         m.incoming <- Event{action, obj}
190 }
191
192 // Shutdown disconnects all watchers (but any queued events will still be distributed).
193 // You must not call Action or Watch* after calling Shutdown. This call blocks
194 // until all events have been distributed through the outbound channels. Note
195 // that since they can be buffered, this means that the watchers might not
196 // have received the data yet as it can remain sitting in the buffered
197 // channel.
198 func (m *Broadcaster) Shutdown() {
199         close(m.incoming)
200         m.distributing.Wait()
201 }
202
203 // loop receives from m.incoming and distributes to all watchers.
204 func (m *Broadcaster) loop() {
205         // Deliberately not catching crashes here. Yes, bring down the process if there's a
206         // bug in watch.Broadcaster.
207         for event := range m.incoming {
208                 if event.Type == internalRunFunctionMarker {
209                         event.Object.(functionFakeRuntimeObject)()
210                         continue
211                 }
212                 m.distribute(event)
213         }
214         m.closeAll()
215         m.distributing.Done()
216 }
217
218 // distribute sends event to all watchers. Blocking.
219 func (m *Broadcaster) distribute(event Event) {
220         m.lock.Lock()
221         defer m.lock.Unlock()
222         if m.fullChannelBehavior == DropIfChannelFull {
223                 for _, w := range m.watchers {
224                         select {
225                         case w.result <- event:
226                         case <-w.stopped:
227                         default: // Don't block if the event can't be queued.
228                         }
229                 }
230         } else {
231                 for _, w := range m.watchers {
232                         select {
233                         case w.result <- event:
234                         case <-w.stopped:
235                         }
236                 }
237         }
238 }
239
240 // broadcasterWatcher handles a single watcher of a broadcaster
241 type broadcasterWatcher struct {
242         result  chan Event
243         stopped chan struct{}
244         stop    sync.Once
245         id      int64
246         m       *Broadcaster
247 }
248
249 // ResultChan returns a channel to use for waiting on events.
250 func (mw *broadcasterWatcher) ResultChan() <-chan Event {
251         return mw.result
252 }
253
254 // Stop stops watching and removes mw from its list.
255 func (mw *broadcasterWatcher) Stop() {
256         mw.stop.Do(func() {
257                 close(mw.stopped)
258                 mw.m.stopWatching(mw.id)
259         })
260 }