Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / record / event.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package record
18
19 import (
20         "fmt"
21         "math/rand"
22         "time"
23
24         "k8s.io/api/core/v1"
25         "k8s.io/apimachinery/pkg/api/errors"
26         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27         "k8s.io/apimachinery/pkg/runtime"
28         "k8s.io/apimachinery/pkg/util/clock"
29         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30         "k8s.io/apimachinery/pkg/watch"
31         restclient "k8s.io/client-go/rest"
32         ref "k8s.io/client-go/tools/reference"
33
34         "net/http"
35
36         "k8s.io/klog"
37 )
38
39 const maxTriesPerEvent = 12
40
41 var defaultSleepDuration = 10 * time.Second
42
43 const maxQueuedEvents = 1000
44
45 // EventSink knows how to store events (client.Client implements it.)
46 // EventSink must respect the namespace that will be embedded in 'event'.
47 // It is assumed that EventSink will return the same sorts of errors as
48 // pkg/client's REST client.
49 type EventSink interface {
50         Create(event *v1.Event) (*v1.Event, error)
51         Update(event *v1.Event) (*v1.Event, error)
52         Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
53 }
54
55 // EventRecorder knows how to record events on behalf of an EventSource.
56 type EventRecorder interface {
57         // Event constructs an event from the given information and puts it in the queue for sending.
58         // 'object' is the object this event is about. Event will make a reference-- or you may also
59         // pass a reference to the object directly.
60         // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
61         // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
62         // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
63         // to automate handling of events, so imagine people writing switch statements to handle them.
64         // You want to make that easy.
65         // 'message' is intended to be human readable.
66         //
67         // The resulting event will be created in the same namespace as the reference object.
68         Event(object runtime.Object, eventtype, reason, message string)
69
70         // Eventf is just like Event, but with Sprintf for the message field.
71         Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
72
73         // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
74         PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
75
76         // AnnotatedEventf is just like eventf, but with annotations attached
77         AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
78 }
79
80 // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
81 type EventBroadcaster interface {
82         // StartEventWatcher starts sending events received from this EventBroadcaster to the given
83         // event handler function. The return value can be ignored or used to stop recording, if
84         // desired.
85         StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
86
87         // StartRecordingToSink starts sending events received from this EventBroadcaster to the given
88         // sink. The return value can be ignored or used to stop recording, if desired.
89         StartRecordingToSink(sink EventSink) watch.Interface
90
91         // StartLogging starts sending events received from this EventBroadcaster to the given logging
92         // function. The return value can be ignored or used to stop recording, if desired.
93         StartLogging(logf func(format string, args ...interface{})) watch.Interface
94
95         // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
96         // with the event source set to the given event source.
97         NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
98 }
99
100 // Creates a new event broadcaster.
101 func NewBroadcaster() EventBroadcaster {
102         return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
103 }
104
105 func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
106         return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
107 }
108
109 type eventBroadcasterImpl struct {
110         *watch.Broadcaster
111         sleepDuration time.Duration
112 }
113
114 // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
115 // The return value can be ignored or used to stop recording, if desired.
116 // TODO: make me an object with parameterizable queue length and retry interval
117 func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
118         // The default math/rand package functions aren't thread safe, so create a
119         // new Rand object for each StartRecording call.
120         randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
121         eventCorrelator := NewEventCorrelator(clock.RealClock{})
122         return eventBroadcaster.StartEventWatcher(
123                 func(event *v1.Event) {
124                         recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
125                 })
126 }
127
128 func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
129         // Make a copy before modification, because there could be multiple listeners.
130         // Events are safe to copy like this.
131         eventCopy := *event
132         event = &eventCopy
133         result, err := eventCorrelator.EventCorrelate(event)
134         if err != nil {
135                 utilruntime.HandleError(err)
136         }
137         if result.Skip {
138                 return
139         }
140         tries := 0
141         for {
142                 if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
143                         break
144                 }
145                 tries++
146                 if tries >= maxTriesPerEvent {
147                         klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
148                         break
149                 }
150                 // Randomize the first sleep so that various clients won't all be
151                 // synced up if the master goes down.
152                 if tries == 1 {
153                         time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
154                 } else {
155                         time.Sleep(sleepDuration)
156                 }
157         }
158 }
159
160 func isKeyNotFoundError(err error) bool {
161         statusErr, _ := err.(*errors.StatusError)
162
163         if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
164                 return true
165         }
166
167         return false
168 }
169
170 // recordEvent attempts to write event to a sink. It returns true if the event
171 // was successfully recorded or discarded, false if it should be retried.
172 // If updateExistingEvent is false, it creates a new event, otherwise it updates
173 // existing event.
174 func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
175         var newEvent *v1.Event
176         var err error
177         if updateExistingEvent {
178                 newEvent, err = sink.Patch(event, patch)
179         }
180         // Update can fail because the event may have been removed and it no longer exists.
181         if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
182                 // Making sure that ResourceVersion is empty on creation
183                 event.ResourceVersion = ""
184                 newEvent, err = sink.Create(event)
185         }
186         if err == nil {
187                 // we need to update our event correlator with the server returned state to handle name/resourceversion
188                 eventCorrelator.UpdateState(newEvent)
189                 return true
190         }
191
192         // If we can't contact the server, then hold everything while we keep trying.
193         // Otherwise, something about the event is malformed and we should abandon it.
194         switch err.(type) {
195         case *restclient.RequestConstructionError:
196                 // We will construct the request the same next time, so don't keep trying.
197                 klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
198                 return true
199         case *errors.StatusError:
200                 if errors.IsAlreadyExists(err) {
201                         klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
202                 } else {
203                         klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
204                 }
205                 return true
206         case *errors.UnexpectedObjectError:
207                 // We don't expect this; it implies the server's response didn't match a
208                 // known pattern. Go ahead and retry.
209         default:
210                 // This case includes actual http transport errors. Go ahead and retry.
211         }
212         klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
213         return false
214 }
215
216 // StartLogging starts sending events received from this EventBroadcaster to the given logging function.
217 // The return value can be ignored or used to stop recording, if desired.
218 func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
219         return eventBroadcaster.StartEventWatcher(
220                 func(e *v1.Event) {
221                         logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
222                 })
223 }
224
225 // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
226 // The return value can be ignored or used to stop recording, if desired.
227 func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
228         watcher := eventBroadcaster.Watch()
229         go func() {
230                 defer utilruntime.HandleCrash()
231                 for watchEvent := range watcher.ResultChan() {
232                         event, ok := watchEvent.Object.(*v1.Event)
233                         if !ok {
234                                 // This is all local, so there's no reason this should
235                                 // ever happen.
236                                 continue
237                         }
238                         eventHandler(event)
239                 }
240         }()
241         return watcher
242 }
243
244 // NewRecorder returns an EventRecorder that records events with the given event source.
245 func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
246         return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
247 }
248
249 type recorderImpl struct {
250         scheme *runtime.Scheme
251         source v1.EventSource
252         *watch.Broadcaster
253         clock clock.Clock
254 }
255
256 func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
257         ref, err := ref.GetReference(recorder.scheme, object)
258         if err != nil {
259                 klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
260                 return
261         }
262
263         if !validateEventType(eventtype) {
264                 klog.Errorf("Unsupported event type: '%v'", eventtype)
265                 return
266         }
267
268         event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
269         event.Source = recorder.source
270
271         go func() {
272                 // NOTE: events should be a non-blocking operation
273                 defer utilruntime.HandleCrash()
274                 recorder.Action(watch.Added, event)
275         }()
276 }
277
278 func validateEventType(eventtype string) bool {
279         switch eventtype {
280         case v1.EventTypeNormal, v1.EventTypeWarning:
281                 return true
282         }
283         return false
284 }
285
286 func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
287         recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
288 }
289
290 func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
291         recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
292 }
293
294 func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
295         recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
296 }
297
298 func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
299         recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
300 }
301
302 func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
303         t := metav1.Time{Time: recorder.clock.Now()}
304         namespace := ref.Namespace
305         if namespace == "" {
306                 namespace = metav1.NamespaceDefault
307         }
308         return &v1.Event{
309                 ObjectMeta: metav1.ObjectMeta{
310                         Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
311                         Namespace:   namespace,
312                         Annotations: annotations,
313                 },
314                 InvolvedObject: *ref,
315                 Reason:         reason,
316                 Message:        message,
317                 FirstTimestamp: t,
318                 LastTimestamp:  t,
319                 Count:          1,
320                 Type:           eventtype,
321         }
322 }