2 Copyright 2014 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
25 "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/apimachinery/pkg/util/clock"
29 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/watch"
31 restclient "k8s.io/client-go/rest"
32 ref "k8s.io/client-go/tools/reference"
// maxTriesPerEvent is the retry limit checked by recordToSink: once the number
// of tries reaches this value the event is logged as unwritable.
39 const maxTriesPerEvent = 12
// defaultSleepDuration is the sleep duration NewBroadcaster gives to the
// broadcaster it creates; recordToSink sleeps based on it between tries.
41 var defaultSleepDuration = 10 * time.Second
// maxQueuedEvents is the first argument passed to watch.NewBroadcaster
// (presumably its queue length; confirm against the watch package).
43 const maxQueuedEvents = 1000
45 // EventSink knows how to store events (client.Client implements it.)
46 // EventSink must respect the namespace that will be embedded in 'event'.
47 // It is assumed that EventSink will return the same sorts of errors as
48 // pkg/client's REST client.
49 type EventSink interface {
// Create stores a new event. recordEvent calls it for events that are not
// updates, and as a fallback when Patch reports that the event is not found.
50 Create(event *v1.Event) (*v1.Event, error)
// Update is not called anywhere in the visible part of this file.
51 Update(event *v1.Event) (*v1.Event, error)
// Patch applies 'data' to 'oldEvent'. recordEvent calls it when it is asked
// to update an existing event.
52 Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
55 // EventRecorder knows how to record events on behalf of an EventSource.
56 type EventRecorder interface {
57 // Event constructs an event from the given information and puts it in the queue for sending.
58 // 'object' is the object this event is about. Event will make a reference-- or you may also
59 // pass a reference to the object directly.
60 // 'eventtype' is the type of this event, and can be one of Normal, Warning. New types could be added in the future.
61 // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
62 // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
63 // to automate handling of events, so imagine people writing switch statements to handle them.
64 // You want to make that easy.
65 // 'message' is intended to be human readable.
67 // The resulting event will be created in the same namespace as the reference object.
68 Event(object runtime.Object, eventtype, reason, message string)
70 // Eventf is just like Event, but with Sprintf for the message field.
71 Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
73 // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
74 PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
76 // AnnotatedEventf is just like Eventf, but with annotations attached.
77 AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
80 // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
81 type EventBroadcaster interface {
82 // StartEventWatcher starts sending events received from this EventBroadcaster to the given
83 // event handler function. The return value can be ignored or used to stop recording, if desired.
85 StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
87 // StartRecordingToSink starts sending events received from this EventBroadcaster to the given
88 // sink. The return value can be ignored or used to stop recording, if desired.
89 StartRecordingToSink(sink EventSink) watch.Interface
91 // StartLogging starts sending events received from this EventBroadcaster to the given logging
92 // function. The return value can be ignored or used to stop recording, if desired.
93 StartLogging(logf func(format string, args ...interface{})) watch.Interface
95 // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
96 // with the event source set to the given event source.
97 NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
100 // NewBroadcaster creates a new event broadcaster. It wraps the result of
// watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) and uses
// defaultSleepDuration as the sleep duration handed to recordToSink.
101 func NewBroadcaster() EventBroadcaster {
102 return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
// NewBroadcasterForTests creates an event broadcaster exactly like
// NewBroadcaster, except that the caller supplies the sleep duration used
// between retries instead of defaultSleepDuration.
105 func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
106 return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
// eventBroadcasterImpl is the EventBroadcaster implementation returned by
// NewBroadcaster and NewBroadcasterForTests.
109 type eventBroadcasterImpl struct {
// sleepDuration is passed to recordToSink by StartRecordingToSink and
// bounds how long it sleeps between attempts to write an event.
111 sleepDuration time.Duration
114 // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
115 // The return value can be ignored or used to stop recording, if desired.
// Each call builds its own random generator and EventCorrelator; the handler
// registered with StartEventWatcher passes both to recordToSink for every event.
116 // TODO: make me an object with parameterizable queue length and retry interval
117 func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
118 // Create a dedicated Rand for each StartRecordingToSink call instead of sharing
119 // one: a *rand.Rand created with rand.New is not safe for concurrent use.
120 randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
121 eventCorrelator := NewEventCorrelator(clock.RealClock{})
122 return eventBroadcaster.StartEventWatcher(
123 func(event *v1.Event) {
124 recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
// recordToSink runs event through eventCorrelator and then tries to write the
// correlated result to sink with recordEvent. A correlation error is reported
// through utilruntime.HandleError. Once the try count reaches
// maxTriesPerEvent an error is logged. Between tries it sleeps, using randGen
// to randomize the first sleep within sleepDuration.
128 func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
129 // Make a copy before modification, because there could be multiple listeners.
130 // Events are safe to copy like this.
133 result, err := eventCorrelator.EventCorrelate(event)
135 utilruntime.HandleError(err)
// result.Event.Count > 1 tells recordEvent to update an existing event
// rather than create a new one.
142 if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
146 if tries >= maxTriesPerEvent {
147 klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
150 // Randomize the first sleep so that various clients won't all be
151 // synced up if the master goes down.
// randGen.Float64() scales sleepDuration to a random fraction of itself.
153 time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
155 time.Sleep(sleepDuration)
// isKeyNotFoundError checks whether err is a *errors.StatusError whose status
// code is http.StatusNotFound.
160 func isKeyNotFoundError(err error) bool {
// The failed-assertion result is deliberately discarded: statusErr is nil
// for any other error type, which the nil check below handles.
161 statusErr, _ := err.(*errors.StatusError)
163 if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
170 // recordEvent attempts to write event to a sink. It returns true if the event
171 // was successfully recorded or discarded, false if it should be retried.
172 // If updateExistingEvent is false, it creates a new event, otherwise it updates
// the existing event by sending patch through sink.Patch.
174 func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
175 var newEvent *v1.Event
177 if updateExistingEvent {
178 newEvent, err = sink.Patch(event, patch)
180 // Update can fail because the event may have been removed and it no longer exists.
// Create when this is a new event, or when the Patch above failed with a
// not-found error.
181 if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
182 // Making sure that ResourceVersion is empty on creation
183 event.ResourceVersion = ""
184 newEvent, err = sink.Create(event)
187 // we need to update our event correlator with the server returned state to handle name/resourceversion
188 eventCorrelator.UpdateState(newEvent)
192 // If we can't contact the server, then hold everything while we keep trying.
193 // Otherwise, something about the event is malformed and we should abandon it.
195 case *restclient.RequestConstructionError:
196 // We will construct the request the same next time, so don't keep trying.
197 klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
199 case *errors.StatusError:
// An AlreadyExists rejection is logged only at verbosity 5; any other
// server rejection is logged as an error.
200 if errors.IsAlreadyExists(err) {
201 klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
203 klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
206 case *errors.UnexpectedObjectError:
207 // We don't expect this; it implies the server's response didn't match a
208 // known pattern. Go ahead and retry.
210 // This case includes actual http transport errors. Go ahead and retry.
212 klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
216 // StartLogging starts sending events received from this EventBroadcaster to the given logging function.
217 // The return value can be ignored or used to stop recording, if desired.
// Each event is formatted with its InvolvedObject, Type, Reason and Message.
218 func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
219 return eventBroadcaster.StartEventWatcher(
221 logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
225 // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
226 // The return value can be ignored or used to stop recording, if desired.
227 func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
228 watcher := eventBroadcaster.Watch()
// HandleCrash is deferred around the loop that drains the watcher.
230 defer utilruntime.HandleCrash()
// Read watch events until the watcher's result channel is closed.
231 for watchEvent := range watcher.ResultChan() {
232 event, ok := watchEvent.Object.(*v1.Event)
234 // This is all local, so there's no reason this type assertion should
// ever fail.
244 // NewRecorder returns an EventRecorder that records events with the given event source.
// The recorder is built from scheme, source, this broadcaster's Broadcaster
// and a clock.RealClock.
245 func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
246 return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
// recorderImpl is the EventRecorder implementation returned by NewRecorder.
249 type recorderImpl struct {
// scheme is passed to ref.GetReference when building an object reference.
250 scheme *runtime.Scheme
// source is copied into the Source field of every event generated.
251 source v1.EventSource
// generateEvent builds an event about object and hands it to the broadcaster
// with recorder.Action(watch.Added, event). It logs an error if no reference
// to object can be constructed or if eventtype fails validateEventType.
//
// NOTE(review): timestamp is not referenced in the visible lines of this
// function, and makeEvent is called without it, so confirm whether the
// caller-supplied timestamp is actually honored.
256 func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
// NOTE(review): the local variable 'ref' shadows the imported 'ref' package
// alias for the rest of this function.
257 ref, err := ref.GetReference(recorder.scheme, object)
259 klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
263 if !validateEventType(eventtype) {
264 klog.Errorf("Unsupported event type: '%v'", eventtype)
268 event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
269 event.Source = recorder.source
272 // NOTE: events should be a non-blocking operation
273 defer utilruntime.HandleCrash()
274 recorder.Action(watch.Added, event)
// validateEventType checks eventtype against the supported event types,
// v1.EventTypeNormal and v1.EventTypeWarning.
278 func validateEventType(eventtype string) bool {
280 case v1.EventTypeNormal, v1.EventTypeWarning:
// Event records an event about object with no annotations, passing
// metav1.Now() as the timestamp to generateEvent.
286 func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
287 recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
// Eventf formats the message with fmt.Sprintf(messageFmt, args...) and then
// delegates to Event.
290 func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
291 recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
// PastEventf formats the message with fmt.Sprintf(messageFmt, args...) and
// passes it to generateEvent together with the caller-supplied timestamp and
// no annotations.
294 func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
295 recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
// AnnotatedEventf formats the message with fmt.Sprintf(messageFmt, args...)
// and passes it to generateEvent together with annotations and metav1.Now()
// as the timestamp.
298 func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
299 recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
302 func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
303 t := metav1.Time{Time: recorder.clock.Now()}
304 namespace := ref.Namespace
306 namespace = metav1.NamespaceDefault
309 ObjectMeta: metav1.ObjectMeta{
310 Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
311 Namespace: namespace,
312 Annotations: annotations,
314 InvolvedObject: *ref,