aboutsummaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/tools/record
diff options
context:
space:
mode:
authorMatthew Heon <matthew.heon@gmail.com>2017-11-01 11:24:59 -0400
committerMatthew Heon <matthew.heon@gmail.com>2017-11-01 11:24:59 -0400
commita031b83a09a8628435317a03f199cdc18b78262f (patch)
treebc017a96769ce6de33745b8b0b1304ccf38e9df0 /vendor/k8s.io/client-go/tools/record
parent2b74391cd5281f6fdf391ff8ad50fd1490f6bf89 (diff)
downloadpodman-a031b83a09a8628435317a03f199cdc18b78262f.tar.gz
podman-a031b83a09a8628435317a03f199cdc18b78262f.tar.bz2
podman-a031b83a09a8628435317a03f199cdc18b78262f.zip
Initial checkin from CRI-O repo
Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
Diffstat (limited to 'vendor/k8s.io/client-go/tools/record')
-rw-r--r--vendor/k8s.io/client-go/tools/record/doc.go18
-rw-r--r--vendor/k8s.io/client-go/tools/record/event.go318
-rw-r--r--vendor/k8s.io/client-go/tools/record/events_cache.go377
-rw-r--r--vendor/k8s.io/client-go/tools/record/fake.go54
4 files changed, 767 insertions, 0 deletions
diff --git a/vendor/k8s.io/client-go/tools/record/doc.go b/vendor/k8s.io/client-go/tools/record/doc.go
new file mode 100644
index 000000000..657ddecbc
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package record has all client logic for recording and reporting events.
+package record // import "k8s.io/client-go/tools/record"
diff --git a/vendor/k8s.io/client-go/tools/record/event.go b/vendor/k8s.io/client-go/tools/record/event.go
new file mode 100644
index 000000000..6b2fad409
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/event.go
@@ -0,0 +1,318 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/pkg/api/v1"
+ "k8s.io/client-go/pkg/api/v1/ref"
+ restclient "k8s.io/client-go/rest"
+
+ "net/http"
+
+ "github.com/golang/glog"
+)
+
+const maxTriesPerEvent = 12
+
+var defaultSleepDuration = 10 * time.Second
+
+const maxQueuedEvents = 1000
+
+// EventSink knows how to store events (client.Client implements it.)
+// EventSink must respect the namespace that will be embedded in 'event'.
+// It is assumed that EventSink will return the same sorts of errors as
+// pkg/client's REST client.
+type EventSink interface {
+ Create(event *v1.Event) (*v1.Event, error)
+ Update(event *v1.Event) (*v1.Event, error)
+ Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
+}
+
+// EventRecorder knows how to record events on behalf of an EventSource.
+type EventRecorder interface {
+ // Event constructs an event from the given information and puts it in the queue for sending.
+ // 'object' is the object this event is about. Event will make a reference-- or you may also
+ // pass a reference to the object directly.
+ // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
+ // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
+ // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
+ // to automate handling of events, so imagine people writing switch statements to handle them.
+ // You want to make that easy.
+ // 'message' is intended to be human readable.
+ //
+ // The resulting event will be created in the same namespace as the reference object.
+ Event(object runtime.Object, eventtype, reason, message string)
+
+ // Eventf is just like Event, but with Sprintf for the message field.
+ Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
+
+ // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
+ PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
+}
+
+// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
+type EventBroadcaster interface {
+ // StartEventWatcher starts sending events received from this EventBroadcaster to the given
+ // event handler function. The return value can be ignored or used to stop recording, if
+ // desired.
+ StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
+
+ // StartRecordingToSink starts sending events received from this EventBroadcaster to the given
+ // sink. The return value can be ignored or used to stop recording, if desired.
+ StartRecordingToSink(sink EventSink) watch.Interface
+
+ // StartLogging starts sending events received from this EventBroadcaster to the given logging
+ // function. The return value can be ignored or used to stop recording, if desired.
+ StartLogging(logf func(format string, args ...interface{})) watch.Interface
+
+ // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
+ // with the event source set to the given event source.
+ NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
+}
+
+// Creates a new event broadcaster.
+func NewBroadcaster() EventBroadcaster {
+ return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
+}
+
+func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
+ return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
+}
+
+type eventBroadcasterImpl struct {
+ *watch.Broadcaster
+ sleepDuration time.Duration
+}
+
+// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
+// The return value can be ignored or used to stop recording, if desired.
+// TODO: make me an object with parameterizable queue length and retry interval
+func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
+ // The default math/rand package functions aren't thread safe, so create a
+ // new Rand object for each StartRecording call.
+ randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
+ eventCorrelator := NewEventCorrelator(clock.RealClock{})
+ return eventBroadcaster.StartEventWatcher(
+ func(event *v1.Event) {
+ recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
+ })
+}
+
+func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
+ // Make a copy before modification, because there could be multiple listeners.
+ // Events are safe to copy like this.
+ eventCopy := *event
+ event = &eventCopy
+ result, err := eventCorrelator.EventCorrelate(event)
+ if err != nil {
+ utilruntime.HandleError(err)
+ }
+ if result.Skip {
+ return
+ }
+ tries := 0
+ for {
+ if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
+ break
+ }
+ tries++
+ if tries >= maxTriesPerEvent {
+ glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
+ break
+ }
+ // Randomize the first sleep so that various clients won't all be
+ // synced up if the master goes down.
+ if tries == 1 {
+ time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
+ } else {
+ time.Sleep(sleepDuration)
+ }
+ }
+}
+
+func isKeyNotFoundError(err error) bool {
+ statusErr, _ := err.(*errors.StatusError)
+
+ if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
+ return true
+ }
+
+ return false
+}
+
+// recordEvent attempts to write event to a sink. It returns true if the event
+// was successfully recorded or discarded, false if it should be retried.
+// If updateExistingEvent is false, it creates a new event, otherwise it updates
+// existing event.
+func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
+ var newEvent *v1.Event
+ var err error
+ if updateExistingEvent {
+ newEvent, err = sink.Patch(event, patch)
+ }
+ // Update can fail because the event may have been removed and it no longer exists.
+ if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
+ // Making sure that ResourceVersion is empty on creation
+ event.ResourceVersion = ""
+ newEvent, err = sink.Create(event)
+ }
+ if err == nil {
+ // we need to update our event correlator with the server returned state to handle name/resourceversion
+ eventCorrelator.UpdateState(newEvent)
+ return true
+ }
+
+ // If we can't contact the server, then hold everything while we keep trying.
+ // Otherwise, something about the event is malformed and we should abandon it.
+ switch err.(type) {
+ case *restclient.RequestConstructionError:
+ // We will construct the request the same next time, so don't keep trying.
+ glog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
+ return true
+ case *errors.StatusError:
+ if errors.IsAlreadyExists(err) {
+ glog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
+ } else {
+ glog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
+ }
+ return true
+ case *errors.UnexpectedObjectError:
+ // We don't expect this; it implies the server's response didn't match a
+ // known pattern. Go ahead and retry.
+ default:
+ // This case includes actual http transport errors. Go ahead and retry.
+ }
+ glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
+ return false
+}
+
+// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
+// The return value can be ignored or used to stop recording, if desired.
+func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
+ return eventBroadcaster.StartEventWatcher(
+ func(e *v1.Event) {
+ logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
+ })
+}
+
+// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
+// The return value can be ignored or used to stop recording, if desired.
+func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
+ watcher := eventBroadcaster.Watch()
+ go func() {
+ defer utilruntime.HandleCrash()
+ for {
+ watchEvent, open := <-watcher.ResultChan()
+ if !open {
+ return
+ }
+ event, ok := watchEvent.Object.(*v1.Event)
+ if !ok {
+ // This is all local, so there's no reason this should
+ // ever happen.
+ continue
+ }
+ eventHandler(event)
+ }
+ }()
+ return watcher
+}
+
+// NewRecorder returns an EventRecorder that records events with the given event source.
+func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
+ return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
+}
+
+type recorderImpl struct {
+ scheme *runtime.Scheme
+ source v1.EventSource
+ *watch.Broadcaster
+ clock clock.Clock
+}
+
+func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
+ ref, err := ref.GetReference(recorder.scheme, object)
+ if err != nil {
+ glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
+ return
+ }
+
+ if !validateEventType(eventtype) {
+ glog.Errorf("Unsupported event type: '%v'", eventtype)
+ return
+ }
+
+ event := recorder.makeEvent(ref, eventtype, reason, message)
+ event.Source = recorder.source
+
+ go func() {
+ // NOTE: events should be a non-blocking operation
+ defer utilruntime.HandleCrash()
+ recorder.Action(watch.Added, event)
+ }()
+}
+
+func validateEventType(eventtype string) bool {
+ switch eventtype {
+ case v1.EventTypeNormal, v1.EventTypeWarning:
+ return true
+ }
+ return false
+}
+
+func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
+ recorder.generateEvent(object, metav1.Now(), eventtype, reason, message)
+}
+
+func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
+ recorder.generateEvent(object, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *v1.Event {
+ t := metav1.Time{Time: recorder.clock.Now()}
+ namespace := ref.Namespace
+ if namespace == "" {
+ namespace = metav1.NamespaceDefault
+ }
+ return &v1.Event{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
+ Namespace: namespace,
+ },
+ InvolvedObject: *ref,
+ Reason: reason,
+ Message: message,
+ FirstTimestamp: t,
+ LastTimestamp: t,
+ Count: 1,
+ Type: eventtype,
+ }
+}
diff --git a/vendor/k8s.io/client-go/tools/record/events_cache.go b/vendor/k8s.io/client-go/tools/record/events_cache.go
new file mode 100644
index 000000000..785ec6477
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/events_cache.go
@@ -0,0 +1,377 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/groupcache/lru"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/clock"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/pkg/api/v1"
+)
+
+const (
+ maxLruCacheEntries = 4096
+
+ // if we see the same event that varies only by message
+ // more than 10 times in a 10 minute period, aggregate the event
+ defaultAggregateMaxEvents = 10
+ defaultAggregateIntervalInSeconds = 600
+)
+
+// getEventKey builds unique event key based on source, involvedObject, reason, message
+func getEventKey(event *v1.Event) string {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ event.InvolvedObject.FieldPath,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ event.Type,
+ event.Reason,
+ event.Message,
+ },
+ "")
+}
+
+// EventFilterFunc is a function that returns true if the event should be skipped
+type EventFilterFunc func(event *v1.Event) bool
+
+// DefaultEventFilterFunc returns false for all incoming events
+func DefaultEventFilterFunc(event *v1.Event) bool {
+ return false
+}
+
+// EventAggregatorKeyFunc is responsible for grouping events for aggregation
+// It returns a tuple of the following:
+// aggregateKey - key the identifies the aggregate group to bucket this event
+// localKey - key that makes this event in the local group
+type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
+
+// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
+func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ event.Type,
+ event.Reason,
+ },
+ ""), event.Message
+}
+
+// EventAggregatorMessageFunc is responsible for producing an aggregation message
+type EventAggregatorMessageFunc func(event *v1.Event) string
+
+// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
+func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
+ return "(combined from similar events): " + event.Message
+}
+
+// EventAggregator identifies similar events and aggregates them into a single event
+type EventAggregator struct {
+ sync.RWMutex
+
+ // The cache that manages aggregation state
+ cache *lru.Cache
+
+ // The function that groups events for aggregation
+ keyFunc EventAggregatorKeyFunc
+
+ // The function that generates a message for an aggregate event
+ messageFunc EventAggregatorMessageFunc
+
+ // The maximum number of events in the specified interval before aggregation occurs
+ maxEvents uint
+
+ // The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
+ maxIntervalInSeconds uint
+
+ // clock is used to allow for testing over a time interval
+ clock clock.Clock
+}
+
+// NewEventAggregator returns a new instance of an EventAggregator
+func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
+ maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
+ return &EventAggregator{
+ cache: lru.New(lruCacheSize),
+ keyFunc: keyFunc,
+ messageFunc: messageFunc,
+ maxEvents: uint(maxEvents),
+ maxIntervalInSeconds: uint(maxIntervalInSeconds),
+ clock: clock,
+ }
+}
+
+// aggregateRecord holds data used to perform aggregation decisions
+type aggregateRecord struct {
+ // we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
+ // if the size of this set exceeds the max, we know we need to aggregate
+ localKeys sets.String
+ // The last time at which the aggregate was recorded
+ lastTimestamp metav1.Time
+}
+
+// EventAggregate checks if a similar event has been seen according to the
+// aggregation configuration (max events, max interval, etc) and returns:
+//
+// - The (potentially modified) event that should be created
+// - The cache key for the event, for correlation purposes. This will be set to
+// the full key for normal events, and to the result of
+// EventAggregatorMessageFunc for aggregate events.
+func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
+ now := metav1.NewTime(e.clock.Now())
+ var record aggregateRecord
+ // eventKey is the full cache key for this event
+ eventKey := getEventKey(newEvent)
+ // aggregateKey is for the aggregate event, if one is needed.
+ aggregateKey, localKey := e.keyFunc(newEvent)
+
+ // Do we have a record of similar events in our cache?
+ e.Lock()
+ defer e.Unlock()
+ value, found := e.cache.Get(aggregateKey)
+ if found {
+ record = value.(aggregateRecord)
+ }
+
+ // Is the previous record too old? If so, make a fresh one. Note: if we didn't
+ // find a similar record, its lastTimestamp will be the zero value, so we
+ // create a new one in that case.
+ maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
+ interval := now.Time.Sub(record.lastTimestamp.Time)
+ if interval > maxInterval {
+ record = aggregateRecord{localKeys: sets.NewString()}
+ }
+
+ // Write the new event into the aggregation record and put it on the cache
+ record.localKeys.Insert(localKey)
+ record.lastTimestamp = now
+ e.cache.Add(aggregateKey, record)
+
+ // If we are not yet over the threshold for unique events, don't correlate them
+ if uint(record.localKeys.Len()) < e.maxEvents {
+ return newEvent, eventKey
+ }
+
+ // do not grow our local key set any larger than max
+ record.localKeys.PopAny()
+
+ // create a new aggregate event, and return the aggregateKey as the cache key
+ // (so that it can be overwritten.)
+ eventCopy := &v1.Event{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
+ Namespace: newEvent.Namespace,
+ },
+ Count: 1,
+ FirstTimestamp: now,
+ InvolvedObject: newEvent.InvolvedObject,
+ LastTimestamp: now,
+ Message: e.messageFunc(newEvent),
+ Type: newEvent.Type,
+ Reason: newEvent.Reason,
+ Source: newEvent.Source,
+ }
+ return eventCopy, aggregateKey
+}
+
+// eventLog records data about when an event was observed
+type eventLog struct {
+ // The number of times the event has occurred since first occurrence.
+ count uint
+
+ // The time at which the event was first recorded.
+ firstTimestamp metav1.Time
+
+ // The unique name of the first occurrence of this event
+ name string
+
+ // Resource version returned from previous interaction with server
+ resourceVersion string
+}
+
+// eventLogger logs occurrences of an event
+type eventLogger struct {
+ sync.RWMutex
+ cache *lru.Cache
+ clock clock.Clock
+}
+
+// newEventLogger observes events and counts their frequencies
+func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
+ return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
+}
+
+// eventObserve records an event, or updates an existing one if key is a cache hit
+func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
+ var (
+ patch []byte
+ err error
+ )
+ eventCopy := *newEvent
+ event := &eventCopy
+
+ e.Lock()
+ defer e.Unlock()
+
+ // Check if there is an existing event we should update
+ lastObservation := e.lastEventObservationFromCache(key)
+
+ // If we found a result, prepare a patch
+ if lastObservation.count > 0 {
+ // update the event based on the last observation so patch will work as desired
+ event.Name = lastObservation.name
+ event.ResourceVersion = lastObservation.resourceVersion
+ event.FirstTimestamp = lastObservation.firstTimestamp
+ event.Count = int32(lastObservation.count) + 1
+
+ eventCopy2 := *event
+ eventCopy2.Count = 0
+ eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
+ eventCopy2.Message = ""
+
+ newData, _ := json.Marshal(event)
+ oldData, _ := json.Marshal(eventCopy2)
+ patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
+ }
+
+ // record our new observation
+ e.cache.Add(
+ key,
+ eventLog{
+ count: uint(event.Count),
+ firstTimestamp: event.FirstTimestamp,
+ name: event.Name,
+ resourceVersion: event.ResourceVersion,
+ },
+ )
+ return event, patch, err
+}
+
+// updateState updates its internal tracking information based on latest server state
+func (e *eventLogger) updateState(event *v1.Event) {
+ key := getEventKey(event)
+ e.Lock()
+ defer e.Unlock()
+ // record our new observation
+ e.cache.Add(
+ key,
+ eventLog{
+ count: uint(event.Count),
+ firstTimestamp: event.FirstTimestamp,
+ name: event.Name,
+ resourceVersion: event.ResourceVersion,
+ },
+ )
+}
+
+// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
+func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
+ value, ok := e.cache.Get(key)
+ if ok {
+ observationValue, ok := value.(eventLog)
+ if ok {
+ return observationValue
+ }
+ }
+ return eventLog{}
+}
+
+// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
+// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
+// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
+// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
+type EventCorrelator struct {
+ // the function to filter the event
+ filterFunc EventFilterFunc
+ // the object that performs event aggregation
+ aggregator *EventAggregator
+ // the object that observes events as they come through
+ logger *eventLogger
+}
+
+// EventCorrelateResult is the result of a Correlate
+type EventCorrelateResult struct {
+ // the event after correlation
+ Event *v1.Event
+ // if provided, perform a strategic patch when updating the record on the server
+ Patch []byte
+ // if true, do no further processing of the event
+ Skip bool
+}
+
+// NewEventCorrelator returns an EventCorrelator configured with default values.
+//
+// The EventCorrelator is responsible for event filtering, aggregating, and counting
+// prior to interacting with the API server to record the event.
+//
+// The default behavior is as follows:
+// * No events are filtered from being recorded
+// * Aggregation is performed if a similar event is recorded 10 times in a
+// in a 10 minute rolling interval. A similar event is an event that varies only by
+// the Event.Message field. Rather than recording the precise event, aggregation
+// will create a new event whose message reports that it has combined events with
+// the same reason.
+// * Events are incrementally counted if the exact same event is encountered multiple
+// times.
+func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
+ cacheSize := maxLruCacheEntries
+ return &EventCorrelator{
+ filterFunc: DefaultEventFilterFunc,
+ aggregator: NewEventAggregator(
+ cacheSize,
+ EventAggregatorByReasonFunc,
+ EventAggregatorByReasonMessageFunc,
+ defaultAggregateMaxEvents,
+ defaultAggregateIntervalInSeconds,
+ clock),
+
+ logger: newEventLogger(cacheSize, clock),
+ }
+}
+
+// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
+func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
+ if c.filterFunc(newEvent) {
+ return &EventCorrelateResult{Skip: true}, nil
+ }
+ aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
+ observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
+ return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
+}
+
+// UpdateState based on the latest observed state from server
+func (c *EventCorrelator) UpdateState(event *v1.Event) {
+ c.logger.updateState(event)
+}
diff --git a/vendor/k8s.io/client-go/tools/record/fake.go b/vendor/k8s.io/client-go/tools/record/fake.go
new file mode 100644
index 000000000..c0e8eedbb
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/fake.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "fmt"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
+// when created manually and not by NewFakeRecorder, however all events may be
+// thrown away in this case.
+type FakeRecorder struct {
+ Events chan string
+}
+
+func (f *FakeRecorder) Event(object runtime.Object, eventtype, reason, message string) {
+ if f.Events != nil {
+ f.Events <- fmt.Sprintf("%s %s %s", eventtype, reason, message)
+ }
+}
+
+func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ if f.Events != nil {
+ f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+messageFmt, args...)
+ }
+}
+
+func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
+}
+
+// NewFakeRecorder creates new fake event recorder with event channel with
+// buffer of given size.
+func NewFakeRecorder(bufferSize int) *FakeRecorder {
+ return &FakeRecorder{
+ Events: make(chan string, bufferSize),
+ }
+}