/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
26 "github.com/golang/groupcache/lru"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/clock"
31 "k8s.io/apimachinery/pkg/util/sets"
32 "k8s.io/apimachinery/pkg/util/strategicpatch"
33 "k8s.io/client-go/util/flowcontrol"
const (
	maxLruCacheEntries = 4096

	// if we see the same event that varies only by message
	// more than 10 times in a 10 minute period, aggregate the event
	defaultAggregateMaxEvents         = 10
	defaultAggregateIntervalInSeconds = 600

	// by default, allow a source to send 25 events about an object
	// but control the refill rate to 1 new event every 5 minutes
	// this helps control the long-tail of events for things that are always
	// unhealthy
	defaultSpamBurst = 25
	defaultSpamQPS   = 1. / 300.
)
// getEventKey builds a unique event key based on source, involvedObject, reason and message
func getEventKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		event.InvolvedObject.FieldPath,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
		event.Message,
	},
		"")
}
// getSpamKey builds a unique event key based on source and involvedObject
func getSpamKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
	},
		"")
}
// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *v1.Event) bool
// EventSourceObjectSpamFilter is responsible for throttling
// the amount of events a source and object can produce.
type EventSourceObjectSpamFilter struct {
	sync.RWMutex

	// the cache that manages last synced state
	cache *lru.Cache

	// burst is the amount of events we allow per source + object
	burst int

	// qps is the refill rate of the token bucket in queries per second
	qps float32

	// clock is used to allow for testing over a time interval
	clock clock.Clock
}
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
	return &EventSourceObjectSpamFilter{
		cache: lru.New(lruCacheSize),
		burst: burst,
		qps:   qps,
		clock: clock,
	}
}
// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
	// rateLimiter controls the rate of events about this object
	rateLimiter flowcontrol.RateLimiter
}
// Filter controls that a given source+object are not exceeding the allowed rate.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event (source+object)
	eventKey := getSpamKey(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// verify we have a rate limiter for this record
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}
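
// exampleSpamFilterUsage is an illustrative sketch, not part of the original
// file: a single filter instance is shared across events, so with the default
// burst/QPS constants the first 25 events about a given source+object are
// accepted, after which roughly one event per five minutes passes.
func exampleSpamFilterUsage(events []*v1.Event) []*v1.Event {
	filter := NewEventSourceObjectSpamFilter(maxLruCacheEntries, defaultSpamBurst, defaultSpamQPS, clock.RealClock{})
	accepted := []*v1.Event{}
	for _, ev := range events {
		// Filter returns true when the event exceeds its budget and should be dropped.
		if !filter.Filter(ev) {
			accepted = append(accepted, ev)
		}
	}
	return accepted
}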
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
// localKey - key that distinguishes this event within the aggregate group
type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
	},
		""), event.Message
}
// EventAggregatorMessageFunc is responsible for producing an aggregation message
type EventAggregatorMessageFunc func(event *v1.Event) string

// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
	return "(combined from similar events): " + event.Message
}
// EventAggregator identifies similar events and aggregates them into a single event
type EventAggregator struct {
	sync.RWMutex

	// The cache that manages aggregation state
	cache *lru.Cache

	// The function that groups events for aggregation
	keyFunc EventAggregatorKeyFunc

	// The function that generates a message for an aggregate event
	messageFunc EventAggregatorMessageFunc

	// The maximum number of events in the specified interval before aggregation occurs
	maxEvents uint

	// The amount of time in seconds that must transpire since the last occurrence of a similar event before it is considered new
	maxIntervalInSeconds uint

	// clock is used to allow for testing over a time interval
	clock clock.Clock
}
// NewEventAggregator returns a new instance of an EventAggregator
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
	maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
	return &EventAggregator{
		cache:                lru.New(lruCacheSize),
		keyFunc:              keyFunc,
		messageFunc:          messageFunc,
		maxEvents:            uint(maxEvents),
		maxIntervalInSeconds: uint(maxIntervalInSeconds),
		clock:                clock,
	}
}
// aggregateRecord holds data used to perform aggregation decisions
type aggregateRecord struct {
	// we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
	// if the size of this set exceeds the max, we know we need to aggregate
	localKeys sets.String
	// The last time at which the aggregate was recorded
	lastTimestamp metav1.Time
}
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the aggregate key (as produced by
//   the EventAggregatorKeyFunc) for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}
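
// exampleAggregatorUsage is an illustrative sketch, not part of the original
// file: events that differ only by message pass through unchanged at first;
// once more than defaultAggregateMaxEvents unique messages are seen for the
// same source/object/type/reason inside the interval, a synthetic
// "(combined from similar events)" event is returned instead.
func exampleAggregatorUsage(events []*v1.Event) {
	aggregator := NewEventAggregator(maxLruCacheEntries, EventAggregatorByReasonFunc,
		EventAggregatorByReasonMessageFunc, defaultAggregateMaxEvents,
		defaultAggregateIntervalInSeconds, clock.RealClock{})
	for _, ev := range events {
		aggregated, cacheKey := aggregator.EventAggregate(ev)
		_ = aggregated // the event to record, possibly the combined one
		_ = cacheKey   // the key handed to the logger for de-duplication
	}
}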
// eventLog records data about when an event was observed
type eventLog struct {
	// The number of times the event has occurred since first occurrence.
	count uint

	// The time at which the event was first recorded.
	firstTimestamp metav1.Time

	// The unique name of the first occurrence of this event
	name string

	// Resource version returned from previous interaction with server
	resourceVersion string
}
// eventLogger logs occurrences of an event
type eventLogger struct {
	sync.RWMutex
	cache *lru.Cache
	clock clock.Clock
}

// newEventLogger observes events and counts their frequencies
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
	return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}
// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key)

	// If we found a result, prepare a patch
	if lastObservation.count > 0 {
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		// blank out the fields that must always be patched (count, timestamp,
		// message) so they are guaranteed to appear in the computed diff
		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}
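
// exampleEventObserve is an illustrative sketch, not part of the original
// file; the event fields below are made-up values. Observing the same event
// twice yields a copy with an incremented Count plus a strategic-merge patch
// that can be sent to the server instead of creating a duplicate event.
func exampleEventObserve() {
	logger := newEventLogger(maxLruCacheEntries, clock.RealClock{})
	ev := &v1.Event{
		ObjectMeta:     metav1.ObjectMeta{Name: "pod-x.1", Namespace: "default"},
		InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod-x"},
		Reason:         "FailedScheduling",
		Message:        "no nodes available",
		Count:          1,
	}
	key := getEventKey(ev)
	first, _, _ := logger.eventObserve(ev, key) // cache miss: recorded as-is
	second, patch, _ := logger.eventObserve(ev, key)
	_, _ = first, second // second.Count == 2
	_ = patch            // patch updating count, lastTimestamp, and message
}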
// updateState updates its internal tracking information based on latest server state
func (e *eventLogger) updateState(event *v1.Event) {
	key := getEventKey(event)
	e.Lock()
	defer e.Unlock()
	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
}
// lastEventObservationFromCache returns the event from the cache; reads must be protected via external lock
func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
	value, ok := e.cache.Get(key)
	if ok {
		observationValue, ok := value.(eventLog)
		if ok {
			return observationValue
		}
	}
	return eventLog{}
}
// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
type EventCorrelator struct {
	// the function to filter the event
	filterFunc EventFilterFunc
	// the object that performs event aggregation
	aggregator *EventAggregator
	// the object that observes events as they come through
	logger *eventLogger
}
// EventCorrelateResult is the result of a Correlate
type EventCorrelateResult struct {
	// the event after correlation
	Event *v1.Event
	// if provided, perform a strategic patch when updating the record on the server
	Patch []byte
	// if true, do no further processing of the event
	Skip bool
}
// NewEventCorrelator returns an EventCorrelator configured with default values.
//
// The EventCorrelator is responsible for event filtering, aggregating, and counting
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
//   * Aggregation is performed if a similar event is recorded 10 times in a
//     10 minute rolling interval. A similar event is an event that varies only by
//     the Event.Message field. Rather than recording the precise event, aggregation
//     will create a new event whose message reports that it has combined events with
//     the same reason.
//   * Events are incrementally counted if the exact same event is encountered multiple
//     times.
//   * A source may burst 25 events about an object, but has a refill rate budget
//     per object of 1 event every 5 minutes to control the long-tail of spam.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
	cacheSize := maxLruCacheEntries
	spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
	return &EventCorrelator{
		filterFunc: spamFilter.Filter,
		aggregator: NewEventAggregator(
			cacheSize,
			EventAggregatorByReasonFunc,
			EventAggregatorByReasonMessageFunc,
			defaultAggregateMaxEvents,
			defaultAggregateIntervalInSeconds,
			clock),

		logger: newEventLogger(cacheSize, clock),
	}
}
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
// UpdateState based on the latest observed state from server
func (c *EventCorrelator) UpdateState(event *v1.Event) {
	c.logger.updateState(event)
}
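
// exampleCorrelatorUsage is an illustrative sketch, not part of the original
// file, of the intended call pattern: correlate each incoming event, skip it
// if the spam filter rejects it, otherwise create or patch the event on the
// API server (server interaction elided), and feed the resulting state back
// via UpdateState so counts and resource versions stay accurate.
func exampleCorrelatorUsage(incoming []*v1.Event) error {
	correlator := NewEventCorrelator(clock.RealClock{})
	for _, ev := range incoming {
		result, err := correlator.EventCorrelate(ev)
		if err != nil {
			return err
		}
		if result.Skip {
			continue
		}
		// If result.Patch is non-nil, PATCH the existing server-side event;
		// otherwise POST result.Event as a new event. (Calls elided here; in
		// real use, pass the event returned by the server to UpdateState.)
		correlator.UpdateState(result.Event)
	}
	return nil
}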