summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/tools/record/events_cache.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/tools/record/events_cache.go')
-rw-r--r--vendor/k8s.io/client-go/tools/record/events_cache.go377
1 files changed, 377 insertions, 0 deletions
diff --git a/vendor/k8s.io/client-go/tools/record/events_cache.go b/vendor/k8s.io/client-go/tools/record/events_cache.go
new file mode 100644
index 000000000..785ec6477
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/events_cache.go
@@ -0,0 +1,377 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/groupcache/lru"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/clock"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/pkg/api/v1"
+)
+
+const (
+ maxLruCacheEntries = 4096
+
+ // if we see the same event that varies only by message
+ // more than 10 times in a 10 minute period, aggregate the event
+ defaultAggregateMaxEvents = 10
+ defaultAggregateIntervalInSeconds = 600
+)
+
+// getEventKey builds unique event key based on source, involvedObject, reason, message
+func getEventKey(event *v1.Event) string {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ event.InvolvedObject.FieldPath,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ event.Type,
+ event.Reason,
+ event.Message,
+ },
+ "")
+}
+
+// EventFilterFunc is a function that returns true if the event should be skipped
+type EventFilterFunc func(event *v1.Event) bool
+
+// DefaultEventFilterFunc returns false for all incoming events
+func DefaultEventFilterFunc(event *v1.Event) bool {
+ return false
+}
+
+// EventAggregatorKeyFunc is responsible for grouping events for aggregation
+// It returns a tuple of the following:
+// aggregateKey - key the identifies the aggregate group to bucket this event
+// localKey - key that makes this event in the local group
+type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
+
+// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
+func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ event.Type,
+ event.Reason,
+ },
+ ""), event.Message
+}
+
+// EventAggregatorMessageFunc is responsible for producing an aggregation message
+type EventAggregatorMessageFunc func(event *v1.Event) string
+
+// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
+func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
+ return "(combined from similar events): " + event.Message
+}
+
+// EventAggregator identifies similar events and aggregates them into a single event
+type EventAggregator struct {
+ sync.RWMutex
+
+ // The cache that manages aggregation state
+ cache *lru.Cache
+
+ // The function that groups events for aggregation
+ keyFunc EventAggregatorKeyFunc
+
+ // The function that generates a message for an aggregate event
+ messageFunc EventAggregatorMessageFunc
+
+ // The maximum number of events in the specified interval before aggregation occurs
+ maxEvents uint
+
+ // The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
+ maxIntervalInSeconds uint
+
+ // clock is used to allow for testing over a time interval
+ clock clock.Clock
+}
+
+// NewEventAggregator returns a new instance of an EventAggregator
+func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
+ maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
+ return &EventAggregator{
+ cache: lru.New(lruCacheSize),
+ keyFunc: keyFunc,
+ messageFunc: messageFunc,
+ maxEvents: uint(maxEvents),
+ maxIntervalInSeconds: uint(maxIntervalInSeconds),
+ clock: clock,
+ }
+}
+
+// aggregateRecord holds data used to perform aggregation decisions
+type aggregateRecord struct {
+ // we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
+ // if the size of this set exceeds the max, we know we need to aggregate
+ localKeys sets.String
+ // The last time at which the aggregate was recorded
+ lastTimestamp metav1.Time
+}
+
+// EventAggregate checks if a similar event has been seen according to the
+// aggregation configuration (max events, max interval, etc) and returns:
+//
+// - The (potentially modified) event that should be created
+// - The cache key for the event, for correlation purposes. This will be set to
+// the full key for normal events, and to the result of
+// EventAggregatorMessageFunc for aggregate events.
+func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
+ now := metav1.NewTime(e.clock.Now())
+ var record aggregateRecord
+ // eventKey is the full cache key for this event
+ eventKey := getEventKey(newEvent)
+ // aggregateKey is for the aggregate event, if one is needed.
+ aggregateKey, localKey := e.keyFunc(newEvent)
+
+ // Do we have a record of similar events in our cache?
+ e.Lock()
+ defer e.Unlock()
+ value, found := e.cache.Get(aggregateKey)
+ if found {
+ record = value.(aggregateRecord)
+ }
+
+ // Is the previous record too old? If so, make a fresh one. Note: if we didn't
+ // find a similar record, its lastTimestamp will be the zero value, so we
+ // create a new one in that case.
+ maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
+ interval := now.Time.Sub(record.lastTimestamp.Time)
+ if interval > maxInterval {
+ record = aggregateRecord{localKeys: sets.NewString()}
+ }
+
+ // Write the new event into the aggregation record and put it on the cache
+ record.localKeys.Insert(localKey)
+ record.lastTimestamp = now
+ e.cache.Add(aggregateKey, record)
+
+ // If we are not yet over the threshold for unique events, don't correlate them
+ if uint(record.localKeys.Len()) < e.maxEvents {
+ return newEvent, eventKey
+ }
+
+ // do not grow our local key set any larger than max
+ record.localKeys.PopAny()
+
+ // create a new aggregate event, and return the aggregateKey as the cache key
+ // (so that it can be overwritten.)
+ eventCopy := &v1.Event{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
+ Namespace: newEvent.Namespace,
+ },
+ Count: 1,
+ FirstTimestamp: now,
+ InvolvedObject: newEvent.InvolvedObject,
+ LastTimestamp: now,
+ Message: e.messageFunc(newEvent),
+ Type: newEvent.Type,
+ Reason: newEvent.Reason,
+ Source: newEvent.Source,
+ }
+ return eventCopy, aggregateKey
+}
+
+// eventLog records data about when an event was observed
+type eventLog struct {
+ // The number of times the event has occurred since first occurrence.
+ count uint
+
+ // The time at which the event was first recorded.
+ firstTimestamp metav1.Time
+
+ // The unique name of the first occurrence of this event
+ name string
+
+ // Resource version returned from previous interaction with server
+ resourceVersion string
+}
+
+// eventLogger logs occurrences of an event
+type eventLogger struct {
+ sync.RWMutex
+ cache *lru.Cache
+ clock clock.Clock
+}
+
+// newEventLogger observes events and counts their frequencies
+func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
+ return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
+}
+
+// eventObserve records an event, or updates an existing one if key is a cache hit
+func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
+ var (
+ patch []byte
+ err error
+ )
+ eventCopy := *newEvent
+ event := &eventCopy
+
+ e.Lock()
+ defer e.Unlock()
+
+ // Check if there is an existing event we should update
+ lastObservation := e.lastEventObservationFromCache(key)
+
+ // If we found a result, prepare a patch
+ if lastObservation.count > 0 {
+ // update the event based on the last observation so patch will work as desired
+ event.Name = lastObservation.name
+ event.ResourceVersion = lastObservation.resourceVersion
+ event.FirstTimestamp = lastObservation.firstTimestamp
+ event.Count = int32(lastObservation.count) + 1
+
+ eventCopy2 := *event
+ eventCopy2.Count = 0
+ eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
+ eventCopy2.Message = ""
+
+ newData, _ := json.Marshal(event)
+ oldData, _ := json.Marshal(eventCopy2)
+ patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
+ }
+
+ // record our new observation
+ e.cache.Add(
+ key,
+ eventLog{
+ count: uint(event.Count),
+ firstTimestamp: event.FirstTimestamp,
+ name: event.Name,
+ resourceVersion: event.ResourceVersion,
+ },
+ )
+ return event, patch, err
+}
+
+// updateState updates its internal tracking information based on latest server state
+func (e *eventLogger) updateState(event *v1.Event) {
+ key := getEventKey(event)
+ e.Lock()
+ defer e.Unlock()
+ // record our new observation
+ e.cache.Add(
+ key,
+ eventLog{
+ count: uint(event.Count),
+ firstTimestamp: event.FirstTimestamp,
+ name: event.Name,
+ resourceVersion: event.ResourceVersion,
+ },
+ )
+}
+
+// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
+func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
+ value, ok := e.cache.Get(key)
+ if ok {
+ observationValue, ok := value.(eventLog)
+ if ok {
+ return observationValue
+ }
+ }
+ return eventLog{}
+}
+
+// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
+// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
+// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
+// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
+type EventCorrelator struct {
+ // the function to filter the event
+ filterFunc EventFilterFunc
+ // the object that performs event aggregation
+ aggregator *EventAggregator
+ // the object that observes events as they come through
+ logger *eventLogger
+}
+
+// EventCorrelateResult is the result of a Correlate
+type EventCorrelateResult struct {
+ // the event after correlation
+ Event *v1.Event
+ // if provided, perform a strategic patch when updating the record on the server
+ Patch []byte
+ // if true, do no further processing of the event
+ Skip bool
+}
+
+// NewEventCorrelator returns an EventCorrelator configured with default values.
+//
+// The EventCorrelator is responsible for event filtering, aggregating, and counting
+// prior to interacting with the API server to record the event.
+//
+// The default behavior is as follows:
+// * No events are filtered from being recorded
+// * Aggregation is performed if a similar event is recorded 10 times in a
+// in a 10 minute rolling interval. A similar event is an event that varies only by
+// the Event.Message field. Rather than recording the precise event, aggregation
+// will create a new event whose message reports that it has combined events with
+// the same reason.
+// * Events are incrementally counted if the exact same event is encountered multiple
+// times.
+func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
+ cacheSize := maxLruCacheEntries
+ return &EventCorrelator{
+ filterFunc: DefaultEventFilterFunc,
+ aggregator: NewEventAggregator(
+ cacheSize,
+ EventAggregatorByReasonFunc,
+ EventAggregatorByReasonMessageFunc,
+ defaultAggregateMaxEvents,
+ defaultAggregateIntervalInSeconds,
+ clock),
+
+ logger: newEventLogger(cacheSize, clock),
+ }
+}
+
+// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
+func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
+ if c.filterFunc(newEvent) {
+ return &EventCorrelateResult{Skip: true}, nil
+ }
+ aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
+ observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
+ return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
+}
+
+// UpdateState based on the latest observed state from server
+func (c *EventCorrelator) UpdateState(event *v1.Event) {
+ c.logger.updateState(event)
+}