2 Copyright 2014 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
25 "k8s.io/apimachinery/pkg/runtime"
28 // Interface can be implemented by anything that knows how to watch and report changes.
29 type Interface interface {
30 // Stops watching. Will close the channel returned by ResultChan(). Releases
31 // any resources used by the watch.
34 // Returns a chan which will receive all the events. If an error occurs
35 // or Stop() is called, this channel will be closed, in which case the
36 // watch should be completely cleaned up.
37 ResultChan() <-chan Event
40 // EventType defines the possible types of events.
44 Added EventType = "ADDED"
45 Modified EventType = "MODIFIED"
46 Deleted EventType = "DELETED"
47 Error EventType = "ERROR"
49 DefaultChanSize int32 = 100
52 // Event represents a single event to a watched resource.
53 // +k8s:deepcopy-gen=true
58 // * If Type is Added or Modified: the new state of the object.
59 // * If Type is Deleted: the state of the object immediately before deletion.
60 // * If Type is Error: *api.Status is recommended; other types may make sense
61 // depending on context.
65 type emptyWatch chan Event
67 // NewEmptyWatch returns a watch interface that returns no results and is closed.
68 // May be used in certain error conditions where no information is available but
69 // an error is not warranted.
70 func NewEmptyWatch() Interface {
71 ch := make(chan Event)
76 // Stop implements Interface
77 func (w emptyWatch) Stop() {
80 // ResultChan implements Interface
81 func (w emptyWatch) ResultChan() <-chan Event {
85 // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
86 type FakeWatcher struct {
92 func NewFake() *FakeWatcher {
94 result: make(chan Event),
98 func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
100 result: make(chan Event, size),
104 // Stop implements Interface.Stop().
105 func (f *FakeWatcher) Stop() {
109 klog.V(4).Infof("Stopping fake watcher.")
115 func (f *FakeWatcher) IsStopped() bool {
121 // Reset prepares the watcher to be reused.
122 func (f *FakeWatcher) Reset() {
126 f.result = make(chan Event)
129 func (f *FakeWatcher) ResultChan() <-chan Event {
133 // Add sends an add event.
134 func (f *FakeWatcher) Add(obj runtime.Object) {
135 f.result <- Event{Added, obj}
138 // Modify sends a modify event.
139 func (f *FakeWatcher) Modify(obj runtime.Object) {
140 f.result <- Event{Modified, obj}
143 // Delete sends a delete event.
144 func (f *FakeWatcher) Delete(lastValue runtime.Object) {
145 f.result <- Event{Deleted, lastValue}
148 // Error sends an Error event.
149 func (f *FakeWatcher) Error(errValue runtime.Object) {
150 f.result <- Event{Error, errValue}
153 // Action sends an event of the requested type, for table-based testing.
154 func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
155 f.result <- Event{action, obj}
158 // RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
159 type RaceFreeFakeWatcher struct {
165 func NewRaceFreeFake() *RaceFreeFakeWatcher {
166 return &RaceFreeFakeWatcher{
167 result: make(chan Event, DefaultChanSize),
171 // Stop implements Interface.Stop().
172 func (f *RaceFreeFakeWatcher) Stop() {
176 klog.V(4).Infof("Stopping fake watcher.")
182 func (f *RaceFreeFakeWatcher) IsStopped() bool {
188 // Reset prepares the watcher to be reused.
189 func (f *RaceFreeFakeWatcher) Reset() {
193 f.result = make(chan Event, DefaultChanSize)
196 func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
202 // Add sends an add event.
203 func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
208 case f.result <- Event{Added, obj}:
211 panic(fmt.Errorf("channel full"))
216 // Modify sends a modify event.
217 func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
222 case f.result <- Event{Modified, obj}:
225 panic(fmt.Errorf("channel full"))
230 // Delete sends a delete event.
231 func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
236 case f.result <- Event{Deleted, lastValue}:
239 panic(fmt.Errorf("channel full"))
244 // Error sends an Error event.
245 func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
250 case f.result <- Event{Error, errValue}:
253 panic(fmt.Errorf("channel full"))
258 // Action sends an event of the requested type, for table-based testing.
259 func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
264 case f.result <- Event{action, obj}:
267 panic(fmt.Errorf("channel full"))
272 // ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
273 type ProxyWatcher struct {
281 var _ Interface = &ProxyWatcher{}
283 // NewProxyWatcher creates new ProxyWatcher by wrapping a channel
284 func NewProxyWatcher(ch chan Event) *ProxyWatcher {
285 return &ProxyWatcher{
287 stopCh: make(chan struct{}),
292 // Stop implements Interface
293 func (pw *ProxyWatcher) Stop() {
295 defer pw.mutex.Unlock()
302 // Stopping returns true if Stop() has been called
303 func (pw *ProxyWatcher) Stopping() bool {
305 defer pw.mutex.Unlock()
309 // ResultChan implements Interface
310 func (pw *ProxyWatcher) ResultChan() <-chan Event {
314 // StopChan returns stop channel
315 func (pw *ProxyWatcher) StopChan() <-chan struct{} {