Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / record / events_cache.go
1 /*
2 Copyright 2015 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package record
18
19 import (
20         "encoding/json"
21         "fmt"
22         "strings"
23         "sync"
24         "time"
25
26         "github.com/golang/groupcache/lru"
27
28         "k8s.io/api/core/v1"
29         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30         "k8s.io/apimachinery/pkg/util/clock"
31         "k8s.io/apimachinery/pkg/util/sets"
32         "k8s.io/apimachinery/pkg/util/strategicpatch"
33         "k8s.io/client-go/util/flowcontrol"
34 )
35
36 const (
37         maxLruCacheEntries = 4096
38
39         // if we see the same event that varies only by message
40         // more than 10 times in a 10 minute period, aggregate the event
41         defaultAggregateMaxEvents         = 10
42         defaultAggregateIntervalInSeconds = 600
43
44         // by default, allow a source to send 25 events about an object
45         // but control the refill rate to 1 new event every 5 minutes
46         // this helps control the long-tail of events for things that are always
47         // unhealthy
48         defaultSpamBurst = 25
49         defaultSpamQPS   = 1. / 300.
50 )
51
52 // getEventKey builds unique event key based on source, involvedObject, reason, message
53 func getEventKey(event *v1.Event) string {
54         return strings.Join([]string{
55                 event.Source.Component,
56                 event.Source.Host,
57                 event.InvolvedObject.Kind,
58                 event.InvolvedObject.Namespace,
59                 event.InvolvedObject.Name,
60                 event.InvolvedObject.FieldPath,
61                 string(event.InvolvedObject.UID),
62                 event.InvolvedObject.APIVersion,
63                 event.Type,
64                 event.Reason,
65                 event.Message,
66         },
67                 "")
68 }
69
70 // getSpamKey builds unique event key based on source, involvedObject
71 func getSpamKey(event *v1.Event) string {
72         return strings.Join([]string{
73                 event.Source.Component,
74                 event.Source.Host,
75                 event.InvolvedObject.Kind,
76                 event.InvolvedObject.Namespace,
77                 event.InvolvedObject.Name,
78                 string(event.InvolvedObject.UID),
79                 event.InvolvedObject.APIVersion,
80         },
81                 "")
82 }
83
84 // EventFilterFunc is a function that returns true if the event should be skipped
85 type EventFilterFunc func(event *v1.Event) bool
86
87 // EventSourceObjectSpamFilter is responsible for throttling
88 // the amount of events a source and object can produce.
89 type EventSourceObjectSpamFilter struct {
90         sync.RWMutex
91
92         // the cache that manages last synced state
93         cache *lru.Cache
94
95         // burst is the amount of events we allow per source + object
96         burst int
97
98         // qps is the refill rate of the token bucket in queries per second
99         qps float32
100
101         // clock is used to allow for testing over a time interval
102         clock clock.Clock
103 }
104
105 // NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
106 func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
107         return &EventSourceObjectSpamFilter{
108                 cache: lru.New(lruCacheSize),
109                 burst: burst,
110                 qps:   qps,
111                 clock: clock,
112         }
113 }
114
115 // spamRecord holds data used to perform spam filtering decisions.
116 type spamRecord struct {
117         // rateLimiter controls the rate of events about this object
118         rateLimiter flowcontrol.RateLimiter
119 }
120
121 // Filter controls that a given source+object are not exceeding the allowed rate.
122 func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
123         var record spamRecord
124
125         // controls our cached information about this event (source+object)
126         eventKey := getSpamKey(event)
127
128         // do we have a record of similar events in our cache?
129         f.Lock()
130         defer f.Unlock()
131         value, found := f.cache.Get(eventKey)
132         if found {
133                 record = value.(spamRecord)
134         }
135
136         // verify we have a rate limiter for this record
137         if record.rateLimiter == nil {
138                 record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
139         }
140
141         // ensure we have available rate
142         filter := !record.rateLimiter.TryAccept()
143
144         // update the cache
145         f.cache.Add(eventKey, record)
146
147         return filter
148 }
149
150 // EventAggregatorKeyFunc is responsible for grouping events for aggregation
151 // It returns a tuple of the following:
152 // aggregateKey - key the identifies the aggregate group to bucket this event
153 // localKey - key that makes this event in the local group
154 type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
155
156 // EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
157 func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
158         return strings.Join([]string{
159                 event.Source.Component,
160                 event.Source.Host,
161                 event.InvolvedObject.Kind,
162                 event.InvolvedObject.Namespace,
163                 event.InvolvedObject.Name,
164                 string(event.InvolvedObject.UID),
165                 event.InvolvedObject.APIVersion,
166                 event.Type,
167                 event.Reason,
168         },
169                 ""), event.Message
170 }
171
172 // EventAggregatorMessageFunc is responsible for producing an aggregation message
173 type EventAggregatorMessageFunc func(event *v1.Event) string
174
175 // EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
176 func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
177         return "(combined from similar events): " + event.Message
178 }
179
180 // EventAggregator identifies similar events and aggregates them into a single event
181 type EventAggregator struct {
182         sync.RWMutex
183
184         // The cache that manages aggregation state
185         cache *lru.Cache
186
187         // The function that groups events for aggregation
188         keyFunc EventAggregatorKeyFunc
189
190         // The function that generates a message for an aggregate event
191         messageFunc EventAggregatorMessageFunc
192
193         // The maximum number of events in the specified interval before aggregation occurs
194         maxEvents uint
195
196         // The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
197         maxIntervalInSeconds uint
198
199         // clock is used to allow for testing over a time interval
200         clock clock.Clock
201 }
202
203 // NewEventAggregator returns a new instance of an EventAggregator
204 func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
205         maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
206         return &EventAggregator{
207                 cache:                lru.New(lruCacheSize),
208                 keyFunc:              keyFunc,
209                 messageFunc:          messageFunc,
210                 maxEvents:            uint(maxEvents),
211                 maxIntervalInSeconds: uint(maxIntervalInSeconds),
212                 clock:                clock,
213         }
214 }
215
216 // aggregateRecord holds data used to perform aggregation decisions
217 type aggregateRecord struct {
218         // we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
219         // if the size of this set exceeds the max, we know we need to aggregate
220         localKeys sets.String
221         // The last time at which the aggregate was recorded
222         lastTimestamp metav1.Time
223 }
224
225 // EventAggregate checks if a similar event has been seen according to the
226 // aggregation configuration (max events, max interval, etc) and returns:
227 //
228 // - The (potentially modified) event that should be created
229 // - The cache key for the event, for correlation purposes. This will be set to
230 //   the full key for normal events, and to the result of
231 //   EventAggregatorMessageFunc for aggregate events.
232 func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
233         now := metav1.NewTime(e.clock.Now())
234         var record aggregateRecord
235         // eventKey is the full cache key for this event
236         eventKey := getEventKey(newEvent)
237         // aggregateKey is for the aggregate event, if one is needed.
238         aggregateKey, localKey := e.keyFunc(newEvent)
239
240         // Do we have a record of similar events in our cache?
241         e.Lock()
242         defer e.Unlock()
243         value, found := e.cache.Get(aggregateKey)
244         if found {
245                 record = value.(aggregateRecord)
246         }
247
248         // Is the previous record too old? If so, make a fresh one. Note: if we didn't
249         // find a similar record, its lastTimestamp will be the zero value, so we
250         // create a new one in that case.
251         maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
252         interval := now.Time.Sub(record.lastTimestamp.Time)
253         if interval > maxInterval {
254                 record = aggregateRecord{localKeys: sets.NewString()}
255         }
256
257         // Write the new event into the aggregation record and put it on the cache
258         record.localKeys.Insert(localKey)
259         record.lastTimestamp = now
260         e.cache.Add(aggregateKey, record)
261
262         // If we are not yet over the threshold for unique events, don't correlate them
263         if uint(record.localKeys.Len()) < e.maxEvents {
264                 return newEvent, eventKey
265         }
266
267         // do not grow our local key set any larger than max
268         record.localKeys.PopAny()
269
270         // create a new aggregate event, and return the aggregateKey as the cache key
271         // (so that it can be overwritten.)
272         eventCopy := &v1.Event{
273                 ObjectMeta: metav1.ObjectMeta{
274                         Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
275                         Namespace: newEvent.Namespace,
276                 },
277                 Count:          1,
278                 FirstTimestamp: now,
279                 InvolvedObject: newEvent.InvolvedObject,
280                 LastTimestamp:  now,
281                 Message:        e.messageFunc(newEvent),
282                 Type:           newEvent.Type,
283                 Reason:         newEvent.Reason,
284                 Source:         newEvent.Source,
285         }
286         return eventCopy, aggregateKey
287 }
288
289 // eventLog records data about when an event was observed
290 type eventLog struct {
291         // The number of times the event has occurred since first occurrence.
292         count uint
293
294         // The time at which the event was first recorded.
295         firstTimestamp metav1.Time
296
297         // The unique name of the first occurrence of this event
298         name string
299
300         // Resource version returned from previous interaction with server
301         resourceVersion string
302 }
303
304 // eventLogger logs occurrences of an event
305 type eventLogger struct {
306         sync.RWMutex
307         cache *lru.Cache
308         clock clock.Clock
309 }
310
311 // newEventLogger observes events and counts their frequencies
312 func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
313         return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
314 }
315
316 // eventObserve records an event, or updates an existing one if key is a cache hit
317 func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
318         var (
319                 patch []byte
320                 err   error
321         )
322         eventCopy := *newEvent
323         event := &eventCopy
324
325         e.Lock()
326         defer e.Unlock()
327
328         // Check if there is an existing event we should update
329         lastObservation := e.lastEventObservationFromCache(key)
330
331         // If we found a result, prepare a patch
332         if lastObservation.count > 0 {
333                 // update the event based on the last observation so patch will work as desired
334                 event.Name = lastObservation.name
335                 event.ResourceVersion = lastObservation.resourceVersion
336                 event.FirstTimestamp = lastObservation.firstTimestamp
337                 event.Count = int32(lastObservation.count) + 1
338
339                 eventCopy2 := *event
340                 eventCopy2.Count = 0
341                 eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
342                 eventCopy2.Message = ""
343
344                 newData, _ := json.Marshal(event)
345                 oldData, _ := json.Marshal(eventCopy2)
346                 patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
347         }
348
349         // record our new observation
350         e.cache.Add(
351                 key,
352                 eventLog{
353                         count:           uint(event.Count),
354                         firstTimestamp:  event.FirstTimestamp,
355                         name:            event.Name,
356                         resourceVersion: event.ResourceVersion,
357                 },
358         )
359         return event, patch, err
360 }
361
362 // updateState updates its internal tracking information based on latest server state
363 func (e *eventLogger) updateState(event *v1.Event) {
364         key := getEventKey(event)
365         e.Lock()
366         defer e.Unlock()
367         // record our new observation
368         e.cache.Add(
369                 key,
370                 eventLog{
371                         count:           uint(event.Count),
372                         firstTimestamp:  event.FirstTimestamp,
373                         name:            event.Name,
374                         resourceVersion: event.ResourceVersion,
375                 },
376         )
377 }
378
379 // lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
380 func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
381         value, ok := e.cache.Get(key)
382         if ok {
383                 observationValue, ok := value.(eventLog)
384                 if ok {
385                         return observationValue
386                 }
387         }
388         return eventLog{}
389 }
390
391 // EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system.  It can filter all
392 // incoming events to see if the event should be filtered from further processing.  It can aggregate similar events that occur
393 // frequently to protect the system from spamming events that are difficult for users to distinguish.  It performs de-duplication
394 // to ensure events that are observed multiple times are compacted into a single event with increasing counts.
395 type EventCorrelator struct {
396         // the function to filter the event
397         filterFunc EventFilterFunc
398         // the object that performs event aggregation
399         aggregator *EventAggregator
400         // the object that observes events as they come through
401         logger *eventLogger
402 }
403
404 // EventCorrelateResult is the result of a Correlate
405 type EventCorrelateResult struct {
406         // the event after correlation
407         Event *v1.Event
408         // if provided, perform a strategic patch when updating the record on the server
409         Patch []byte
410         // if true, do no further processing of the event
411         Skip bool
412 }
413
414 // NewEventCorrelator returns an EventCorrelator configured with default values.
415 //
416 // The EventCorrelator is responsible for event filtering, aggregating, and counting
417 // prior to interacting with the API server to record the event.
418 //
419 // The default behavior is as follows:
420 //   * Aggregation is performed if a similar event is recorded 10 times in a
421 //     in a 10 minute rolling interval.  A similar event is an event that varies only by
422 //     the Event.Message field.  Rather than recording the precise event, aggregation
423 //     will create a new event whose message reports that it has combined events with
424 //     the same reason.
425 //   * Events are incrementally counted if the exact same event is encountered multiple
426 //     times.
427 //   * A source may burst 25 events about an object, but has a refill rate budget
428 //     per object of 1 event every 5 minutes to control long-tail of spam.
429 func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
430         cacheSize := maxLruCacheEntries
431         spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
432         return &EventCorrelator{
433                 filterFunc: spamFilter.Filter,
434                 aggregator: NewEventAggregator(
435                         cacheSize,
436                         EventAggregatorByReasonFunc,
437                         EventAggregatorByReasonMessageFunc,
438                         defaultAggregateMaxEvents,
439                         defaultAggregateIntervalInSeconds,
440                         clock),
441
442                 logger: newEventLogger(cacheSize, clock),
443         }
444 }
445
446 // EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
447 func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
448         if newEvent == nil {
449                 return nil, fmt.Errorf("event is nil")
450         }
451         aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
452         observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
453         if c.filterFunc(observedEvent) {
454                 return &EventCorrelateResult{Skip: true}, nil
455         }
456         return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
457 }
458
459 // UpdateState based on the latest observed state from server
460 func (c *EventCorrelator) UpdateState(event *v1.Event) {
461         c.logger.updateState(event)
462 }