author    Matthew Heon <matthew.heon@gmail.com>  2017-11-01 11:24:59 -0400
committer Matthew Heon <matthew.heon@gmail.com>  2017-11-01 11:24:59 -0400
commit    a031b83a09a8628435317a03f199cdc18b78262f (patch)
tree      bc017a96769ce6de33745b8b0b1304ccf38e9df0 /vendor/k8s.io/client-go/tools
parent    2b74391cd5281f6fdf391ff8ad50fd1490f6bf89 (diff)
Initial checkin from CRI-O repo
Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
Diffstat (limited to 'vendor/k8s.io/client-go/tools')
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/controller.go             | 391
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/delta_fifo.go             | 681
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/doc.go                    |  24
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/expiration_cache.go       | 208
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/expiration_cache_fakes.go |  54
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/fake_custom_store.go      | 102
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/fifo.go                   | 358
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/index.go                  |  87
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/listers.go                | 160
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/listwatch.go              | 162
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/mutation_cache.go         | 261
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/mutation_detector.go      | 135
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/reflector.go              | 421
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/shared_informer.go        | 581
-rwxr-xr-x  vendor/k8s.io/client-go/tools/cache/store.go                  | 244
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/thread_safe_store.go      | 306
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/undelta_store.go          |  83
-rw-r--r--  vendor/k8s.io/client-go/tools/clientcmd/api/helpers.go        | 183
-rw-r--r--  vendor/k8s.io/client-go/tools/clientcmd/api/register.go       |  46
-rw-r--r--  vendor/k8s.io/client-go/tools/clientcmd/api/types.go          | 185
-rw-r--r--  vendor/k8s.io/client-go/tools/metrics/metrics.go              |  61
-rw-r--r--  vendor/k8s.io/client-go/tools/record/doc.go                   |  18
-rw-r--r--  vendor/k8s.io/client-go/tools/record/event.go                 | 318
-rw-r--r--  vendor/k8s.io/client-go/tools/record/events_cache.go          | 377
-rw-r--r--  vendor/k8s.io/client-go/tools/record/fake.go                  |  54
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/doc.go            |  20
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/errorstream.go    |  55
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go  | 178
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/resize.go         |  33
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v1.go             | 160
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v2.go             | 195
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v3.go             | 111
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v4.go             | 119
33 files changed, 6371 insertions, 0 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/controller.go b/vendor/k8s.io/client-go/tools/cache/controller.go
new file mode 100644
index 000000000..2c97b8658
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/controller.go
@@ -0,0 +1,391 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+)
+
+// Config contains all the settings for a Controller.
+type Config struct {
+ // The queue for your objects; either a FIFO or
+ // a DeltaFIFO. Your Process() function should accept
+ // the output of this Queue's Pop() method.
+ Queue
+
+ // Something that can list and watch your objects.
+ ListerWatcher
+
+ // Something that can process your objects.
+ Process ProcessFunc
+
+ // The type of your objects.
+ ObjectType runtime.Object
+
+ // Reprocess everything at least this often.
+ // Note that if it takes longer for you to clear the queue than this
+ // period, you will end up processing items in the order determined
+ // by FIFO.Replace(). Currently, this is random. If this is a
+ // problem, we can change that replacement policy to append new
+ // things to the end of the queue instead of replacing the entire
+ // queue.
+ FullResyncPeriod time.Duration
+
+ // ShouldResync, if specified, is invoked when the controller's reflector determines the next
+ // periodic sync should occur. If this returns true, it means the reflector should proceed with
+ // the resync.
+ ShouldResync ShouldResyncFunc
+
+ // If true, when Process() returns an error, re-enqueue the object.
+ // TODO: add interface to let you inject a delay/backoff or drop
+ // the object completely if desired. Pass the object in
+ // question to this interface as a parameter.
+ RetryOnError bool
+}
+
+// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
+// resync or not. It can be used by a shared informer to support multiple event handlers with custom
+// resync periods.
+type ShouldResyncFunc func() bool
+
+// ProcessFunc processes a single object.
+type ProcessFunc func(obj interface{}) error
+
+// Controller is a generic controller framework.
+type controller struct {
+ config Config
+ reflector *Reflector
+ reflectorMutex sync.RWMutex
+ clock clock.Clock
+}
+
+type Controller interface {
+ Run(stopCh <-chan struct{})
+ HasSynced() bool
+ LastSyncResourceVersion() string
+}
+
+// New makes a new Controller from the given Config.
+func New(c *Config) Controller {
+ ctlr := &controller{
+ config: *c,
+ clock: &clock.RealClock{},
+ }
+ return ctlr
+}
+
+// Run begins processing items, and will continue until a value is sent down stopCh.
+// It's an error to call Run more than once.
+// Run blocks; call via go.
+func (c *controller) Run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+ go func() {
+ <-stopCh
+ c.config.Queue.Close()
+ }()
+ r := NewReflector(
+ c.config.ListerWatcher,
+ c.config.ObjectType,
+ c.config.Queue,
+ c.config.FullResyncPeriod,
+ )
+ r.ShouldResync = c.config.ShouldResync
+ r.clock = c.clock
+
+ c.reflectorMutex.Lock()
+ c.reflector = r
+ c.reflectorMutex.Unlock()
+
+ r.RunUntil(stopCh)
+
+ wait.Until(c.processLoop, time.Second, stopCh)
+}
+
+// HasSynced returns true once this controller has completed an initial resource listing.
+func (c *controller) HasSynced() bool {
+ return c.config.Queue.HasSynced()
+}
+
+func (c *controller) LastSyncResourceVersion() string {
+ if c.reflector == nil {
+ return ""
+ }
+ return c.reflector.LastSyncResourceVersion()
+}
+
+// processLoop drains the work queue.
+// TODO: Consider doing the processing in parallel. This will require a little thought
+// to make sure that we don't end up processing the same object multiple times
+// concurrently.
+//
+// TODO: Plumb through the stopCh here (and down to the queue) so that this can
+// actually exit when the controller is stopped. Or just give up on this stuff
+// ever being stoppable. Converting this whole package to use Context would
+// also be helpful.
+func (c *controller) processLoop() {
+ for {
+ obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
+ if err != nil {
+ if err == FIFOClosedError {
+ return
+ }
+ if c.config.RetryOnError {
+ // This is the safe way to re-enqueue.
+ c.config.Queue.AddIfNotPresent(obj)
+ }
+ }
+ }
+}
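
A usage sketch of the Config -> New -> Run flow, assuming the imports shown and
a ListerWatcher lw built elsewhere (e.g. with NewListWatchFromClient from this
package's listwatch.go); the helper name and Pod type are illustrative:

    import (
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/cache"
    )

    // runPodController is a hypothetical helper; lw is assumed, not defined here.
    func runPodController(lw cache.ListerWatcher, stopCh <-chan struct{}) {
        fifo := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
        cfg := &cache.Config{
            Queue:            fifo,
            ListerWatcher:    lw,
            ObjectType:       &v1.Pod{},
            FullResyncPeriod: 30 * time.Second,
            RetryOnError:     false,
            Process: func(obj interface{}) error {
                // A plain FIFO pops the object itself; a DeltaFIFO would pop Deltas.
                fmt.Println("processing:", obj.(*v1.Pod).Name)
                return nil
            },
        }
        go cache.New(cfg).Run(stopCh) // Run blocks until stopCh is closed
    }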
+
+// ResourceEventHandler can handle notifications for events that happen to a
+// resource. The events are informational only, so you can't return an
+// error.
+// * OnAdd is called when an object is added.
+// * OnUpdate is called when an object is modified. Note that oldObj is the
+// last known state of the object-- it is possible that several changes
+// were combined together, so you can't use this to see every single
+// change. OnUpdate is also called when a re-list happens, and it will
+// get called even if nothing changed. This is useful for periodically
+// evaluating or syncing something.
+// * OnDelete will get the final state of the item if it is known, otherwise
+// it will get an object of type DeletedFinalStateUnknown. This can
+// happen if the watch is closed and misses the delete event and we don't
+// notice the deletion until the subsequent re-list.
+type ResourceEventHandler interface {
+ OnAdd(obj interface{})
+ OnUpdate(oldObj, newObj interface{})
+ OnDelete(obj interface{})
+}
+
+// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
+// as few of the notification functions as you want while still implementing
+// ResourceEventHandler.
+type ResourceEventHandlerFuncs struct {
+ AddFunc func(obj interface{})
+ UpdateFunc func(oldObj, newObj interface{})
+ DeleteFunc func(obj interface{})
+}
+
+// OnAdd calls AddFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
+ if r.AddFunc != nil {
+ r.AddFunc(obj)
+ }
+}
+
+// OnUpdate calls UpdateFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
+ if r.UpdateFunc != nil {
+ r.UpdateFunc(oldObj, newObj)
+ }
+}
+
+// OnDelete calls DeleteFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
+ if r.DeleteFunc != nil {
+ r.DeleteFunc(obj)
+ }
+}
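
For illustration, a sketch of the adaptor with only some callbacks set (any nil
field is simply skipped, so partial handlers are safe):

    // exampleHandler is a hypothetical constructor for a partial handler.
    func exampleHandler() cache.ResourceEventHandler {
        return cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { fmt.Println("added:", obj) },
            DeleteFunc: func(obj interface{}) { fmt.Println("deleted:", obj) },
            // UpdateFunc is left nil, so OnUpdate notifications are ignored.
        }
    }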
+
+// FilteringResourceEventHandler applies the provided filter to all events coming
+// in, ensuring the appropriate nested handler method is invoked. An object
+// that starts passing the filter after an update is considered an add, and an
+// object that stops passing the filter after an update is considered a delete.
+type FilteringResourceEventHandler struct {
+ FilterFunc func(obj interface{}) bool
+ Handler ResourceEventHandler
+}
+
+// OnAdd calls the nested handler only if the filter succeeds
+func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
+ if !r.FilterFunc(obj) {
+ return
+ }
+ r.Handler.OnAdd(obj)
+}
+
+// OnUpdate ensures the proper handler is called depending on whether the filter matches
+func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
+ newer := r.FilterFunc(newObj)
+ older := r.FilterFunc(oldObj)
+ switch {
+ case newer && older:
+ r.Handler.OnUpdate(oldObj, newObj)
+ case newer && !older:
+ r.Handler.OnAdd(newObj)
+ case !newer && older:
+ r.Handler.OnDelete(oldObj)
+ default:
+ // do nothing
+ }
+}
+
+// OnDelete calls the nested handler only if the filter succeeds
+func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
+ if !r.FilterFunc(obj) {
+ return
+ }
+ r.Handler.OnDelete(obj)
+}
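
A sketch of wrapping a handler with a filter; the label key and Pod type are
illustrative assumptions. Because the filter is re-evaluated on update, a label
change alone can surface as an add or a delete to the nested handler:

    func filteredHandler(inner cache.ResourceEventHandler) cache.ResourceEventHandler {
        return cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                pod, ok := obj.(*v1.Pod)
                return ok && pod.Labels["tier"] == "frontend"
            },
            // An update that sets the label triggers inner.OnAdd; one that
            // clears it triggers inner.OnDelete with the old object.
            Handler: inner,
        }
    }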
+
+// DeletionHandlingMetaNamespaceKeyFunc checks for
+// DeletedFinalStateUnknown objects before calling
+// MetaNamespaceKeyFunc.
+func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
+ if d, ok := obj.(DeletedFinalStateUnknown); ok {
+ return d.Key, nil
+ }
+ return MetaNamespaceKeyFunc(obj)
+}
+
+// NewInformer returns a Store and a controller for populating the store
+// while also providing event notifications. You should only use the returned
+// Store for Get/List operations; Add/Modify/Deletes will cause the event
+// notifications to be faulty.
+//
+// Parameters:
+// * lw is list and watch functions for the source of the resource you want to
+// be informed of.
+// * objType is an object of the type that you expect to receive.
+// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
+// calls, even if nothing changed). Otherwise, re-list will be delayed as
+// long as possible (until the upstream source closes the watch or times out,
+// or you stop the controller).
+// * h is the object you want notifications sent to.
+//
+func NewInformer(
+ lw ListerWatcher,
+ objType runtime.Object,
+ resyncPeriod time.Duration,
+ h ResourceEventHandler,
+) (Store, Controller) {
+ // This will hold the client state, as we know it.
+ clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
+
+ // This will hold incoming changes. Note how we pass clientState in as a
+ // KeyLister, that way resync operations will result in the correct set
+ // of update/delete deltas.
+ fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+
+ Process: func(obj interface{}) error {
+ // from oldest to newest
+ for _, d := range obj.(Deltas) {
+ switch d.Type {
+ case Sync, Added, Updated:
+ if old, exists, err := clientState.Get(d.Object); err == nil && exists {
+ if err := clientState.Update(d.Object); err != nil {
+ return err
+ }
+ h.OnUpdate(old, d.Object)
+ } else {
+ if err := clientState.Add(d.Object); err != nil {
+ return err
+ }
+ h.OnAdd(d.Object)
+ }
+ case Deleted:
+ if err := clientState.Delete(d.Object); err != nil {
+ return err
+ }
+ h.OnDelete(d.Object)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, New(cfg)
+}
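
A usage sketch for NewInformer; lw is an assumed ListerWatcher and the handler
bodies are placeholders:

    func watchPods(lw cache.ListerWatcher, stopCh <-chan struct{}) cache.Store {
        store, controller := cache.NewInformer(
            lw,
            &v1.Pod{},
            30*time.Second, // resync period: expect OnUpdate calls even with no changes
            cache.ResourceEventHandlerFuncs{
                AddFunc:    func(obj interface{}) { fmt.Println("add:", obj) },
                DeleteFunc: func(obj interface{}) { fmt.Println("del:", obj) },
            },
        )
        go controller.Run(stopCh)
        return store // use Get/List only; writing to it would corrupt event delivery
    }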
+
+// NewIndexerInformer returns an Indexer and a controller for populating the index
+// while also providing event notifications. You should only use the returned
+// Indexer for Get/List operations; Add/Modify/Deletes will cause the event
+// notifications to be faulty.
+//
+// Parameters:
+// * lw is list and watch functions for the source of the resource you want to
+// be informed of.
+// * objType is an object of the type that you expect to receive.
+// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
+// calls, even if nothing changed). Otherwise, re-list will be delayed as
+// long as possible (until the upstream source closes the watch or times out,
+// or you stop the controller).
+// * h is the object you want notifications sent to.
+// * indexers is the indexer for the received object type.
+//
+func NewIndexerInformer(
+ lw ListerWatcher,
+ objType runtime.Object,
+ resyncPeriod time.Duration,
+ h ResourceEventHandler,
+ indexers Indexers,
+) (Indexer, Controller) {
+ // This will hold the client state, as we know it.
+ clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
+
+ // This will hold incoming changes. Note how we pass clientState in as a
+ // KeyLister, that way resync operations will result in the correct set
+ // of update/delete deltas.
+ fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+
+ Process: func(obj interface{}) error {
+ // from oldest to newest
+ for _, d := range obj.(Deltas) {
+ switch d.Type {
+ case Sync, Added, Updated:
+ if old, exists, err := clientState.Get(d.Object); err == nil && exists {
+ if err := clientState.Update(d.Object); err != nil {
+ return err
+ }
+ h.OnUpdate(old, d.Object)
+ } else {
+ if err := clientState.Add(d.Object); err != nil {
+ return err
+ }
+ h.OnAdd(d.Object)
+ }
+ case Deleted:
+ if err := clientState.Delete(d.Object); err != nil {
+ return err
+ }
+ h.OnDelete(d.Object)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, New(cfg)
+}
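
The indexer variant is wired the same way but also takes Indexers; this sketch
uses the namespace index that ships in this package's index.go:

    func watchPodsByNamespace(lw cache.ListerWatcher, stopCh <-chan struct{}) cache.Indexer {
        indexer, controller := cache.NewIndexerInformer(
            lw, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{},
            cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
        )
        go controller.Run(stopCh)
        // Later, e.g.: objs, _ := indexer.ByIndex(cache.NamespaceIndex, "default")
        return indexer
    }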
diff --git a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
new file mode 100644
index 000000000..a71db6048
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
@@ -0,0 +1,681 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ "k8s.io/apimachinery/pkg/util/sets"
+
+ "github.com/golang/glog"
+)
+
+// NewDeltaFIFO returns a Store which can be used to process changes to items.
+//
+// keyFunc is used to figure out what key an object should have. (It's
+// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
+//
+// 'compressor' may compress as many or as few items as it wants
+// (including returning an empty slice), but it should do what it
+// does quickly since it is called while the queue is locked.
+// 'compressor' may be nil if you don't want any delta compression.
+//
+// 'keyLister' is expected to return a list of keys that the consumer of
+// this queue "knows about". It is used to decide which items are missing
+// when Replace() is called; 'Deleted' deltas are produced for these items.
+// It may be nil if you don't need to detect all deletions.
+// TODO: consider merging keyLister with this object, tracking a list of
+// "known" keys when Pop() is called. Have to think about how that
+// affects error retrying.
+// TODO(lavalamp): I believe there is a possible race only when using an
+// external known object source that the above TODO would
+// fix.
+//
+// Also see the comment on DeltaFIFO.
+func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
+ f := &DeltaFIFO{
+ items: map[string]Deltas{},
+ queue: []string{},
+ keyFunc: keyFunc,
+ deltaCompressor: compressor,
+ knownObjects: knownObjects,
+ }
+ f.cond.L = &f.lock
+ return f
+}
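
A minimal sketch of a DeltaFIFO in isolation; pod is an assumed keyed object,
and both the compressor and knownObjects are nil, as permitted above:

    func deltaFIFODemo(pod *v1.Pod) {
        fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, nil)
        _ = fifo.Add(pod)    // queues Delta{Added, pod}
        _ = fifo.Update(pod) // appends Delta{Updated, pod} under the same key
        // A single Pop drains every accumulated delta for the key, oldest first.
        _, _ = fifo.Pop(func(obj interface{}) error {
            for _, d := range obj.(cache.Deltas) {
                fmt.Println(d.Type) // Added, then Updated
            }
            return nil
        })
    }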
+
+// DeltaFIFO is like FIFO, but allows you to process deletes.
+//
+// DeltaFIFO is a producer-consumer queue, where a Reflector is
+// intended to be the producer, and the consumer is whatever calls
+// the Pop() method.
+//
+// DeltaFIFO solves this use case:
+// * You want to process every object change (delta) at most once.
+// * When you process an object, you want to see everything
+// that's happened to it since you last processed it.
+// * You want to process the deletion of objects.
+// * You might want to periodically reprocess objects.
+//
+// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
+// interface{} to satisfy the Store/Queue interfaces, but it
+// will always return an object of type Deltas.
+//
+// A note on threading: If you call Pop() in parallel from multiple
+// threads, you could end up with multiple threads processing slightly
+// different versions of the same object.
+//
+// A note on the KeyLister used by the DeltaFIFO: its main purpose is
+// to list keys that are "known", for the purpose of figuring out which
+// items have been deleted when Replace() or Delete() are called. The deleted
+// object will be included in the DeletedFinalStateUnknown markers. These objects
+// could be stale.
+//
+// You may provide a function to compress deltas (e.g., represent a
+// series of Updates as a single Update).
+type DeltaFIFO struct {
+ // lock/cond protects access to 'items' and 'queue'.
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // We depend on the property that items in the set are in
+ // the queue and vice versa, and that all Deltas in this
+ // map have at least one Delta.
+ items map[string]Deltas
+ queue []string
+
+ // populated is true if the first batch of items inserted by Replace() has been populated
+ // or Delete/Add/Update was called first.
+ populated bool
+ // initialPopulationCount is the number of items inserted by the first call of Replace()
+ initialPopulationCount int
+
+ // keyFunc is used to make the key used for queued item
+ // insertion and retrieval, and should be deterministic.
+ keyFunc KeyFunc
+
+ // deltaCompressor tells us how to combine two or more
+ // deltas. It may be nil.
+ deltaCompressor DeltaCompressor
+
+ // knownObjects list keys that are "known", for the
+ // purpose of figuring out which items have been deleted
+ // when Replace() or Delete() is called.
+ knownObjects KeyListerGetter
+
+ // Indication the queue is closed.
+ // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
+ // Currently, not used to gate any of CRUD operations.
+ closed bool
+ closedLock sync.Mutex
+}
+
+var (
+ _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
+)
+
+var (
+ // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
+ // object with zero length is encountered (should be impossible,
+ // even if such an object is accidentally produced by a DeltaCompressor--
+ // but included for completeness).
+ ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
+)
+
+// Close the queue.
+func (f *DeltaFIFO) Close() {
+ f.closedLock.Lock()
+ defer f.closedLock.Unlock()
+ f.closed = true
+ f.cond.Broadcast()
+}
+
+// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
+// DeletedFinalStateUnknown objects.
+func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
+ if d, ok := obj.(Deltas); ok {
+ if len(d) == 0 {
+ return "", KeyError{obj, ErrZeroLengthDeltasObject}
+ }
+ obj = d.Newest().Object
+ }
+ if d, ok := obj.(DeletedFinalStateUnknown); ok {
+ return d.Key, nil
+ }
+ return f.keyFunc(obj)
+}
+
+// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent was called first,
+// or the first batch of items inserted by Replace() has been popped.
+func (f *DeltaFIFO) HasSynced() bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ return f.populated && f.initialPopulationCount == 0
+}
+
+// Add inserts an item, and puts it in the queue. The item is only enqueued
+// if it doesn't already exist in the set.
+func (f *DeltaFIFO) Add(obj interface{}) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ return f.queueActionLocked(Added, obj)
+}
+
+// Update is just like Add, but makes an Updated Delta.
+func (f *DeltaFIFO) Update(obj interface{}) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ return f.queueActionLocked(Updated, obj)
+}
+
+// Delete is just like Add, but makes a Deleted Delta. If the item does not
+// already exist, it will be ignored. (It may have already been deleted by a
+// Replace (re-list), for example.)
+func (f *DeltaFIFO) Delete(obj interface{}) error {
+ id, err := f.KeyOf(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ if f.knownObjects == nil {
+ if _, exists := f.items[id]; !exists {
+ // Presumably, this was deleted when a relist happened.
+ // Don't provide a second report of the same deletion.
+ return nil
+ }
+ } else {
+ // We only want to skip the "deletion" action if the object doesn't
+ // exist in knownObjects and it doesn't have corresponding item in items.
+ // Note that even if there is a "deletion" action in items, we can ignore it,
+ // because it will be deduped automatically in "queueActionLocked"
+ _, exists, err := f.knownObjects.GetByKey(id)
+ _, itemsExist := f.items[id]
+ if err == nil && !exists && !itemsExist {
+ // Presumably, this was deleted when a relist happened.
+ // Don't provide a second report of the same deletion.
+ // TODO(lavalamp): This may be racy-- we aren't properly locked
+ // with knownObjects.
+ return nil
+ }
+ }
+
+ return f.queueActionLocked(Deleted, obj)
+}
+
+// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
+// present in the set, it is neither enqueued nor added to the set.
+//
+// This is useful in a single producer/consumer scenario so that the consumer can
+// safely retry items without contending with the producer and potentially enqueueing
+// stale items.
+//
+// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
+// different from the Add/Update/Delete functions.
+func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
+ deltas, ok := obj.(Deltas)
+ if !ok {
+ return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
+ }
+ id, err := f.KeyOf(deltas.Newest().Object)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.addIfNotPresent(id, deltas)
+ return nil
+}
+
+// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
+// already holds the fifo lock.
+func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
+ f.populated = true
+ if _, exists := f.items[id]; exists {
+ return
+ }
+
+ f.queue = append(f.queue, id)
+ f.items[id] = deltas
+ f.cond.Broadcast()
+}
+
+// re-listing and watching can deliver the same update multiple times in any
+// order. This will combine the most recent two deltas if they are the same.
+func dedupDeltas(deltas Deltas) Deltas {
+ n := len(deltas)
+ if n < 2 {
+ return deltas
+ }
+ a := &deltas[n-1]
+ b := &deltas[n-2]
+ if out := isDup(a, b); out != nil {
+ d := append(Deltas{}, deltas[:n-2]...)
+ return append(d, *out)
+ }
+ return deltas
+}
+
+// If a & b represent the same event, returns the delta that ought to be kept.
+// Otherwise, returns nil.
+// TODO: is there anything other than deletions that need deduping?
+func isDup(a, b *Delta) *Delta {
+ if out := isDeletionDup(a, b); out != nil {
+ return out
+ }
+ // TODO: Detect other duplicate situations? Are there any?
+ return nil
+}
+
+// keep the one with the most information if both are deletions.
+func isDeletionDup(a, b *Delta) *Delta {
+ if b.Type != Deleted || a.Type != Deleted {
+ return nil
+ }
+ // Do more sophisticated checks, or is this sufficient?
+ if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
+ return a
+ }
+ return b
+}
+
+// willObjectBeDeletedLocked returns true only if the last delta for the
+// given object is Deleted. Caller must lock first.
+func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
+ deltas := f.items[id]
+ return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
+}
+
+// queueActionLocked appends to the delta list for the object, calling
+// f.deltaCompressor if needed. Caller must lock first.
+func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
+ id, err := f.KeyOf(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+
+ // If object is supposed to be deleted (last event is Deleted),
+ // then we should ignore Sync events, because it would result in
+ // recreation of this object.
+ if actionType == Sync && f.willObjectBeDeletedLocked(id) {
+ return nil
+ }
+
+ newDeltas := append(f.items[id], Delta{actionType, obj})
+ newDeltas = dedupDeltas(newDeltas)
+ if f.deltaCompressor != nil {
+ newDeltas = f.deltaCompressor.Compress(newDeltas)
+ }
+
+ _, exists := f.items[id]
+ if len(newDeltas) > 0 {
+ if !exists {
+ f.queue = append(f.queue, id)
+ }
+ f.items[id] = newDeltas
+ f.cond.Broadcast()
+ } else if exists {
+ // The compression step removed all deltas, so
+ // we need to remove this from our map (extra items
+ // in the queue are ignored if they are not in the
+ // map).
+ delete(f.items, id)
+ }
+ return nil
+}
+
+// List returns a list of all the items; it returns the object
+// from the most recent Delta.
+// You should treat the items returned inside the deltas as immutable.
+func (f *DeltaFIFO) List() []interface{} {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ return f.listLocked()
+}
+
+func (f *DeltaFIFO) listLocked() []interface{} {
+ list := make([]interface{}, 0, len(f.items))
+ for _, item := range f.items {
+ // Copy item's slice so operations on this slice (delta
+ // compression) won't interfere with the object we return.
+ item = copyDeltas(item)
+ list = append(list, item.Newest().Object)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the FIFO.
+func (f *DeltaFIFO) ListKeys() []string {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list := make([]string, 0, len(f.items))
+ for key := range f.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+// Get returns the complete list of deltas for the requested item,
+// or sets exists=false.
+// You should treat the items returned inside the deltas as immutable.
+func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ key, err := f.KeyOf(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return f.GetByKey(key)
+}
+
+// GetByKey returns the complete list of deltas for the requested item,
+// setting exists=false if that list is empty.
+// You should treat the items returned inside the deltas as immutable.
+func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ d, exists := f.items[key]
+ if exists {
+ // Copy item's slice so operations on this slice (delta
+ // compression) won't interfere with the object we return.
+ d = copyDeltas(d)
+ }
+ return d, exists, nil
+}
+
+// IsClosed checks if the queue is closed.
+func (f *DeltaFIFO) IsClosed() bool {
+ f.closedLock.Lock()
+ defer f.closedLock.Unlock()
+ return f.closed
+}
+
+// Pop blocks until an item is added to the queue, and then returns it. If
+// multiple items are ready, they are returned in the order in which they were
+// added/updated. The item is removed from the queue (and the store) before it
+// is returned, so if you don't successfully process it, you need to add it back
+// with AddIfNotPresent().
+// The process function is called under lock, so it is safe to update data structures
+// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
+// may return an instance of ErrRequeue with a nested error to indicate the current
+// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
+//
+// Pop returns a 'Deltas', which has a complete list of all the things
+// that happened to the object (deltas) while it was sitting in the queue.
+func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ for {
+ for len(f.queue) == 0 {
+ // When the queue is empty, invocation of Pop() blocks until a new item is enqueued.
+ // When Close() is called, f.closed is set and the condition is broadcast,
+ // which causes this loop to continue and return from Pop().
+ if f.IsClosed() {
+ return nil, FIFOClosedError
+ }
+
+ f.cond.Wait()
+ }
+ id := f.queue[0]
+ f.queue = f.queue[1:]
+ item, ok := f.items[id]
+ if f.initialPopulationCount > 0 {
+ f.initialPopulationCount--
+ }
+ if !ok {
+ // Item may have been deleted subsequently.
+ continue
+ }
+ delete(f.items, id)
+ err := process(item)
+ if e, ok := err.(ErrRequeue); ok {
+ f.addIfNotPresent(id, item)
+ err = e.Err
+ }
+ // Don't need to copyDeltas here, because we're transferring
+ // ownership to the caller.
+ return item, err
+ }
+}
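
A sketch of the requeue path described above; handle is a hypothetical
processing function:

    func popWithRetry(fifo *cache.DeltaFIFO, handle func(interface{}) error) error {
        _, err := fifo.Pop(func(obj interface{}) error {
            if err := handle(obj); err != nil {
                // Requeues under the queue lock (equivalent to AddIfNotPresent);
                // Pop returns the wrapped error to the caller.
                return cache.ErrRequeue{Err: err}
            }
            return nil
        })
        return err
    }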
+
+// Replace will delete the contents of 'f', using instead the given list.
+// 'f' takes ownership of the list; you should not reference the list again
+// after calling this function. f's queue is reset, too; upon return, it
+// will contain the items in the list, in no particular order.
+func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ keys := make(sets.String, len(list))
+
+ for _, item := range list {
+ key, err := f.KeyOf(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ keys.Insert(key)
+ if err := f.queueActionLocked(Sync, item); err != nil {
+ return fmt.Errorf("couldn't enqueue object: %v", err)
+ }
+ }
+
+ if f.knownObjects == nil {
+ // Do deletion detection against our own list.
+ for k, oldItem := range f.items {
+ if keys.Has(k) {
+ continue
+ }
+ var deletedObj interface{}
+ if n := oldItem.Newest(); n != nil {
+ deletedObj = n.Object
+ }
+ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
+ return err
+ }
+ }
+
+ if !f.populated {
+ f.populated = true
+ f.initialPopulationCount = len(list)
+ }
+
+ return nil
+ }
+
+ // Detect deletions not already in the queue.
+ // TODO(lavalamp): This may be racy-- we aren't properly locked
+ // with knownObjects. Unproven.
+ knownKeys := f.knownObjects.ListKeys()
+ queuedDeletions := 0
+ for _, k := range knownKeys {
+ if keys.Has(k) {
+ continue
+ }
+
+ deletedObj, exists, err := f.knownObjects.GetByKey(k)
+ if err != nil {
+ deletedObj = nil
+ glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
+ } else if !exists {
+ deletedObj = nil
+ glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
+ }
+ queuedDeletions++
+ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
+ return err
+ }
+ }
+
+ if !f.populated {
+ f.populated = true
+ f.initialPopulationCount = len(list) + queuedDeletions
+ }
+
+ return nil
+}
+
+// Resync will send a sync event for each item
+func (f *DeltaFIFO) Resync() error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ keys := f.knownObjects.ListKeys()
+ for _, k := range keys {
+ if err := f.syncKeyLocked(k); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (f *DeltaFIFO) syncKey(key string) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ return f.syncKeyLocked(key)
+}
+
+func (f *DeltaFIFO) syncKeyLocked(key string) error {
+ obj, exists, err := f.knownObjects.GetByKey(key)
+ if err != nil {
+ glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
+ return nil
+ } else if !exists {
+ glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
+ return nil
+ }
+
+ // If we are doing Resync() and there is already an event queued for that object,
+ // we ignore the Resync for it. This is to avoid the race, in which the resync
+ // comes with the previous value of object (since queueing an event for the object
+// doesn't trigger changing the underlying store <knownObjects>).
+ id, err := f.KeyOf(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ if len(f.items[id]) > 0 {
+ return nil
+ }
+
+ if err := f.queueActionLocked(Sync, obj); err != nil {
+ return fmt.Errorf("couldn't queue object: %v", err)
+ }
+ return nil
+}
+
+// A KeyListerGetter is anything that knows how to list its keys and look up by key.
+type KeyListerGetter interface {
+ KeyLister
+ KeyGetter
+}
+
+// A KeyLister is anything that knows how to list its keys.
+type KeyLister interface {
+ ListKeys() []string
+}
+
+// A KeyGetter is anything that knows how to get the value stored under a given key.
+type KeyGetter interface {
+ GetByKey(key string) (interface{}, bool, error)
+}
+
+// DeltaCompressor is an algorithm that removes redundant changes.
+type DeltaCompressor interface {
+ Compress(Deltas) Deltas
+}
+
+// DeltaCompressorFunc should remove redundant changes; but changes that
+// are redundant depend on one's desired semantics, so this is an
+// injectable function.
+//
+// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
+type DeltaCompressorFunc func(Deltas) Deltas
+
+// Compress just calls dc.
+func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
+ return dc(d)
+}
+
+// DeltaType is the type of a change (addition, deletion, etc)
+type DeltaType string
+
+const (
+ Added DeltaType = "Added"
+ Updated DeltaType = "Updated"
+ Deleted DeltaType = "Deleted"
+ // The other types are obvious. You'll get Sync deltas when:
+ // * A watch expires/errors out and a new list/watch cycle is started.
+ // * You've turned on periodic syncs.
+ // (Anything that triggers DeltaFIFO's Replace() method.)
+ Sync DeltaType = "Sync"
+)
+
+// Delta is the type stored by a DeltaFIFO. It tells you what change
+// happened, and the object's state after* that change.
+//
+// [*] Unless the change is a deletion, and then you'll get the final
+// state of the object before it was deleted.
+type Delta struct {
+ Type DeltaType
+ Object interface{}
+}
+
+// Deltas is a list of one or more 'Delta's to an individual object.
+// The oldest delta is at index 0, the newest delta is the last one.
+type Deltas []Delta
+
+// Oldest is a convenience function that returns the oldest delta, or
+// nil if there are no deltas.
+func (d Deltas) Oldest() *Delta {
+ if len(d) > 0 {
+ return &d[0]
+ }
+ return nil
+}
+
+// Newest is a convenience function that returns the newest delta, or
+// nil if there are no deltas.
+func (d Deltas) Newest() *Delta {
+ if n := len(d); n > 0 {
+ return &d[n-1]
+ }
+ return nil
+}
+
+// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
+// the objects in the slice. This allows Get/List to return an object that we
+// know won't be clobbered by a subsequent call to a delta compressor.
+func copyDeltas(d Deltas) Deltas {
+ d2 := make(Deltas, len(d))
+ copy(d2, d)
+ return d2
+}
+
+// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
+// an object was deleted but the watch deletion event was missed. In this
+// case we don't know the final "resting" state of the object, so there's
+// a chance the included `Obj` is stale.
+type DeletedFinalStateUnknown struct {
+ Key string
+ Obj interface{}
+}
diff --git a/vendor/k8s.io/client-go/tools/cache/doc.go b/vendor/k8s.io/client-go/tools/cache/doc.go
new file mode 100644
index 000000000..56b61d300
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/doc.go
@@ -0,0 +1,24 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package cache is a client-side caching mechanism. It is useful for
+// reducing the number of server calls you'd otherwise need to make.
+// Reflector watches a server and updates a Store. Two stores are provided;
+// one that simply caches objects (for example, to allow a scheduler to
+// list currently available nodes), and one that additionally acts as
+// a FIFO queue (for example, to allow a scheduler to process incoming
+// pods).
+package cache // import "k8s.io/client-go/tools/cache"
diff --git a/vendor/k8s.io/client-go/tools/cache/expiration_cache.go b/vendor/k8s.io/client-go/tools/cache/expiration_cache.go
new file mode 100644
index 000000000..fa88fc407
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/expiration_cache.go
@@ -0,0 +1,208 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+ "k8s.io/apimachinery/pkg/util/clock"
+)
+
+// ExpirationCache implements the store interface
+// 1. All entries are automatically time stamped on insert
+// a. The key is computed based off the original item/keyFunc
+// b. The value inserted under that key is the timestamped item
+// 2. Expiration happens lazily on read based on the expiration policy
+// a. No item can be inserted into the store while we're expiring
+// *any* item in the cache.
+// 3. Time-stamps are stripped off unexpired entries before return
+// Note that the ExpirationCache is inherently slower than a normal
+// threadSafeStore because it takes a write lock every time it checks if
+// an item has expired.
+type ExpirationCache struct {
+ cacheStorage ThreadSafeStore
+ keyFunc KeyFunc
+ clock clock.Clock
+ expirationPolicy ExpirationPolicy
+ // expirationLock is a write lock used to guarantee that we don't clobber
+ // newly inserted objects because of a stale expiration timestamp comparison
+ expirationLock sync.Mutex
+}
+
+// ExpirationPolicy dictates when an object expires. Currently only abstracted out
+// so unittests don't rely on the system clock.
+type ExpirationPolicy interface {
+ IsExpired(obj *timestampedEntry) bool
+}
+
+// TTLPolicy implements a ttl based ExpirationPolicy.
+type TTLPolicy struct {
+ // >0: Expire entries with an age > ttl
+ // <=0: Don't expire any entry
+ Ttl time.Duration
+
+ // Clock used to calculate ttl expiration
+ Clock clock.Clock
+}
+
+// IsExpired returns true if the given object is older than the ttl.
+func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
+ return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
+}
+
+// timestampedEntry is the only type allowed in an ExpirationCache.
+type timestampedEntry struct {
+ obj interface{}
+ timestamp time.Time
+}
+
+// getTimestampedEntry returns the timestampedEntry stored under the given key.
+func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
+ item, _ := c.cacheStorage.Get(key)
+ if tsEntry, ok := item.(*timestampedEntry); ok {
+ return tsEntry, true
+ }
+ return nil, false
+}
+
+// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
+// already expired. It holds a write lock across deletion.
+func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
+ // Prevent all inserts from the time we deem an item as "expired" to when we
+ // delete it, so an un-expired item doesn't sneak in under the same key, just
+ // before the Delete.
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+ timestampedItem, exists := c.getTimestampedEntry(key)
+ if !exists {
+ return nil, false
+ }
+ if c.expirationPolicy.IsExpired(timestampedItem) {
+ glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
+ c.cacheStorage.Delete(key)
+ return nil, false
+ }
+ return timestampedItem.obj, true
+}
+
+// GetByKey returns the item stored under the key, or sets exists=false.
+func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) {
+ obj, exists := c.getOrExpire(key)
+ return obj, exists, nil
+}
+
+// Get returns unexpired items. It purges the cache of expired items in the
+// process.
+func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ obj, exists := c.getOrExpire(key)
+ return obj, exists, nil
+}
+
+// List retrieves a list of unexpired items. It purges the cache of expired
+// items in the process.
+func (c *ExpirationCache) List() []interface{} {
+ items := c.cacheStorage.List()
+
+ list := make([]interface{}, 0, len(items))
+ for _, item := range items {
+ obj := item.(*timestampedEntry).obj
+ if key, err := c.keyFunc(obj); err != nil {
+ list = append(list, obj)
+ } else if obj, exists := c.getOrExpire(key); exists {
+ list = append(list, obj)
+ }
+ }
+ return list
+}
+
+// ListKeys returns a list of all keys in the expiration cache.
+func (c *ExpirationCache) ListKeys() []string {
+ return c.cacheStorage.ListKeys()
+}
+
+// Add timestamps an item and inserts it into the cache, overwriting entries
+// that might exist under the same key.
+func (c *ExpirationCache) Add(obj interface{}) error {
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Add(key, &timestampedEntry{obj, c.clock.Now()})
+ return nil
+}
+
+// Update has not been implemented yet for lack of a use case, so this method
+// simply calls `Add`. This effectively refreshes the timestamp.
+func (c *ExpirationCache) Update(obj interface{}) error {
+ return c.Add(obj)
+}
+
+// Delete removes an item from the cache.
+func (c *ExpirationCache) Delete(obj interface{}) error {
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Delete(key)
+ return nil
+}
+
+// Replace will convert all items in the given list to timestampedEntries
+// before attempting the replace operation. The replace operation will
+// delete the contents of the ExpirationCache `c`.
+func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+ items := map[string]interface{}{}
+ ts := c.clock.Now()
+ for _, item := range list {
+ key, err := c.keyFunc(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ items[key] = &timestampedEntry{item, ts}
+ }
+ c.cacheStorage.Replace(items, resourceVersion)
+ return nil
+}
+
+// Resync will touch all objects to put them into the processing queue
+func (c *ExpirationCache) Resync() error {
+ return c.cacheStorage.Resync()
+}
+
+// NewTTLStore creates and returns an ExpirationCache with a TTLPolicy
+func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
+ return &ExpirationCache{
+ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
+ keyFunc: keyFunc,
+ clock: clock.RealClock{},
+ expirationPolicy: &TTLPolicy{ttl, clock.RealClock{}},
+ }
+}
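
A usage sketch for the TTL store; pod is an assumed object with standard
metadata:

    func ttlDemo(pod *v1.Pod) {
        store := cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second)
        _ = store.Add(pod)
        // Reads within 30s see the entry; afterwards a read lazily expires it.
        _, exists, _ := store.Get(pod)
        fmt.Println("still cached:", exists)
    }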
diff --git a/vendor/k8s.io/client-go/tools/cache/expiration_cache_fakes.go b/vendor/k8s.io/client-go/tools/cache/expiration_cache_fakes.go
new file mode 100644
index 000000000..a096765f6
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/expiration_cache_fakes.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "k8s.io/apimachinery/pkg/util/clock"
+ "k8s.io/apimachinery/pkg/util/sets"
+)
+
+type fakeThreadSafeMap struct {
+ ThreadSafeStore
+ deletedKeys chan<- string
+}
+
+func (c *fakeThreadSafeMap) Delete(key string) {
+ if c.deletedKeys != nil {
+ c.ThreadSafeStore.Delete(key)
+ c.deletedKeys <- key
+ }
+}
+
+type FakeExpirationPolicy struct {
+ NeverExpire sets.String
+ RetrieveKeyFunc KeyFunc
+}
+
+func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool {
+ key, _ := p.RetrieveKeyFunc(obj)
+ return !p.NeverExpire.Has(key)
+}
+
+func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock clock.Clock) Store {
+ cacheStorage := NewThreadSafeStore(Indexers{}, Indices{})
+ return &ExpirationCache{
+ cacheStorage: &fakeThreadSafeMap{cacheStorage, deletedKeys},
+ keyFunc: keyFunc,
+ clock: cacheClock,
+ expirationPolicy: expirationPolicy,
+ }
+}
diff --git a/vendor/k8s.io/client-go/tools/cache/fake_custom_store.go b/vendor/k8s.io/client-go/tools/cache/fake_custom_store.go
new file mode 100644
index 000000000..8d71c2474
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/fake_custom_store.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+// FakeCustomStore lets you define custom functions for store operations.
+type FakeCustomStore struct {
+ AddFunc func(obj interface{}) error
+ UpdateFunc func(obj interface{}) error
+ DeleteFunc func(obj interface{}) error
+ ListFunc func() []interface{}
+ ListKeysFunc func() []string
+ GetFunc func(obj interface{}) (item interface{}, exists bool, err error)
+ GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
+ ReplaceFunc func(list []interface{}, resourceVersion string) error
+ ResyncFunc func() error
+}
+
+// Add calls the custom Add function if defined
+func (f *FakeCustomStore) Add(obj interface{}) error {
+ if f.AddFunc != nil {
+ return f.AddFunc(obj)
+ }
+ return nil
+}
+
+// Update calls the custom Update function if defined
+func (f *FakeCustomStore) Update(obj interface{}) error {
+ if f.UpdateFunc != nil {
+ return f.UpdateFunc(obj)
+ }
+ return nil
+}
+
+// Delete calls the custom Delete function if defined
+func (f *FakeCustomStore) Delete(obj interface{}) error {
+ if f.DeleteFunc != nil {
+ return f.DeleteFunc(obj)
+ }
+ return nil
+}
+
+// List calls the custom List function if defined
+func (f *FakeCustomStore) List() []interface{} {
+ if f.ListFunc != nil {
+ return f.ListFunc()
+ }
+ return nil
+}
+
+// ListKeys calls the custom ListKeys function if defined
+func (f *FakeCustomStore) ListKeys() []string {
+ if f.ListKeysFunc != nil {
+ return f.ListKeysFunc()
+ }
+ return nil
+}
+
+// Get calls the custom Get function if defined
+func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ if f.GetFunc != nil {
+ return f.GetFunc(obj)
+ }
+ return nil, false, nil
+}
+
+// GetByKey calls the custom GetByKey function if defined
+func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) {
+ if f.GetByKeyFunc != nil {
+ return f.GetByKeyFunc(key)
+ }
+ return nil, false, nil
+}
+
+// Replace calls the custom Replace function if defined
+func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error {
+ if f.ReplaceFunc != nil {
+ return f.ReplaceFunc(list, resourceVersion)
+ }
+ return nil
+}
+
+// Resync calls the custom Resync function if defined
+func (f *FakeCustomStore) Resync() error {
+ if f.ResyncFunc != nil {
+ return f.ResyncFunc()
+ }
+ return nil
+}
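
A sketch of stubbing a store in a test; the counter is illustrative:

    func fakeStoreDemo() {
        adds := 0
        store := &cache.FakeCustomStore{
            AddFunc: func(obj interface{}) error { adds++; return nil },
        }
        _ = store.Add("anything") // invokes AddFunc; adds is now 1
        _ = store.Delete("x")     // DeleteFunc is nil, so this is a no-op
        fmt.Println("adds:", adds)
    }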
diff --git a/vendor/k8s.io/client-go/tools/cache/fifo.go b/vendor/k8s.io/client-go/tools/cache/fifo.go
new file mode 100644
index 000000000..3f6e2a948
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/fifo.go
@@ -0,0 +1,358 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "errors"
+ "sync"
+
+ "k8s.io/apimachinery/pkg/util/sets"
+)
+
+// PopProcessFunc is passed to Pop() method of Queue interface.
+// It is supposed to process the element popped from the queue.
+type PopProcessFunc func(interface{}) error
+
+// ErrRequeue may be returned by a PopProcessFunc to safely requeue
+// the current item. The value of Err will be returned from Pop.
+type ErrRequeue struct {
+ // Err is returned by the Pop function
+ Err error
+}
+
+var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
+
+func (e ErrRequeue) Error() string {
+ if e.Err == nil {
+ return "the popped item should be requeued without returning an error"
+ }
+ return e.Err.Error()
+}
+
+// Queue is exactly like a Store, but has a Pop() method too.
+type Queue interface {
+ Store
+
+ // Pop blocks until it has something to process.
+ // It returns the object that was processed and the result of processing.
+ // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
+ // should be requeued before releasing the lock on the queue.
+ Pop(PopProcessFunc) (interface{}, error)
+
+ // AddIfNotPresent adds a value previously
+ // returned by Pop back into the queue as long
+ // as nothing else (presumably more recent)
+ // has since been added.
+ AddIfNotPresent(interface{}) error
+
+ // Return true if the first batch of items has been popped
+ HasSynced() bool
+
+ // Close queue
+ Close()
+}
+
+// Helper function for popping from Queue.
+// WARNING: Do NOT use this function in non-test code to avoid races
+// unless you really really really really know what you are doing.
+func Pop(queue Queue) interface{} {
+ var result interface{}
+ queue.Pop(func(obj interface{}) error {
+ result = obj
+ return nil
+ })
+ return result
+}
+
+// FIFO receives adds and updates from a Reflector, and puts them in a queue for
+// FIFO order processing. If multiple adds/updates of a single item happen while
+// an item is in the queue before it has been processed, it will only be
+// processed once, and when it is processed, the most recent version will be
+// processed. This can't be done with a channel.
+//
+// FIFO solves this use case:
+// * You want to process every object (exactly) once.
+// * You want to process the most recent version of the object when you process it.
+// * You do not want to process deleted objects, they should be removed from the queue.
+// * You do not want to periodically reprocess objects.
+// Compare with DeltaFIFO for other use cases.
+type FIFO struct {
+ lock sync.RWMutex
+ cond sync.Cond
+ // We depend on the property that items in the set are in the queue and vice versa.
+ items map[string]interface{}
+ queue []string
+
+ // populated is true if the first batch of items inserted by Replace() has been populated
+ // or Delete/Add/Update was called first.
+ populated bool
+ // initialPopulationCount is the number of items inserted by the first call of Replace()
+ initialPopulationCount int
+
+ // keyFunc is used to make the key used for queued item insertion and retrieval, and
+ // should be deterministic.
+ keyFunc KeyFunc
+
+ // Indication the queue is closed.
+ // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
+ // Currently, not used to gate any of CRUD operations.
+ closed bool
+ closedLock sync.Mutex
+}
+
+var (
+ _ = Queue(&FIFO{}) // FIFO is a Queue
+)
+
+// Close the queue.
+func (f *FIFO) Close() {
+ f.closedLock.Lock()
+ defer f.closedLock.Unlock()
+ f.closed = true
+ f.cond.Broadcast()
+}
+
+// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent was called first,
+// or the first batch of items inserted by Replace() has been popped.
+func (f *FIFO) HasSynced() bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ return f.populated && f.initialPopulationCount == 0
+}
+
+// Add inserts an item, and puts it in the queue. The item is only enqueued
+// if it doesn't already exist in the set.
+func (f *FIFO) Add(obj interface{}) error {
+ id, err := f.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ if _, exists := f.items[id]; !exists {
+ f.queue = append(f.queue, id)
+ }
+ f.items[id] = obj
+ f.cond.Broadcast()
+ return nil
+}
+
+// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
+// present in the set, it is neither enqueued nor added to the set.
+//
+// This is useful in a single producer/consumer scenario so that the consumer can
+// safely retry items without contending with the producer and potentially enqueueing
+// stale items.
+func (f *FIFO) AddIfNotPresent(obj interface{}) error {
+ id, err := f.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.addIfNotPresent(id, obj)
+ return nil
+}
+
+// addIfNotPresent assumes the fifo lock is already held and adds the provided
+// item to the queue under id if it does not already exist.
+func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
+ f.populated = true
+ if _, exists := f.items[id]; exists {
+ return
+ }
+
+ f.queue = append(f.queue, id)
+ f.items[id] = obj
+ f.cond.Broadcast()
+}
+
+// Update is the same as Add in this implementation.
+func (f *FIFO) Update(obj interface{}) error {
+ return f.Add(obj)
+}
+
+// Delete removes an item. It doesn't add it to the queue, because
+// this implementation assumes the consumer only cares about the objects,
+// not the order in which they were created/added.
+func (f *FIFO) Delete(obj interface{}) error {
+ id, err := f.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ delete(f.items, id)
+ return err
+}
+
+// List returns a list of all the items.
+func (f *FIFO) List() []interface{} {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list := make([]interface{}, 0, len(f.items))
+ for _, item := range f.items {
+ list = append(list, item)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the FIFO.
+func (f *FIFO) ListKeys() []string {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list := make([]string, 0, len(f.items))
+ for key := range f.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+// Get returns the requested item, or sets exists=false.
+func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ key, err := f.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return f.GetByKey(key)
+}
+
+// GetByKey returns the requested item, or sets exists=false.
+func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ item, exists = f.items[key]
+ return item, exists, nil
+}
+
+// IsClosed checks if the queue is closed.
+func (f *FIFO) IsClosed() bool {
+ f.closedLock.Lock()
+ defer f.closedLock.Unlock()
+ return f.closed
+}
+
+// Pop waits until an item is ready and processes it. If multiple items are
+// ready, they are returned in the order in which they were added/updated.
+// The item is removed from the queue (and the store) before it is processed,
+// so if you don't successfully process it, it should be added back with
+// AddIfNotPresent(). The process function is called under lock, so it is safe
+// to update data structures in it that need to be in sync with the queue.
+func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ for {
+ for len(f.queue) == 0 {
+ // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
+ // When Close() is called, the f.closed is set and the condition is broadcasted.
+ // Which causes this loop to continue and return from the Pop().
+ if f.IsClosed() {
+ return nil, FIFOClosedError
+ }
+
+ f.cond.Wait()
+ }
+ id := f.queue[0]
+ f.queue = f.queue[1:]
+ if f.initialPopulationCount > 0 {
+ f.initialPopulationCount--
+ }
+ item, ok := f.items[id]
+ if !ok {
+ // Item may have been deleted subsequently.
+ continue
+ }
+ delete(f.items, id)
+ err := process(item)
+ if e, ok := err.(ErrRequeue); ok {
+ f.addIfNotPresent(id, item)
+ err = e.Err
+ }
+ return item, err
+ }
+}
+
+// Replace will delete the contents of 'f', using instead the given map.
+// 'f' takes ownership of the map; you should not reference the map again
+// after calling this function. f's queue is reset, too; upon return, it
+// will contain the items in the map, in no particular order.
+func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
+ items := map[string]interface{}{}
+ for _, item := range list {
+ key, err := f.keyFunc(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ items[key] = item
+ }
+
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if !f.populated {
+ f.populated = true
+ f.initialPopulationCount = len(items)
+ }
+
+ f.items = items
+ f.queue = f.queue[:0]
+ for id := range items {
+ f.queue = append(f.queue, id)
+ }
+ if len(f.queue) > 0 {
+ f.cond.Broadcast()
+ }
+ return nil
+}
+
+// Resync will touch all objects to put them into the processing queue
+func (f *FIFO) Resync() error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ inQueue := sets.NewString()
+ for _, id := range f.queue {
+ inQueue.Insert(id)
+ }
+ for id := range f.items {
+ if !inQueue.Has(id) {
+ f.queue = append(f.queue, id)
+ }
+ }
+ if len(f.queue) > 0 {
+ f.cond.Broadcast()
+ }
+ return nil
+}
+
+// NewFIFO returns a Store which can be used to queue up items to
+// process.
+func NewFIFO(keyFunc KeyFunc) *FIFO {
+ f := &FIFO{
+ items: map[string]interface{}{},
+ queue: []string{},
+ keyFunc: keyFunc,
+ }
+ f.cond.L = &f.lock
+ return f
+}
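+
+// Example (editor's illustrative sketch, not part of upstream client-go):
+// a minimal producer/consumer loop over a FIFO. The string-keyed KeyFunc
+// below is an assumption chosen for brevity; any deterministic KeyFunc
+// works. A consumer that fails an item can return ErrRequeue from the
+// process callback (or call AddIfNotPresent itself) to retry without
+// racing the producer.
+//
+//	f := NewFIFO(func(obj interface{}) (string, error) {
+//		return obj.(string), nil // each string item is its own key
+//	})
+//	_ = f.Add("pod-a")
+//	_ = f.Add("pod-b")
+//	go func() {
+//		for {
+//			_, err := f.Pop(func(obj interface{}) error {
+//				fmt.Printf("processing %v\n", obj)
+//				return nil // return ErrRequeue{Err: ...} to put the item back
+//			})
+//			if err == FIFOClosedError {
+//				return
+//			}
+//		}
+//	}()
+//	f.Close()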
diff --git a/vendor/k8s.io/client-go/tools/cache/index.go b/vendor/k8s.io/client-go/tools/cache/index.go
new file mode 100644
index 000000000..15acb168e
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/index.go
@@ -0,0 +1,87 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/util/sets"
+)
+
+// Indexer is a storage interface that lets you list objects using multiple indexing functions
+type Indexer interface {
+ Store
+ // Retrieve list of objects that match on the named indexing function
+ Index(indexName string, obj interface{}) ([]interface{}, error)
+ // IndexKeys returns the set of keys that match on the named indexing function.
+ IndexKeys(indexName, indexKey string) ([]string, error)
+ // ListIndexFuncValues returns the list of generated values of an Index func
+ ListIndexFuncValues(indexName string) []string
+ // ByIndex lists objects that match on the named indexing function with the exact key
+ ByIndex(indexName, indexKey string) ([]interface{}, error)
+ // GetIndexers returns the indexers
+ GetIndexers() Indexers
+
+ // AddIndexers adds more indexers to this store. If you call this after you already have data
+ // in the store, the results are undefined.
+ AddIndexers(newIndexers Indexers) error
+}
+
+// IndexFunc knows how to provide an indexed value for an object.
+type IndexFunc func(obj interface{}) ([]string, error)
+
+// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns
+// unique values for every object. This conversion can create errors when more than one key is found. You
+// should prefer to make proper key and index functions.
+func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
+ return func(obj interface{}) (string, error) {
+ indexKeys, err := indexFunc(obj)
+ if err != nil {
+ return "", err
+ }
+ if len(indexKeys) > 1 {
+ return "", fmt.Errorf("too many keys: %v", indexKeys)
+ }
+ if len(indexKeys) == 0 {
+ return "", fmt.Errorf("unexpected empty indexKeys")
+ }
+ return indexKeys[0], nil
+ }
+}
+
+const (
+ NamespaceIndex string = "namespace"
+)
+
+// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
+func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
+ meta, err := meta.Accessor(obj)
+ if err != nil {
+ return []string{""}, fmt.Errorf("object has no meta: %v", err)
+ }
+ return []string{meta.GetNamespace()}, nil
+}
+
+// Index maps the indexed value to a set of keys in the store that match on that value
+type Index map[string]sets.String
+
+// Indexers maps a name to a IndexFunc
+type Indexers map[string]IndexFunc
+
+// Indices maps a name to an Index
+type Indices map[string]Index
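+
+// Example (editor's illustrative sketch, not part of the upstream file):
+// registering a custom IndexFunc alongside the namespace index and querying
+// by it. The "byApp" index name and "app" label key are invented for this
+// example; NewIndexer is defined in this package's store.go.
+//
+//	indexer := NewIndexer(MetaNamespaceKeyFunc, Indexers{
+//		NamespaceIndex: MetaNamespaceIndexFunc,
+//		"byApp": func(obj interface{}) ([]string, error) {
+//			m, err := meta.Accessor(obj)
+//			if err != nil {
+//				return nil, err
+//			}
+//			return []string{m.GetLabels()["app"]}, nil
+//		},
+//	})
+//	// after populating the indexer with Add(obj) calls:
+//	frontends, _ := indexer.ByIndex("byApp", "frontend")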
diff --git a/vendor/k8s.io/client-go/tools/cache/listers.go b/vendor/k8s.io/client-go/tools/cache/listers.go
new file mode 100644
index 000000000..27d51a6b3
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/listers.go
@@ -0,0 +1,160 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// AppendFunc is used to add a matching item to whatever list the caller is using
+type AppendFunc func(interface{})
+
+func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
+ for _, m := range store.List() {
+ metadata, err := meta.Accessor(m)
+ if err != nil {
+ return err
+ }
+ if selector.Matches(labels.Set(metadata.GetLabels())) {
+ appendFn(m)
+ }
+ }
+ return nil
+}
+
+func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
+ if namespace == metav1.NamespaceAll {
+ for _, m := range indexer.List() {
+ metadata, err := meta.Accessor(m)
+ if err != nil {
+ return err
+ }
+ if selector.Matches(labels.Set(metadata.GetLabels())) {
+ appendFn(m)
+ }
+ }
+ return nil
+ }
+
+ items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
+ if err != nil {
+ // Ignore error; do slow search without index.
+ glog.Warningf("can not retrieve list of objects using index : %v", err)
+ for _, m := range indexer.List() {
+ metadata, err := meta.Accessor(m)
+ if err != nil {
+ return err
+ }
+ if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) {
+ appendFn(m)
+ }
+
+ }
+ return nil
+ }
+ for _, m := range items {
+ metadata, err := meta.Accessor(m)
+ if err != nil {
+ return err
+ }
+ if selector.Matches(labels.Set(metadata.GetLabels())) {
+ appendFn(m)
+ }
+ }
+
+ return nil
+}
+
+// GenericLister is a lister skin on a generic Indexer
+type GenericLister interface {
+ // List will return all objects across namespaces
+ List(selector labels.Selector) (ret []runtime.Object, err error)
+ // Get will attempt to retrieve assuming that name==key
+ Get(name string) (runtime.Object, error)
+ // ByNamespace will give you a GenericNamespaceLister for one namespace
+ ByNamespace(namespace string) GenericNamespaceLister
+}
+
+// GenericNamespaceLister is a lister skin on a generic Indexer
+type GenericNamespaceLister interface {
+ // List will return all objects in this namespace
+ List(selector labels.Selector) (ret []runtime.Object, err error)
+ // Get will attempt to retrieve by namespace and name
+ Get(name string) (runtime.Object, error)
+}
+
+func NewGenericLister(indexer Indexer, resource schema.GroupResource) GenericLister {
+ return &genericLister{indexer: indexer, resource: resource}
+}
+
+type genericLister struct {
+ indexer Indexer
+ resource schema.GroupResource
+}
+
+func (s *genericLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
+ err = ListAll(s.indexer, selector, func(m interface{}) {
+ ret = append(ret, m.(runtime.Object))
+ })
+ return ret, err
+}
+
+func (s *genericLister) ByNamespace(namespace string) GenericNamespaceLister {
+ return &genericNamespaceLister{indexer: s.indexer, namespace: namespace, resource: s.resource}
+}
+
+func (s *genericLister) Get(name string) (runtime.Object, error) {
+ obj, exists, err := s.indexer.GetByKey(name)
+ if err != nil {
+ return nil, err
+ }
+ if !exists {
+ return nil, errors.NewNotFound(s.resource, name)
+ }
+ return obj.(runtime.Object), nil
+}
+
+type genericNamespaceLister struct {
+ indexer Indexer
+ namespace string
+ resource schema.GroupResource
+}
+
+func (s *genericNamespaceLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
+ err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
+ ret = append(ret, m.(runtime.Object))
+ })
+ return ret, err
+}
+
+func (s *genericNamespaceLister) Get(name string) (runtime.Object, error) {
+ obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
+ if err != nil {
+ return nil, err
+ }
+ if !exists {
+ return nil, errors.NewNotFound(s.resource, name)
+ }
+ return obj.(runtime.Object), nil
+}
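+
+// Example (editor's illustrative sketch, not part of the upstream file):
+// wrapping an Indexer in a GenericLister and reading from it. The "pods"
+// resource and the namespace/name below are placeholders.
+//
+//	lister := NewGenericLister(indexer, schema.GroupResource{Resource: "pods"})
+//	all, _ := lister.List(labels.Everything())
+//	pod, err := lister.ByNamespace("default").Get("mypod")
+//	if errors.IsNotFound(err) {
+//		// the object is not in the cache
+//	}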
diff --git a/vendor/k8s.io/client-go/tools/cache/listwatch.go b/vendor/k8s.io/client-go/tools/cache/listwatch.go
new file mode 100644
index 000000000..af01d4745
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/listwatch.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ restclient "k8s.io/client-go/rest"
+)
+
+// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
+type ListerWatcher interface {
+ // List should return a list type object; the Items field will be extracted, and the
+ // ResourceVersion field will be used to start the watch in the right place.
+ List(options metav1.ListOptions) (runtime.Object, error)
+ // Watch should begin a watch at the specified version.
+ Watch(options metav1.ListOptions) (watch.Interface, error)
+}
+
+// ListFunc knows how to list resources
+type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
+
+// WatchFunc knows how to watch resources
+type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
+
+// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
+// It is a convenience function for users of NewReflector, etc.
+// ListFunc and WatchFunc must not be nil
+type ListWatch struct {
+ ListFunc ListFunc
+ WatchFunc WatchFunc
+}
+
+// Getter interface knows how to access Get method from RESTClient.
+type Getter interface {
+ Get() *restclient.Request
+}
+
+// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
+func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
+ listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
+ return c.Get().
+ Namespace(namespace).
+ Resource(resource).
+ VersionedParams(&options, metav1.ParameterCodec).
+ FieldsSelectorParam(fieldSelector).
+ Do().
+ Get()
+ }
+ watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
+ options.Watch = true
+ return c.Get().
+ Namespace(namespace).
+ Resource(resource).
+ VersionedParams(&options, metav1.ParameterCodec).
+ FieldsSelectorParam(fieldSelector).
+ Watch()
+ }
+ return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
+}
+
+func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
+ if options.TimeoutSeconds != nil {
+ return time.Duration(*options.TimeoutSeconds) * time.Second
+ }
+ return 0
+}
+
+// List a set of apiserver resources
+func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
+ return lw.ListFunc(options)
+}
+
+// Watch a set of apiserver resources
+func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
+ return lw.WatchFunc(options)
+}
+
+// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
+func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
+ if len(conditions) == 0 {
+ return nil, nil
+ }
+
+ list, err := lw.List(metav1.ListOptions{})
+ if err != nil {
+ return nil, err
+ }
+ initialItems, err := meta.ExtractList(list)
+ if err != nil {
+ return nil, err
+ }
+
+ // use the initial items as simulated "adds"
+ var lastEvent *watch.Event
+ currIndex := 0
+ passedConditions := 0
+ for _, condition := range conditions {
+ // check the next condition against the previous event and short circuit waiting for the next watch
+ if lastEvent != nil {
+ done, err := condition(*lastEvent)
+ if err != nil {
+ return lastEvent, err
+ }
+ if done {
+ passedConditions = passedConditions + 1
+ continue
+ }
+ }
+
+ ConditionSucceeded:
+ for currIndex < len(initialItems) {
+ lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
+ currIndex++
+
+ done, err := condition(*lastEvent)
+ if err != nil {
+ return lastEvent, err
+ }
+ if done {
+ passedConditions = passedConditions + 1
+ break ConditionSucceeded
+ }
+ }
+ }
+ if passedConditions == len(conditions) {
+ return lastEvent, nil
+ }
+ remainingConditions := conditions[passedConditions:]
+
+ metaObj, err := meta.ListAccessor(list)
+ if err != nil {
+ return nil, err
+ }
+ currResourceVersion := metaObj.GetResourceVersion()
+
+ watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
+ if err != nil {
+ return nil, err
+ }
+
+ return watch.Until(timeout, watchInterface, remainingConditions...)
+}
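+
+// Example (editor's illustrative sketch, not part of the upstream file):
+// building a ListWatch for pods in one namespace. restClient is assumed to
+// be a configured REST client satisfying Getter (for example, the result of
+// clientset.CoreV1().RESTClient()); the exact accessor depends on the
+// client in use.
+//
+//	lw := NewListWatchFromClient(restClient, "pods", "default", fields.Everything())
+//	list, _ := lw.List(metav1.ListOptions{})
+//	w, _ := lw.Watch(metav1.ListOptions{ResourceVersion: "0"})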
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_cache.go b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
new file mode 100644
index 000000000..0fa06bf77
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
@@ -0,0 +1,261 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilcache "k8s.io/apimachinery/pkg/util/cache"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/sets"
+)
+
+// MutationCache is able to take the result of update operations and store them in an LRU
+// that can be used to provide a more current view of a requested object. It requires interpreting
+// resourceVersions for comparisons.
+// Implementations must be thread-safe.
+// TODO find a way to layer this into an informer/lister
+type MutationCache interface {
+ GetByKey(key string) (interface{}, bool, error)
+ ByIndex(indexName, indexKey string) ([]interface{}, error)
+ Mutation(interface{})
+}
+
+type ResourceVersionComparator interface {
+ CompareResourceVersion(lhs, rhs runtime.Object) int
+}
+
+// NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
+// deal with objects that have a resource version that:
+//
+// - is an integer
+// - increases when updated
+// - is comparable across the same resource in a namespace
+//
+// Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
+// remains in the mutation cache before it is removed.
+//
+// If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
+// in the underlying store. This is only safe if your use of the cache can handle mutation entries
+// remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
+func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
+ return &mutationCache{
+ backingCache: backingCache,
+ indexer: indexer,
+ mutationCache: utilcache.NewLRUExpireCache(100),
+ comparator: etcdObjectVersioner{},
+ ttl: ttl,
+ includeAdds: includeAdds,
+ }
+}
+
+// mutationCache doesn't guarantee that it returns values added via Mutation since they can page out.
+// Because you can't distinguish between "didn't observe create" and "was deleted after create",
+// if the key is missing from the backing cache, we always return it as missing.
+type mutationCache struct {
+ lock sync.Mutex
+ backingCache Store
+ indexer Indexer
+ mutationCache *utilcache.LRUExpireCache
+ includeAdds bool
+ ttl time.Duration
+
+ comparator ResourceVersionComparator
+}
+
+// GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
+// be older than another copy, the backingCache may be more recent, or you might have written twice into the same key.
+// You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
+func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ obj, exists, err := c.backingCache.GetByKey(key)
+ if err != nil {
+ return nil, false, err
+ }
+ if !exists {
+ if !c.includeAdds {
+ // we can't distinguish between, "didn't observe create" and "was deleted after create", so
+ // if the key is missing, we always return it as missing
+ return nil, false, nil
+ }
+ obj, exists = c.mutationCache.Get(key)
+ if !exists {
+ return nil, false, nil
+ }
+ }
+ objRuntime, ok := obj.(runtime.Object)
+ if !ok {
+ return obj, true, nil
+ }
+ return c.newerObject(key, objRuntime), true, nil
+}
+
+// ByIndex returns the newer objects that match the provided index and indexer key.
+// Will return an error if no indexer was provided.
+func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if c.indexer == nil {
+ return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
+ }
+ keys, err := c.indexer.IndexKeys(name, indexKey)
+ if err != nil {
+ return nil, err
+ }
+ var items []interface{}
+ keySet := sets.NewString()
+ for _, key := range keys {
+ keySet.Insert(key)
+ obj, exists, err := c.indexer.GetByKey(key)
+ if err != nil {
+ return nil, err
+ }
+ if !exists {
+ continue
+ }
+ if objRuntime, ok := obj.(runtime.Object); ok {
+ items = append(items, c.newerObject(key, objRuntime))
+ } else {
+ items = append(items, obj)
+ }
+ }
+
+ if c.includeAdds {
+ fn := c.indexer.GetIndexers()[name]
+ // Keys() returns keys oldest to newest, so a full traversal does not alter the LRU behavior
+ for _, key := range c.mutationCache.Keys() {
+ updated, ok := c.mutationCache.Get(key)
+ if !ok {
+ continue
+ }
+ if keySet.Has(key.(string)) {
+ continue
+ }
+ elements, err := fn(updated)
+ if err != nil {
+ glog.V(4).Info("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
+ continue
+ }
+ for _, inIndex := range elements {
+ if inIndex != indexKey {
+ continue
+ }
+ items = append(items, updated)
+ break
+ }
+ }
+ }
+
+ return items, nil
+}
+
+// newerObject checks the mutation cache for a newer object and returns one if found. If the
+// mutated object is older than the backing object, it is removed from the
+// mutation cache. Must be called while the lock is held.
+func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
+ mutatedObj, exists := c.mutationCache.Get(key)
+ if !exists {
+ return backing
+ }
+ mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
+ if !ok {
+ return backing
+ }
+ if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
+ c.mutationCache.Remove(key)
+ return backing
+ }
+ return mutatedObjRuntime
+}
+
+// Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
+// copy. If you call Mutation twice with the same object on different threads, one will win, but it's not defined
+// which one. This doesn't affect correctness, since the GetByKey guarantee of "later of these two caches" is
+// preserved, but you may not get the version of the object you want. The object you get is only guaranteed to be
+// "one that was valid at some point in time", not "the one that I want".
+func (c *mutationCache) Mutation(obj interface{}) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err != nil {
+ // this is a "nice to have", so failures shouldn't do anything weird
+ utilruntime.HandleError(err)
+ return
+ }
+
+ if objRuntime, ok := obj.(runtime.Object); ok {
+ if mutatedObj, exists := c.mutationCache.Get(key); exists {
+ if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
+ if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
+ return
+ }
+ }
+ }
+ }
+ c.mutationCache.Add(key, obj, c.ttl)
+}
+
+// etcdObjectVersioner implements versioning and extracting etcd node information
+// for objects that have an embedded ObjectMeta or ListMeta field.
+type etcdObjectVersioner struct{}
+
+// ObjectResourceVersion implements Versioner
+func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
+ accessor, err := meta.Accessor(obj)
+ if err != nil {
+ return 0, err
+ }
+ version := accessor.GetResourceVersion()
+ if len(version) == 0 {
+ return 0, nil
+ }
+ return strconv.ParseUint(version, 10, 64)
+}
+
+// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
+// but etcd resource versions are special, they're actually ints, so we can easily compare them.
+func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
+ lhsVersion, err := a.ObjectResourceVersion(lhs)
+ if err != nil {
+ // coder error
+ panic(err)
+ }
+ rhsVersion, err := a.ObjectResourceVersion(rhs)
+ if err != nil {
+ // coder error
+ panic(err)
+ }
+
+ if lhsVersion == rhsVersion {
+ return 0
+ }
+ if lhsVersion < rhsVersion {
+ return -1
+ }
+
+ return 1
+}
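+
+// Example (editor's illustrative sketch, not part of the upstream file):
+// remembering our own write so a subsequent read sees at least that version.
+// updateThroughAPI is a hypothetical stand-in for whatever client performed
+// the update; only the Mutation and GetByKey calls are from this file.
+//
+//	mc := NewIntegerResourceVersionMutationCache(store, nil, time.Minute, false)
+//	updated, err := updateThroughAPI(obj) // hypothetical; returns the stored object
+//	if err == nil {
+//		mc.Mutation(updated) // cache our write for up to the ttl
+//	}
+//	cur, exists, _ := mc.GetByKey("default/mypod")
+//	// cur is the newer of the backing store's copy and our cached write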
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_detector.go b/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
new file mode 100644
index 000000000..cc6094ce4
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
@@ -0,0 +1,135 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "os"
+ "reflect"
+ "strconv"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/diff"
+ "k8s.io/client-go/kubernetes/scheme"
+)
+
+var mutationDetectionEnabled = false
+
+func init() {
+ mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
+}
+
+type CacheMutationDetector interface {
+ AddObject(obj interface{})
+ Run(stopCh <-chan struct{})
+}
+
+func NewCacheMutationDetector(name string) CacheMutationDetector {
+ if !mutationDetectionEnabled {
+ return dummyMutationDetector{}
+ }
+ return &defaultCacheMutationDetector{name: name, period: 1 * time.Second}
+}
+
+type dummyMutationDetector struct{}
+
+func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
+}
+func (dummyMutationDetector) AddObject(obj interface{}) {
+}
+
+// defaultCacheMutationDetector gives a way to detect if a cached object has been mutated
+// It has a list of cached objects and their copies. I haven't thought of a way
+// to see WHO is mutating it, just that it's getting mutated.
+type defaultCacheMutationDetector struct {
+ name string
+ period time.Duration
+
+ lock sync.Mutex
+ cachedObjs []cacheObj
+
+ // failureFunc is injectable for unit testing. If you don't have it, the process will panic.
+ // This panic is intentional, since turning on this detection indicates you want a strong
+ // failure signal. This failure is effectively a p0 bug and you can't trust process results
+ // after a mutation anyway.
+ failureFunc func(message string)
+}
+
+// cacheObj holds the actual object and a copy
+type cacheObj struct {
+ cached interface{}
+ copied interface{}
+}
+
+func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
+ // we DON'T want protection from panics. If we're running this code, we want to die
+ go func() {
+ for {
+ d.CompareObjects()
+
+ select {
+ case <-stopCh:
+ return
+ case <-time.After(d.period):
+ }
+ }
+ }()
+}
+
+// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
+// but that covers the vast majority of our cached objects
+func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
+ if _, ok := obj.(DeletedFinalStateUnknown); ok {
+ return
+ }
+ if _, ok := obj.(runtime.Object); !ok {
+ return
+ }
+
+ copiedObj, err := scheme.Scheme.Copy(obj.(runtime.Object))
+ if err != nil {
+ return
+ }
+
+ d.lock.Lock()
+ defer d.lock.Unlock()
+ d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
+}
+
+func (d *defaultCacheMutationDetector) CompareObjects() {
+ d.lock.Lock()
+ defer d.lock.Unlock()
+
+ altered := false
+ for i, obj := range d.cachedObjs {
+ if !reflect.DeepEqual(obj.cached, obj.copied) {
+ fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectDiff(obj.cached, obj.copied))
+ altered = true
+ }
+ }
+
+ if altered {
+ msg := fmt.Sprintf("cache %s modified", d.name)
+ if d.failureFunc != nil {
+ d.failureFunc(msg)
+ return
+ }
+ panic(msg)
+ }
+}
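+
+// Example (editor's illustrative sketch, not part of the upstream file):
+// wiring the detector into a cache fill. All calls are no-ops unless the
+// process was started with KUBE_CACHE_MUTATION_DETECTOR set to a true value,
+// since that flag is read once in init().
+//
+//	detector := NewCacheMutationDetector("example-informer")
+//	stopCh := make(chan struct{})
+//	detector.Run(stopCh)    // periodically compares cached objects against their copies
+//	detector.AddObject(obj) // deep-copies obj; mutating obj later panics the process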
diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go
new file mode 100644
index 000000000..9a730610c
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/reflector.go
@@ -0,0 +1,421 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
+ "net/url"
+ "reflect"
+ "regexp"
+ goruntime "runtime"
+ "runtime/debug"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/golang/glog"
+ apierrs "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+)
+
+// Reflector watches a specified resource and causes all changes to be reflected in the given store.
+type Reflector struct {
+ // name identifies this reflector. By default it will be a file:line if possible.
+ name string
+
+ // The type of object we expect to place in the store.
+ expectedType reflect.Type
+ // The destination to sync up with the watch source
+ store Store
+ // listerWatcher is used to perform lists and watches.
+ listerWatcher ListerWatcher
+ // period controls timing between one watch ending and
+ // the beginning of the next one.
+ period time.Duration
+ resyncPeriod time.Duration
+ ShouldResync func() bool
+ // clock allows tests to manipulate time
+ clock clock.Clock
+ // lastSyncResourceVersion is the resource version token last
+ // observed when doing a sync with the underlying store
+ // it is thread safe, but not synchronized with the underlying store
+ lastSyncResourceVersion string
+ // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
+ lastSyncResourceVersionMutex sync.RWMutex
+}
+
+var (
+ // We try to spread the load on apiserver by setting timeouts for
+ // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
+ // However, it can be modified to avoid periodic resync to break the
+ // TCP connection.
+ minWatchTimeout = 5 * time.Minute
+)
+
+// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
+// The indexer is configured to key on namespace
+func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
+ indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
+ reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
+ return indexer, reflector
+}
+
+// NewReflector creates a new Reflector object which will keep the given store up to
+// date with the server's contents for the given resource. Reflector promises to
+// only put things in the store that have the type of expectedType, unless expectedType
+// is nil. If resyncPeriod is non-zero, then lists will be executed after every
+// resyncPeriod, so that you can use reflectors to periodically process everything as
+// well as incrementally processing the things that change.
+func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
+ return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
+}
+
+// NewNamedReflector same as NewReflector, but with a specified name for logging
+func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
+ r := &Reflector{
+ name: name,
+ listerWatcher: lw,
+ store: store,
+ expectedType: reflect.TypeOf(expectedType),
+ period: time.Second,
+ resyncPeriod: resyncPeriod,
+ clock: &clock.RealClock{},
+ }
+ return r
+}
+
+// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
+// call chains to NewReflector, so they'd be low entropy names for reflectors
+var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
+
+// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages.
+// It returns a shortpath/filename:line to aid in identifying this reflector when it starts logging.
+func getDefaultReflectorName(ignoredPackages ...string) string {
+ name := "????"
+ const maxStack = 10
+ for i := 1; i < maxStack; i++ {
+ _, file, line, ok := goruntime.Caller(i)
+ if !ok {
+ file, line, ok = extractStackCreator()
+ if !ok {
+ break
+ }
+ i += maxStack
+ }
+ if hasPackage(file, ignoredPackages) {
+ continue
+ }
+
+ file = trimPackagePrefix(file)
+ name = fmt.Sprintf("%s:%d", file, line)
+ break
+ }
+ return name
+}
+
+// hasPackage returns true if the file is in one of the ignored packages.
+func hasPackage(file string, ignoredPackages []string) bool {
+ for _, ignoredPackage := range ignoredPackages {
+ if strings.Contains(file, ignoredPackage) {
+ return true
+ }
+ }
+ return false
+}
+
+// trimPackagePrefix reduces duplicate values off the front of a package name.
+func trimPackagePrefix(file string) string {
+ if l := strings.LastIndex(file, "k8s.io/client-go/pkg/"); l >= 0 {
+ return file[l+len("k8s.io/client-go/"):]
+ }
+ if l := strings.LastIndex(file, "/src/"); l >= 0 {
+ return file[l+5:]
+ }
+ if l := strings.LastIndex(file, "/pkg/"); l >= 0 {
+ return file[l+1:]
+ }
+ return file
+}
+
+var stackCreator = regexp.MustCompile(`(?m)^created by (.*)\n\s+(.*):(\d+) \+0x[[:xdigit:]]+$`)
+
+// extractStackCreator retrieves the goroutine file and line that launched this stack. Returns false
+// if the creator cannot be located.
+// TODO: Go does not expose this via runtime https://github.com/golang/go/issues/11440
+func extractStackCreator() (string, int, bool) {
+ stack := debug.Stack()
+ matches := stackCreator.FindStringSubmatch(string(stack))
+ if matches == nil || len(matches) != 4 {
+ return "", 0, false
+ }
+ line, err := strconv.Atoi(matches[3])
+ if err != nil {
+ return "", 0, false
+ }
+ return matches[2], line, true
+}
+
+// Run starts a watch and handles watch events. Will restart the watch if it is closed.
+// Run starts a goroutine and returns immediately.
+func (r *Reflector) Run() {
+ glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
+ go wait.Until(func() {
+ if err := r.ListAndWatch(wait.NeverStop); err != nil {
+ utilruntime.HandleError(err)
+ }
+ }, r.period, wait.NeverStop)
+}
+
+// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
+// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
+func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
+ glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
+ go wait.Until(func() {
+ if err := r.ListAndWatch(stopCh); err != nil {
+ utilruntime.HandleError(err)
+ }
+ }, r.period, stopCh)
+}
+
+var (
+ // nothing will ever be sent down this channel
+ neverExitWatch <-chan time.Time = make(chan time.Time)
+
+ // Used to indicate that watching stopped so that a resync could happen.
+ errorResyncRequested = errors.New("resync channel fired")
+
+ // Used to indicate that watching stopped because of a signal from the stop
+ // channel passed in from a client of the reflector.
+ errorStopRequested = errors.New("Stop requested")
+)
+
+// resyncChan returns a channel which will receive something when a resync is
+// required, and a cleanup function.
+func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
+ if r.resyncPeriod == 0 {
+ return neverExitWatch, func() bool { return false }
+ }
+ // The cleanup function is required: imagine the scenario where watches
+ // always fail so we end up listing frequently. Then, if we don't
+ // manually stop the timer, we could end up with many timers active
+ // concurrently.
+ t := r.clock.NewTimer(r.resyncPeriod)
+ return t.C(), t.Stop
+}
+
+// ListAndWatch first lists all items and gets the resource version at the moment of the call,
+// and then uses that resource version to watch.
+// It returns an error if ListAndWatch didn't even try to initialize the watch.
+func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
+ glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
+ var resourceVersion string
+ resyncCh, cleanup := r.resyncChan()
+ defer cleanup()
+
+ // Explicitly set "0" as resource version - it's fine for the List()
+ // to be served from cache and potentially be delayed relative to
+ // etcd contents. Reflector framework will catch up via Watch() eventually.
+ options := metav1.ListOptions{ResourceVersion: "0"}
+ list, err := r.listerWatcher.List(options)
+ if err != nil {
+ return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
+ }
+ listMetaInterface, err := meta.ListAccessor(list)
+ if err != nil {
+ return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
+ }
+ resourceVersion = listMetaInterface.GetResourceVersion()
+ items, err := meta.ExtractList(list)
+ if err != nil {
+ return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
+ }
+ if err := r.syncWith(items, resourceVersion); err != nil {
+ return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
+ }
+ r.setLastSyncResourceVersion(resourceVersion)
+
+ resyncerrc := make(chan error, 1)
+ cancelCh := make(chan struct{})
+ defer close(cancelCh)
+ go func() {
+ for {
+ select {
+ case <-resyncCh:
+ case <-stopCh:
+ return
+ case <-cancelCh:
+ return
+ }
+ if r.ShouldResync == nil || r.ShouldResync() {
+ glog.V(4).Infof("%s: forcing resync", r.name)
+ if err := r.store.Resync(); err != nil {
+ resyncerrc <- err
+ return
+ }
+ }
+ cleanup()
+ resyncCh, cleanup = r.resyncChan()
+ }
+ }()
+
+ for {
+ timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+ options = metav1.ListOptions{
+ ResourceVersion: resourceVersion,
+ // We want to avoid situations of hanging watchers. Stop any watchers that do not
+ // receive any events within the timeout window.
+ TimeoutSeconds: &timeoutSeconds,
+ }
+
+ w, err := r.listerWatcher.Watch(options)
+ if err != nil {
+ switch err {
+ case io.EOF:
+ // watch closed normally
+ case io.ErrUnexpectedEOF:
+ glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
+ default:
+ utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
+ }
+ // If this is "connection refused" error, it means that most likely apiserver is not responsive.
+ // It doesn't make sense to re-list all objects because most likely we will be able to restart
+ // watch where we ended.
+ // If that's the case wait and resend watch request.
+ if urlError, ok := err.(*url.Error); ok {
+ if opError, ok := urlError.Err.(*net.OpError); ok {
+ if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
+ time.Sleep(time.Second)
+ continue
+ }
+ }
+ }
+ return nil
+ }
+
+ if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
+ if err != errorStopRequested {
+ glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
+ }
+ return nil
+ }
+ }
+}
+
+// syncWith replaces the store's items with the given list.
+func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
+ found := make([]interface{}, 0, len(items))
+ for _, item := range items {
+ found = append(found, item)
+ }
+ return r.store.Replace(found, resourceVersion)
+}
+
+// watchHandler watches w and keeps *resourceVersion up to date.
+func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
+ start := r.clock.Now()
+ eventCount := 0
+
+ // Stopping the watcher should be idempotent and if we return from this function there's no way
+ // we're coming back in with the same watch interface.
+ defer w.Stop()
+
+loop:
+ for {
+ select {
+ case <-stopCh:
+ return errorStopRequested
+ case err := <-errc:
+ return err
+ case event, ok := <-w.ResultChan():
+ if !ok {
+ break loop
+ }
+ if event.Type == watch.Error {
+ return apierrs.FromObject(event.Object)
+ }
+ if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
+ utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
+ continue
+ }
+ meta, err := meta.Accessor(event.Object)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
+ continue
+ }
+ newResourceVersion := meta.GetResourceVersion()
+ switch event.Type {
+ case watch.Added:
+ err := r.store.Add(event.Object)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
+ }
+ case watch.Modified:
+ err := r.store.Update(event.Object)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
+ }
+ case watch.Deleted:
+ // TODO: Will any consumers need access to the "last known
+ // state", which is passed in event.Object? If so, may need
+ // to change this.
+ err := r.store.Delete(event.Object)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
+ }
+ default:
+ utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
+ }
+ *resourceVersion = newResourceVersion
+ r.setLastSyncResourceVersion(newResourceVersion)
+ eventCount++
+ }
+ }
+
+ watchDuration := r.clock.Now().Sub(start)
+ if watchDuration < 1*time.Second && eventCount == 0 {
+ glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
+ return errors.New("very short watch")
+ }
+ glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
+ return nil
+}
+
+// LastSyncResourceVersion is the resource version observed when last synced with the underlying store.
+// The value returned is not synchronized with access to the underlying store and is not thread-safe
+func (r *Reflector) LastSyncResourceVersion() string {
+ r.lastSyncResourceVersionMutex.RLock()
+ defer r.lastSyncResourceVersionMutex.RUnlock()
+ return r.lastSyncResourceVersion
+}
+
+func (r *Reflector) setLastSyncResourceVersion(v string) {
+ r.lastSyncResourceVersionMutex.Lock()
+ defer r.lastSyncResourceVersionMutex.Unlock()
+ r.lastSyncResourceVersion = v
+}
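+
+// Example (editor's illustrative sketch, not part of the upstream file):
+// keeping a plain Store in sync with the API server. lw is a ListerWatcher
+// such as one built with NewListWatchFromClient, and &v1.Pod{} stands in for
+// whatever expected type the caller imports; both are assumptions here.
+//
+//	store := NewStore(MetaNamespaceKeyFunc)
+//	r := NewReflector(lw, &v1.Pod{}, store, 30*time.Second)
+//	stopCh := make(chan struct{})
+//	r.RunUntil(stopCh) // returns immediately; list+watch restarts until stopCh closes
+//	// store.List() and store.GetByKey(...) now track the server's contents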
diff --git a/vendor/k8s.io/client-go/tools/cache/shared_informer.go b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
new file mode 100644
index 000000000..a0dbbb697
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
@@ -0,0 +1,581 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+
+ "github.com/golang/glog"
+)
+
+// SharedInformer has a shared data cache and is capable of distributing notifications for changes
+// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
+// one behavior change compared to a standard Informer. When you receive a notification, the cache
+// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
+// on the contents of the cache exactly matching the notification you've received in handler
+// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
+// has advantages over the broadcaster since it allows us to share a common cache across many
+// controllers. Extending the broadcaster would have required us keep duplicate caches for each
+// watch.
+type SharedInformer interface {
+ // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
+ // period. Events to a single handler are delivered sequentially, but there is no coordination
+ // between different handlers.
+ AddEventHandler(handler ResourceEventHandler)
+ // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
+ // specified resync period. Events to a single handler are delivered sequentially, but there is
+ // no coordination between different handlers.
+ AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
+ // GetStore returns the Store.
+ GetStore() Store
+ // GetController gives back a synthetic interface that "votes" to start the informer
+ GetController() Controller
+ // Run starts the shared informer, which will be stopped when stopCh is closed.
+ Run(stopCh <-chan struct{})
+ // HasSynced returns true if the shared informer's store has synced.
+ HasSynced() bool
+ // LastSyncResourceVersion is the resource version observed when last synced with the underlying
+ // store. The value returned is not synchronized with access to the underlying store and is not
+ // thread-safe.
+ LastSyncResourceVersion() string
+}
+
+type SharedIndexInformer interface {
+ SharedInformer
+ // AddIndexers add indexers to the informer before it starts.
+ AddIndexers(indexers Indexers) error
+ GetIndexer() Indexer
+}
+
+// NewSharedInformer creates a new instance for the listwatcher.
+func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
+ return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
+}
+
+// NewSharedIndexInformer creates a new instance for the listwatcher.
+func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
+ realClock := &clock.RealClock{}
+ sharedIndexInformer := &sharedIndexInformer{
+ processor: &sharedProcessor{clock: realClock},
+ indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
+ listerWatcher: lw,
+ objectType: objType,
+ resyncCheckPeriod: defaultEventHandlerResyncPeriod,
+ defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
+ cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
+ clock: realClock,
+ }
+ return sharedIndexInformer
+}
+
+// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
+type InformerSynced func() bool
+
+// syncedPollPeriod controls how often you look at the status of your sync funcs
+const syncedPollPeriod = 100 * time.Millisecond
+
+// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
+// if the controller should shutdown
+func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
+ err := wait.PollUntil(syncedPollPeriod,
+ func() (bool, error) {
+ for _, syncFunc := range cacheSyncs {
+ if !syncFunc() {
+ return false, nil
+ }
+ }
+ return true, nil
+ },
+ stopCh)
+ if err != nil {
+ glog.V(2).Infof("stop requested")
+ return false
+ }
+
+ glog.V(4).Infof("caches populated")
+ return true
+}
+
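+// Example (editor's illustrative sketch, not part of the upstream file):
+// gating controller startup on cache sync. informer and stopCh come from the
+// caller; any SharedInformer's HasSynced satisfies InformerSynced.
+//
+//	go informer.Run(stopCh)
+//	if !WaitForCacheSync(stopCh, informer.HasSynced) {
+//		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+//		return
+//	}
+//	// it is now safe to start workers that read the informer's store
+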
+type sharedIndexInformer struct {
+ indexer Indexer
+ controller Controller
+
+ processor *sharedProcessor
+ cacheMutationDetector CacheMutationDetector
+
+ // This block is tracked to handle late initialization of the controller
+ listerWatcher ListerWatcher
+ objectType runtime.Object
+
+ // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
+ // shouldResync to check if any of our listeners need a resync.
+ resyncCheckPeriod time.Duration
+ // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
+ // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
+ // value).
+ defaultEventHandlerResyncPeriod time.Duration
+ // clock allows for testability
+ clock clock.Clock
+
+ started bool
+ startedLock sync.Mutex
+
+ // blockDeltas gives a way to stop all event distribution so that a late event handler
+ // can safely join the shared informer.
+ blockDeltas sync.Mutex
+ // stopCh is the channel used to stop the main Run process. We have to track it so that
+ // late joiners can have a proper stop
+ stopCh <-chan struct{}
+}
+
+// dummyController hides the fact that a SharedInformer is different from a dedicated one
+// where a caller can `Run`. The run method is disconnected in this case, because higher
+// level logic will decide when to start the SharedInformer and related controller.
+// Because returning information back is always asynchronous, the legacy callers shouldn't
+// notice any change in behavior.
+type dummyController struct {
+ informer *sharedIndexInformer
+}
+
+func (v *dummyController) Run(stopCh <-chan struct{}) {
+}
+
+func (v *dummyController) HasSynced() bool {
+ return v.informer.HasSynced()
+}
+
+func (v *dummyController) LastSyncResourceVersion() string {
+ return ""
+}
+
+type updateNotification struct {
+ oldObj interface{}
+ newObj interface{}
+}
+
+type addNotification struct {
+ newObj interface{}
+}
+
+type deleteNotification struct {
+ oldObj interface{}
+}
+
+func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: s.listerWatcher,
+ ObjectType: s.objectType,
+ FullResyncPeriod: s.resyncCheckPeriod,
+ RetryOnError: false,
+ ShouldResync: s.processor.shouldResync,
+
+ Process: s.HandleDeltas,
+ }
+
+ func() {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ s.controller = New(cfg)
+ s.controller.(*controller).clock = s.clock
+ s.started = true
+ }()
+
+ s.stopCh = stopCh
+ s.cacheMutationDetector.Run(stopCh)
+ s.processor.run(stopCh)
+ s.controller.Run(stopCh)
+}
+
+func (s *sharedIndexInformer) isStarted() bool {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+ return s.started
+}
+
+func (s *sharedIndexInformer) HasSynced() bool {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.controller == nil {
+ return false
+ }
+ return s.controller.HasSynced()
+}
+
+func (s *sharedIndexInformer) LastSyncResourceVersion() string {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.controller == nil {
+ return ""
+ }
+ return s.controller.LastSyncResourceVersion()
+}
+
+func (s *sharedIndexInformer) GetStore() Store {
+ return s.indexer
+}
+
+func (s *sharedIndexInformer) GetIndexer() Indexer {
+ return s.indexer
+}
+
+func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.started {
+ return fmt.Errorf("informer has already started")
+ }
+
+ return s.indexer.AddIndexers(indexers)
+}
+
+func (s *sharedIndexInformer) GetController() Controller {
+ return &dummyController{informer: s}
+}
+
+func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
+ s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
+}
+
+func determineResyncPeriod(desired, check time.Duration) time.Duration {
+ if desired == 0 {
+ return desired
+ }
+ if check == 0 {
+ glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
+ return 0
+ }
+ if desired < check {
+ glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
+ return check
+ }
+ return desired
+}
+
+const minimumResyncPeriod = 1 * time.Second
+
+func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if resyncPeriod > 0 {
+ if resyncPeriod < minimumResyncPeriod {
+ glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
+ resyncPeriod = minimumResyncPeriod
+ }
+
+ if resyncPeriod < s.resyncCheckPeriod {
+ if s.started {
+ glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
+ resyncPeriod = s.resyncCheckPeriod
+ } else {
+ // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
+ // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
+ // accordingly
+ s.resyncCheckPeriod = resyncPeriod
+ s.processor.resyncCheckPeriodChanged(resyncPeriod)
+ }
+ }
+ }
+
+ listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
+
+ if !s.started {
+ s.processor.addListener(listener)
+ return
+ }
+
+ // in order to safely join, we have to
+ // 1. stop sending add/update/delete notifications
+ // 2. do a list against the store
+ // 3. send synthetic "Add" events to the new handler
+ // 4. unblock
+ s.blockDeltas.Lock()
+ defer s.blockDeltas.Unlock()
+
+ s.processor.addListener(listener)
+
+ go listener.run(s.stopCh)
+ go listener.pop(s.stopCh)
+
+ items := s.indexer.List()
+ for i := range items {
+ listener.add(addNotification{newObj: items[i]})
+ }
+}
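+
+// Illustrative consumer-side sketch (not part of this file): registering
+// handlers on a shared informer. The informer value is hypothetical;
+// ResourceEventHandlerFuncs is defined in controller.go of this package.
+//
+//    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+//        AddFunc: func(obj interface{}) { /* react to adds */ },
+//    })
+//    // A 10s request may be coerced upward by the rules above if the informer
+//    // has already started with a larger resyncCheckPeriod.
+//    informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
+//        UpdateFunc: func(oldObj, newObj interface{}) { /* react to updates */ },
+//    }, 10*time.Second)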
+
+func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
+ s.blockDeltas.Lock()
+ defer s.blockDeltas.Unlock()
+
+ // from oldest to newest
+ for _, d := range obj.(Deltas) {
+ switch d.Type {
+ case Sync, Added, Updated:
+ isSync := d.Type == Sync
+ s.cacheMutationDetector.AddObject(d.Object)
+ if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
+ if err := s.indexer.Update(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
+ } else {
+ if err := s.indexer.Add(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(addNotification{newObj: d.Object}, isSync)
+ }
+ case Deleted:
+ if err := s.indexer.Delete(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
+ }
+ }
+ return nil
+}
+
+type sharedProcessor struct {
+ listenersLock sync.RWMutex
+ listeners []*processorListener
+ syncingListeners []*processorListener
+ clock clock.Clock
+}
+
+func (p *sharedProcessor) addListener(listener *processorListener) {
+ p.listenersLock.Lock()
+ defer p.listenersLock.Unlock()
+
+ p.listeners = append(p.listeners, listener)
+ p.syncingListeners = append(p.syncingListeners, listener)
+}
+
+func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
+ p.listenersLock.RLock()
+ defer p.listenersLock.RUnlock()
+
+ if sync {
+ for _, listener := range p.syncingListeners {
+ listener.add(obj)
+ }
+ } else {
+ for _, listener := range p.listeners {
+ listener.add(obj)
+ }
+ }
+}
+
+func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+ p.listenersLock.RLock()
+ defer p.listenersLock.RUnlock()
+
+ for _, listener := range p.listeners {
+ go listener.run(stopCh)
+ go listener.pop(stopCh)
+ }
+}
+
+// shouldResync queries every listener to determine if any of them need a resync, based on each
+// listener's resyncPeriod.
+func (p *sharedProcessor) shouldResync() bool {
+ p.listenersLock.Lock()
+ defer p.listenersLock.Unlock()
+
+ p.syncingListeners = []*processorListener{}
+
+ resyncNeeded := false
+ now := p.clock.Now()
+ for _, listener := range p.listeners {
+ // need to loop through all the listeners to see if they need to resync so we can prepare any
+ // listeners that are going to be resyncing.
+ if listener.shouldResync(now) {
+ resyncNeeded = true
+ p.syncingListeners = append(p.syncingListeners, listener)
+ listener.determineNextResync(now)
+ }
+ }
+ return resyncNeeded
+}
+
+func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
+ p.listenersLock.RLock()
+ defer p.listenersLock.RUnlock()
+
+ for _, listener := range p.listeners {
+ resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
+ listener.setResyncPeriod(resyncPeriod)
+ }
+}
+
+type processorListener struct {
+ // lock/cond protects access to 'pendingNotifications'.
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // pendingNotifications is an unbounded slice that holds all notifications not yet distributed.
+ // There is one per listener, but a failing/stalled listener will have pendingNotifications grow
+ // without bound until we OOM.
+ // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
+ // we should try to do something better.
+ pendingNotifications []interface{}
+
+ nextCh chan interface{}
+
+ handler ResourceEventHandler
+
+ // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
+ requestedResyncPeriod time.Duration
+ // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
+ // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
+ // informer's overall resync check period.
+ resyncPeriod time.Duration
+ // nextResync is the earliest time the listener should get a full resync
+ nextResync time.Time
+ // resyncLock guards access to resyncPeriod and nextResync
+ resyncLock sync.Mutex
+}
+
+func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
+ ret := &processorListener{
+ pendingNotifications: []interface{}{},
+ nextCh: make(chan interface{}),
+ handler: handler,
+ requestedResyncPeriod: requestedResyncPeriod,
+ resyncPeriod: resyncPeriod,
+ }
+
+ ret.cond.L = &ret.lock
+
+ ret.determineNextResync(now)
+
+ return ret
+}
+
+func (p *processorListener) add(notification interface{}) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.pendingNotifications = append(p.pendingNotifications, notification)
+ p.cond.Broadcast()
+}
+
+func (p *processorListener) pop(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ for {
+ blockingGet := func() (interface{}, bool) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ for len(p.pendingNotifications) == 0 {
+ // check if we're shutdown
+ select {
+ case <-stopCh:
+ return nil, true
+ default:
+ }
+ p.cond.Wait()
+ }
+
+ nt := p.pendingNotifications[0]
+ p.pendingNotifications = p.pendingNotifications[1:]
+ return nt, false
+ }
+
+ notification, stopped := blockingGet()
+ if stopped {
+ return
+ }
+
+ select {
+ case <-stopCh:
+ return
+ case p.nextCh <- notification:
+ }
+ }
+}
+
+func (p *processorListener) run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ for {
+ var next interface{}
+ select {
+ case <-stopCh:
+ func() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ p.cond.Broadcast()
+ }()
+ return
+ case next = <-p.nextCh:
+ }
+
+ switch notification := next.(type) {
+ case updateNotification:
+ p.handler.OnUpdate(notification.oldObj, notification.newObj)
+ case addNotification:
+ p.handler.OnAdd(notification.newObj)
+ case deleteNotification:
+ p.handler.OnDelete(notification.oldObj)
+ default:
+ utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ }
+ }
+}
+
+// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
+// this always returns false.
+func (p *processorListener) shouldResync(now time.Time) bool {
+ p.resyncLock.Lock()
+ defer p.resyncLock.Unlock()
+
+ if p.resyncPeriod == 0 {
+ return false
+ }
+
+ return now.After(p.nextResync) || now.Equal(p.nextResync)
+}
+
+func (p *processorListener) determineNextResync(now time.Time) {
+ p.resyncLock.Lock()
+ defer p.resyncLock.Unlock()
+
+ p.nextResync = now.Add(p.resyncPeriod)
+}
+
+func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
+ p.resyncLock.Lock()
+ defer p.resyncLock.Unlock()
+
+ p.resyncPeriod = resyncPeriod
+}
diff --git a/vendor/k8s.io/client-go/tools/cache/store.go b/vendor/k8s.io/client-go/tools/cache/store.go
new file mode 100755
index 000000000..4958987f0
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/store.go
@@ -0,0 +1,244 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "strings"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+)
+
+// Store is a generic object storage interface. Reflector knows how to watch a server
+// and update a store. A generic store is provided, which allows Reflector to be used
+// as a local caching system, and an LRU store, which allows Reflector to work like a
+// queue of items yet to be processed.
+//
+// Store makes no assumptions about stored object identity; it is the responsibility
+// of a Store implementation to provide a mechanism to correctly key objects and to
+// define the contract for obtaining objects by some arbitrary key type.
+type Store interface {
+ Add(obj interface{}) error
+ Update(obj interface{}) error
+ Delete(obj interface{}) error
+ List() []interface{}
+ ListKeys() []string
+ Get(obj interface{}) (item interface{}, exists bool, err error)
+ GetByKey(key string) (item interface{}, exists bool, err error)
+
+ // Replace will delete the contents of the store, using instead the
+ // given list. Store takes ownership of the list; you should not reference
+ // it after calling this function.
+ Replace([]interface{}, string) error
+ Resync() error
+}
+
+// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
+type KeyFunc func(obj interface{}) (string, error)
+
+// KeyError will be returned any time a KeyFunc gives an error; it includes the object
+// at fault.
+type KeyError struct {
+ Obj interface{}
+ Err error
+}
+
+// Error gives a human-readable description of the error.
+func (k KeyError) Error() string {
+ return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
+}
+
+// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
+// the object but not the object itself.
+type ExplicitKey string
+
+// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
+// keys for API objects which implement meta.Interface.
+// The key uses the format <namespace>/<name> unless <namespace> is empty, then
+// it's just <name>.
+//
+// TODO: replace key-as-string with a key-as-struct so that this
+// packing/unpacking won't be necessary.
+func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
+ if key, ok := obj.(ExplicitKey); ok {
+ return string(key), nil
+ }
+ meta, err := meta.Accessor(obj)
+ if err != nil {
+ return "", fmt.Errorf("object has no meta: %v", err)
+ }
+ if len(meta.GetNamespace()) > 0 {
+ return meta.GetNamespace() + "/" + meta.GetName(), nil
+ }
+ return meta.GetName(), nil
+}
+
+// SplitMetaNamespaceKey returns the namespace and name that
+// MetaNamespaceKeyFunc encoded into key.
+//
+// TODO: replace key-as-string with a key-as-struct so that this
+// packing/unpacking won't be necessary.
+func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
+ parts := strings.Split(key, "/")
+ switch len(parts) {
+ case 1:
+ // name only, no namespace
+ return "", parts[0], nil
+ case 2:
+ // namespace and name
+ return parts[0], parts[1], nil
+ }
+
+ return "", "", fmt.Errorf("unexpected key format: %q", key)
+}
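+
+// Round-trip sketch for the two functions above. The pod value is
+// illustrative; any object implementing meta.Interface behaves the same:
+//
+//    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "dns"}}
+//    key, _ := MetaNamespaceKeyFunc(pod)       // "kube-system/dns"
+//    ns, name, _ := SplitMetaNamespaceKey(key) // ns == "kube-system", name == "dns"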
+
+// cache responsibilities are limited to:
+// 1. Computing keys for objects via keyFunc
+// 2. Invoking methods of a ThreadSafeStore interface
+type cache struct {
+ // cacheStorage bears the burden of thread safety for the cache
+ cacheStorage ThreadSafeStore
+ // keyFunc is used to make the key for objects stored in and retrieved from items, and
+ // should be deterministic.
+ keyFunc KeyFunc
+}
+
+var _ Store = &cache{}
+
+// Add inserts an item into the cache.
+func (c *cache) Add(obj interface{}) error {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Add(key, obj)
+ return nil
+}
+
+// Update sets an item in the cache to its updated state.
+func (c *cache) Update(obj interface{}) error {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Update(key, obj)
+ return nil
+}
+
+// Delete removes an item from the cache.
+func (c *cache) Delete(obj interface{}) error {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Delete(key)
+ return nil
+}
+
+// List returns a list of all the items.
+// List is completely threadsafe as long as you treat all items as immutable.
+func (c *cache) List() []interface{} {
+ return c.cacheStorage.List()
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the cache.
+func (c *cache) ListKeys() []string {
+ return c.cacheStorage.ListKeys()
+}
+
+// GetIndexers returns the indexers of cache
+func (c *cache) GetIndexers() Indexers {
+ return c.cacheStorage.GetIndexers()
+}
+
+// Index returns a list of items that match on the index function
+// Index is thread-safe so long as you treat all items as immutable
+func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
+ return c.cacheStorage.Index(indexName, obj)
+}
+
+func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {
+ return c.cacheStorage.IndexKeys(indexName, indexKey)
+}
+
+// ListIndexFuncValues returns the list of generated values of an Index func
+func (c *cache) ListIndexFuncValues(indexName string) []string {
+ return c.cacheStorage.ListIndexFuncValues(indexName)
+}
+
+func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
+ return c.cacheStorage.ByIndex(indexName, indexKey)
+}
+
+func (c *cache) AddIndexers(newIndexers Indexers) error {
+ return c.cacheStorage.AddIndexers(newIndexers)
+}
+
+// Get returns the requested item, or sets exists=false.
+// Get is completely threadsafe as long as you treat all items as immutable.
+func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return c.GetByKey(key)
+}
+
+// GetByKey returns the requested item, or exists=false.
+// GetByKey is completely threadsafe as long as you treat all items as immutable.
+func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
+ item, exists = c.cacheStorage.Get(key)
+ return item, exists, nil
+}
+
+// Replace will delete the contents of 'c', using instead the given list.
+// 'c' takes ownership of the list; you should not reference the list again
+// after calling this function.
+func (c *cache) Replace(list []interface{}, resourceVersion string) error {
+ items := map[string]interface{}{}
+ for _, item := range list {
+ key, err := c.keyFunc(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ items[key] = item
+ }
+ c.cacheStorage.Replace(items, resourceVersion)
+ return nil
+}
+
+// Resync touches all items in the store to force processing
+func (c *cache) Resync() error {
+ return c.cacheStorage.Resync()
+}
+
+// NewStore returns a Store implemented simply with a map and a lock.
+func NewStore(keyFunc KeyFunc) Store {
+ return &cache{
+ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
+ keyFunc: keyFunc,
+ }
+}
+
+// NewIndexer returns an Indexer implemented simply with a map and a lock.
+func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
+ return &cache{
+ cacheStorage: NewThreadSafeStore(indexers, Indices{}),
+ keyFunc: keyFunc,
+ }
+}
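+
+// Minimal usage sketch for the constructors above. The obj value is
+// hypothetical; NamespaceIndex and MetaNamespaceIndexFunc come from index.go
+// in this package:
+//
+//    store := NewStore(MetaNamespaceKeyFunc)
+//    _ = store.Add(obj)                    // keyed as "<namespace>/<name>"
+//    item, exists, _ := store.GetByKey("default/web")
+//
+//    indexer := NewIndexer(MetaNamespaceKeyFunc,
+//        Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
+//    _ = indexer.Add(obj)
+//    inNamespace, _ := indexer.ByIndex(NamespaceIndex, "default")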
diff --git a/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go b/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
new file mode 100644
index 000000000..4eb350c43
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
@@ -0,0 +1,306 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "sync"
+
+ "k8s.io/apimachinery/pkg/util/sets"
+)
+
+// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
+// TL;DR caveats: you must not modify anything returned by Get or List as it will break
+// the indexing feature in addition to not being thread safe.
+//
+// The guarantees of thread safety provided by List/Get are only valid if the caller
+// treats returned items as read-only. For example, a pointer inserted in the store
+// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
+// on the same key and modify the pointer in a non-thread-safe way. Also note that
+// modifying objects stored by the indexers (if any) will *not* automatically lead
+// to a re-index. So it's not a good idea to directly modify the objects returned by
+// Get/List, in general.
+type ThreadSafeStore interface {
+ Add(key string, obj interface{})
+ Update(key string, obj interface{})
+ Delete(key string)
+ Get(key string) (item interface{}, exists bool)
+ List() []interface{}
+ ListKeys() []string
+ Replace(map[string]interface{}, string)
+ Index(indexName string, obj interface{}) ([]interface{}, error)
+ IndexKeys(indexName, indexKey string) ([]string, error)
+ ListIndexFuncValues(name string) []string
+ ByIndex(indexName, indexKey string) ([]interface{}, error)
+ GetIndexers() Indexers
+
+ // AddIndexers adds more indexers to this store. If you call this after you already have data
+ // in the store, the results are undefined.
+ AddIndexers(newIndexers Indexers) error
+ Resync() error
+}
+
+// threadSafeMap implements ThreadSafeStore
+type threadSafeMap struct {
+ lock sync.RWMutex
+ items map[string]interface{}
+
+ // indexers maps a name to an IndexFunc
+ indexers Indexers
+ // indices maps a name to an Index
+ indices Indices
+}
+
+func (c *threadSafeMap) Add(key string, obj interface{}) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ oldObject := c.items[key]
+ c.items[key] = obj
+ c.updateIndices(oldObject, obj, key)
+}
+
+func (c *threadSafeMap) Update(key string, obj interface{}) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ oldObject := c.items[key]
+ c.items[key] = obj
+ c.updateIndices(oldObject, obj, key)
+}
+
+func (c *threadSafeMap) Delete(key string) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if obj, exists := c.items[key]; exists {
+ c.deleteFromIndices(obj, key)
+ delete(c.items, key)
+ }
+}
+
+func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ item, exists = c.items[key]
+ return item, exists
+}
+
+func (c *threadSafeMap) List() []interface{} {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ list := make([]interface{}, 0, len(c.items))
+ for _, item := range c.items {
+ list = append(list, item)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the threadSafeMap.
+func (c *threadSafeMap) ListKeys() []string {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ list := make([]string, 0, len(c.items))
+ for key := range c.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ c.items = items
+
+ // rebuild all indices
+ c.indices = Indices{}
+ for key, item := range c.items {
+ c.updateIndices(nil, item, key)
+ }
+}
+
+// Index returns a list of items that match on the index function
+// Index is thread-safe so long as you treat all items as immutable
+func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ indexFunc := c.indexers[indexName]
+ if indexFunc == nil {
+ return nil, fmt.Errorf("Index with name %s does not exist", indexName)
+ }
+
+ indexKeys, err := indexFunc(obj)
+ if err != nil {
+ return nil, err
+ }
+ index := c.indices[indexName]
+
+ // need to de-dupe the return list. Since multiple keys are allowed, this can happen.
+ returnKeySet := sets.String{}
+ for _, indexKey := range indexKeys {
+ set := index[indexKey]
+ for _, key := range set.UnsortedList() {
+ returnKeySet.Insert(key)
+ }
+ }
+
+ list := make([]interface{}, 0, returnKeySet.Len())
+ for absoluteKey := range returnKeySet {
+ list = append(list, c.items[absoluteKey])
+ }
+ return list, nil
+}
+
+// ByIndex returns a list of items that match an exact value on the index function
+func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ indexFunc := c.indexers[indexName]
+ if indexFunc == nil {
+ return nil, fmt.Errorf("Index with name %s does not exist", indexName)
+ }
+
+ index := c.indices[indexName]
+
+ set := index[indexKey]
+ list := make([]interface{}, 0, set.Len())
+ for _, key := range set.List() {
+ list = append(list, c.items[key])
+ }
+
+ return list, nil
+}
+
+// IndexKeys returns a list of keys that match on the index function.
+// IndexKeys is thread-safe so long as you treat all items as immutable.
+func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ indexFunc := c.indexers[indexName]
+ if indexFunc == nil {
+ return nil, fmt.Errorf("Index with name %s does not exist", indexName)
+ }
+
+ index := c.indices[indexName]
+
+ set := index[indexKey]
+ return set.List(), nil
+}
+
+func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ index := c.indices[indexName]
+ names := make([]string, 0, len(index))
+ for key := range index {
+ names = append(names, key)
+ }
+ return names
+}
+
+func (c *threadSafeMap) GetIndexers() Indexers {
+ return c.indexers
+}
+
+func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if len(c.items) > 0 {
+ return fmt.Errorf("cannot add indexers to running index")
+ }
+
+ oldKeys := sets.StringKeySet(c.indexers)
+ newKeys := sets.StringKeySet(newIndexers)
+
+ if oldKeys.HasAny(newKeys.List()...) {
+ return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
+ }
+
+ for k, v := range newIndexers {
+ c.indexers[k] = v
+ }
+ return nil
+}
+
+// updateIndices modifies the object's location in the managed indexes; if this is an update, you must provide an oldObj.
+// updateIndices must be called from a function that already has a lock on the cache.
+func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
+ // if we got an old object, we need to remove it before we add it again
+ if oldObj != nil {
+ c.deleteFromIndices(oldObj, key)
+ }
+ for name, indexFunc := range c.indexers {
+ indexValues, err := indexFunc(newObj)
+ if err != nil {
+ return err
+ }
+ index := c.indices[name]
+ if index == nil {
+ index = Index{}
+ c.indices[name] = index
+ }
+
+ for _, indexValue := range indexValues {
+ set := index[indexValue]
+ if set == nil {
+ set = sets.String{}
+ index[indexValue] = set
+ }
+ set.Insert(key)
+ }
+ }
+ return nil
+}
+
+// deleteFromIndices removes the object from each of the managed indexes.
+// It is intended to be called from a function that already has a lock on the cache.
+func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
+ for name, indexFunc := range c.indexers {
+ indexValues, err := indexFunc(obj)
+ if err != nil {
+ return err
+ }
+
+ index := c.indices[name]
+ if index == nil {
+ continue
+ }
+ for _, indexValue := range indexValues {
+ set := index[indexValue]
+ if set != nil {
+ set.Delete(key)
+ }
+ }
+ }
+ return nil
+}
+
+func (c *threadSafeMap) Resync() error {
+ // Nothing to do
+ return nil
+}
+
+func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
+ return &threadSafeMap{
+ items: map[string]interface{}{},
+ indexers: indexers,
+ indices: indices,
+ }
+}
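+
+// Minimal usage sketch: at this layer the caller supplies keys explicitly
+// (the obj value is hypothetical):
+//
+//    s := NewThreadSafeStore(Indexers{}, Indices{})
+//    s.Add("default/web", obj)
+//    item, exists := s.Get("default/web") // treat item as read-only, per the caveats above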
diff --git a/vendor/k8s.io/client-go/tools/cache/undelta_store.go b/vendor/k8s.io/client-go/tools/cache/undelta_store.go
new file mode 100644
index 000000000..117df46c4
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/undelta_store.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+// UndeltaStore listens to incremental updates and sends complete state on every change.
+// It implements the Store interface so that it can receive a stream of mirrored objects
+// from Reflector. Whenever it receives any complete (Store.Replace) or incremental change
+// (Store.Add, Store.Update, Store.Delete), it sends the complete state by calling PushFunc.
+// It is thread-safe. It guarantees that every change (Add, Update, Replace, Delete) results
+// in one call to PushFunc, but sometimes PushFunc may be called twice with the same values.
+// PushFunc should be thread safe.
+type UndeltaStore struct {
+ Store
+ PushFunc func([]interface{})
+}
+
+// Assert that it implements the Store interface.
+var _ Store = &UndeltaStore{}
+
+// Note about thread safety. The Store implementation (cache.cache) uses a lock for all methods.
+// In the functions below, the lock gets released and reacquired between the {Add,Delete,etc}
+// and the List. So, the following can happen, resulting in two identical calls to PushFunc.
+// time thread 1 thread 2
+// 0 UndeltaStore.Add(a)
+// 1 UndeltaStore.Add(b)
+// 2 Store.Add(a)
+// 3 Store.Add(b)
+// 4 Store.List() -> [a,b]
+// 5 Store.List() -> [a,b]
+
+func (u *UndeltaStore) Add(obj interface{}) error {
+ if err := u.Store.Add(obj); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+func (u *UndeltaStore) Update(obj interface{}) error {
+ if err := u.Store.Update(obj); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+func (u *UndeltaStore) Delete(obj interface{}) error {
+ if err := u.Store.Delete(obj); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
+ if err := u.Store.Replace(list, resourceVersion); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+// NewUndeltaStore returns an UndeltaStore implemented with a Store.
+func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore {
+ return &UndeltaStore{
+ Store: NewStore(keyFunc),
+ PushFunc: pushFunc,
+ }
+}
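+
+// Illustrative sketch: every successful mutation pushes the complete current
+// state (the push function here just reports the count):
+//
+//    u := NewUndeltaStore(func(state []interface{}) {
+//        fmt.Printf("store now holds %d objects\n", len(state))
+//    }, MetaNamespaceKeyFunc)
+//    _ = u.Add(obj) // triggers a PushFunc call with the full list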
diff --git a/vendor/k8s.io/client-go/tools/clientcmd/api/helpers.go b/vendor/k8s.io/client-go/tools/clientcmd/api/helpers.go
new file mode 100644
index 000000000..43e26487c
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/clientcmd/api/helpers.go
@@ -0,0 +1,183 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path"
+ "path/filepath"
+)
+
+func init() {
+ sDec, _ := base64.StdEncoding.DecodeString("REDACTED+")
+ redactedBytes = []byte(string(sDec))
+}
+
+// IsConfigEmpty returns true if the config is empty.
+func IsConfigEmpty(config *Config) bool {
+ return len(config.AuthInfos) == 0 && len(config.Clusters) == 0 && len(config.Contexts) == 0 &&
+ len(config.CurrentContext) == 0 &&
+ len(config.Preferences.Extensions) == 0 && !config.Preferences.Colors &&
+ len(config.Extensions) == 0
+}
+
+// MinifyConfig reads the current context and uses that to keep only the relevant pieces of config.
+// This is useful for making secrets based on kubeconfig files.
+func MinifyConfig(config *Config) error {
+ if len(config.CurrentContext) == 0 {
+ return errors.New("current-context must exist in order to minify")
+ }
+
+ currContext, exists := config.Contexts[config.CurrentContext]
+ if !exists {
+ return fmt.Errorf("cannot locate context %v", config.CurrentContext)
+ }
+
+ newContexts := map[string]*Context{}
+ newContexts[config.CurrentContext] = currContext
+
+ newClusters := map[string]*Cluster{}
+ if len(currContext.Cluster) > 0 {
+ if _, exists := config.Clusters[currContext.Cluster]; !exists {
+ return fmt.Errorf("cannot locate cluster %v", currContext.Cluster)
+ }
+
+ newClusters[currContext.Cluster] = config.Clusters[currContext.Cluster]
+ }
+
+ newAuthInfos := map[string]*AuthInfo{}
+ if len(currContext.AuthInfo) > 0 {
+ if _, exists := config.AuthInfos[currContext.AuthInfo]; !exists {
+ return fmt.Errorf("cannot locate user %v", currContext.AuthInfo)
+ }
+
+ newAuthInfos[currContext.AuthInfo] = config.AuthInfos[currContext.AuthInfo]
+ }
+
+ config.AuthInfos = newAuthInfos
+ config.Clusters = newClusters
+ config.Contexts = newContexts
+
+ return nil
+}
+
+var redactedBytes []byte
+
+// ShortenConfig redacts raw data entries from the config object for a human-readable view.
+func ShortenConfig(config *Config) {
+ // trick json encoder into printing a human readable string in the raw data
+ // by base64 decoding what we want to print. Relies on implementation of
+ // http://golang.org/pkg/encoding/json/#Marshal using base64 to encode []byte
+ for key, authInfo := range config.AuthInfos {
+ if len(authInfo.ClientKeyData) > 0 {
+ authInfo.ClientKeyData = redactedBytes
+ }
+ if len(authInfo.ClientCertificateData) > 0 {
+ authInfo.ClientCertificateData = redactedBytes
+ }
+ config.AuthInfos[key] = authInfo
+ }
+ for key, cluster := range config.Clusters {
+ if len(cluster.CertificateAuthorityData) > 0 {
+ cluster.CertificateAuthorityData = redactedBytes
+ }
+ config.Clusters[key] = cluster
+ }
+}
+
+// FlattenConfig changes the config object into a self-contained config (useful for making secrets).
+func FlattenConfig(config *Config) error {
+ for key, authInfo := range config.AuthInfos {
+ baseDir, err := MakeAbs(path.Dir(authInfo.LocationOfOrigin), "")
+ if err != nil {
+ return err
+ }
+
+ if err := FlattenContent(&authInfo.ClientCertificate, &authInfo.ClientCertificateData, baseDir); err != nil {
+ return err
+ }
+ if err := FlattenContent(&authInfo.ClientKey, &authInfo.ClientKeyData, baseDir); err != nil {
+ return err
+ }
+
+ config.AuthInfos[key] = authInfo
+ }
+ for key, cluster := range config.Clusters {
+ baseDir, err := MakeAbs(path.Dir(cluster.LocationOfOrigin), "")
+ if err != nil {
+ return err
+ }
+
+ if err := FlattenContent(&cluster.CertificateAuthority, &cluster.CertificateAuthorityData, baseDir); err != nil {
+ return err
+ }
+
+ config.Clusters[key] = cluster
+ }
+
+ return nil
+}
+
+func FlattenContent(path *string, contents *[]byte, baseDir string) error {
+ if len(*path) != 0 {
+ if len(*contents) > 0 {
+ return errors.New("cannot have values for both path and contents")
+ }
+
+ var err error
+ absPath := ResolvePath(*path, baseDir)
+ *contents, err = ioutil.ReadFile(absPath)
+ if err != nil {
+ return err
+ }
+
+ *path = ""
+ }
+
+ return nil
+}
+
+// ResolvePath returns the path as an absolute path, resolved relative to the given base directory.
+func ResolvePath(path string, base string) string {
+ // Don't resolve empty paths
+ if len(path) > 0 {
+ // Don't resolve absolute paths
+ if !filepath.IsAbs(path) {
+ return filepath.Join(base, path)
+ }
+ }
+
+ return path
+}
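+
+// Behavior sketch, following directly from the branches above:
+//
+//    ResolvePath("ca.crt", "/home/user/.kube") // "/home/user/.kube/ca.crt"
+//    ResolvePath("/etc/ssl/ca.crt", "/base")   // "/etc/ssl/ca.crt" (already absolute)
+//    ResolvePath("", "/base")                  // "" (empty paths are not resolved)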
+
+func MakeAbs(path, base string) (string, error) {
+ if filepath.IsAbs(path) {
+ return path, nil
+ }
+ if len(base) == 0 {
+ cwd, err := os.Getwd()
+ if err != nil {
+ return "", err
+ }
+ base = cwd
+ }
+ return filepath.Join(base, path), nil
+}
diff --git a/vendor/k8s.io/client-go/tools/clientcmd/api/register.go b/vendor/k8s.io/client-go/tools/clientcmd/api/register.go
new file mode 100644
index 000000000..2eec3881c
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/clientcmd/api/register.go
@@ -0,0 +1,46 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// SchemeGroupVersion is group version used to register these objects
+// TODO this should be in the "kubeconfig" group
+var SchemeGroupVersion = schema.GroupVersion{Group: "", Version: runtime.APIVersionInternal}
+
+var (
+ SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
+ AddToScheme = SchemeBuilder.AddToScheme
+)
+
+func addKnownTypes(scheme *runtime.Scheme) error {
+ scheme.AddKnownTypes(SchemeGroupVersion,
+ &Config{},
+ )
+ return nil
+}
+
+func (obj *Config) GetObjectKind() schema.ObjectKind { return obj }
+func (obj *Config) SetGroupVersionKind(gvk schema.GroupVersionKind) {
+ obj.APIVersion, obj.Kind = gvk.ToAPIVersionAndKind()
+}
+func (obj *Config) GroupVersionKind() schema.GroupVersionKind {
+ return schema.FromAPIVersionAndKind(obj.APIVersion, obj.Kind)
+}
diff --git a/vendor/k8s.io/client-go/tools/clientcmd/api/types.go b/vendor/k8s.io/client-go/tools/clientcmd/api/types.go
new file mode 100644
index 000000000..76090c6f5
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/clientcmd/api/types.go
@@ -0,0 +1,185 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// Where possible, json tags match the cli argument names.
+// Top level config objects and all values required for proper functioning are not "omitempty". Any truly optional piece of config is allowed to be omitted.
+
+// Config holds the information needed to connect to remote Kubernetes clusters as a given user.
+// IMPORTANT if you add fields to this struct, please update IsConfigEmpty()
+type Config struct {
+ // Legacy field from pkg/api/types.go TypeMeta.
+ // TODO(jlowdermilk): remove this after eliminating downstream dependencies.
+ // +optional
+ Kind string `json:"kind,omitempty"`
+ // Legacy field from pkg/api/types.go TypeMeta.
+ // TODO(jlowdermilk): remove this after eliminating downstream dependencies.
+ // +optional
+ APIVersion string `json:"apiVersion,omitempty"`
+ // Preferences holds general information to be used for CLI interactions
+ Preferences Preferences `json:"preferences"`
+ // Clusters is a map of referenceable names to cluster configs
+ Clusters map[string]*Cluster `json:"clusters"`
+ // AuthInfos is a map of referenceable names to user configs
+ AuthInfos map[string]*AuthInfo `json:"users"`
+ // Contexts is a map of referenceable names to context configs
+ Contexts map[string]*Context `json:"contexts"`
+ // CurrentContext is the name of the context that you would like to use by default
+ CurrentContext string `json:"current-context"`
+ // Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
+ // +optional
+ Extensions map[string]runtime.Object `json:"extensions,omitempty"`
+}
+
+// IMPORTANT if you add fields to this struct, please update IsConfigEmpty()
+type Preferences struct {
+ // +optional
+ Colors bool `json:"colors,omitempty"`
+ // Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
+ // +optional
+ Extensions map[string]runtime.Object `json:"extensions,omitempty"`
+}
+
+// Cluster contains information about how to communicate with a kubernetes cluster
+type Cluster struct {
+ // LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
+ LocationOfOrigin string
+ // Server is the address of the kubernetes cluster (https://hostname:port).
+ Server string `json:"server"`
+ // InsecureSkipTLSVerify skips the validity check for the server's certificate. This will make your HTTPS connections insecure.
+ // +optional
+ InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify,omitempty"`
+ // CertificateAuthority is the path to a cert file for the certificate authority.
+ // +optional
+ CertificateAuthority string `json:"certificate-authority,omitempty"`
+ // CertificateAuthorityData contains PEM-encoded certificate authority certificates. Overrides CertificateAuthority
+ // +optional
+ CertificateAuthorityData []byte `json:"certificate-authority-data,omitempty"`
+ // Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
+ // +optional
+ Extensions map[string]runtime.Object `json:"extensions,omitempty"`
+}
+
+// AuthInfo contains information that describes identity information. This is used to tell the Kubernetes cluster who you are.
+type AuthInfo struct {
+ // LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
+ LocationOfOrigin string
+ // ClientCertificate is the path to a client cert file for TLS.
+ // +optional
+ ClientCertificate string `json:"client-certificate,omitempty"`
+ // ClientCertificateData contains PEM-encoded data from a client cert file for TLS. Overrides ClientCertificate
+ // +optional
+ ClientCertificateData []byte `json:"client-certificate-data,omitempty"`
+ // ClientKey is the path to a client key file for TLS.
+ // +optional
+ ClientKey string `json:"client-key,omitempty"`
+ // ClientKeyData contains PEM-encoded data from a client key file for TLS. Overrides ClientKey
+ // +optional
+ ClientKeyData []byte `json:"client-key-data,omitempty"`
+ // Token is the bearer token for authentication to the kubernetes cluster.
+ // +optional
+ Token string `json:"token,omitempty"`
+ // TokenFile is the path to a file that contains a bearer token (as described above). If both Token and TokenFile are present, Token takes precedence.
+ // +optional
+ TokenFile string `json:"tokenFile,omitempty"`
+ // Impersonate is the username to act-as.
+ // +optional
+ Impersonate string `json:"act-as,omitempty"`
+ // ImpersonateGroups is the groups to impersonate.
+ // +optional
+ ImpersonateGroups []string `json:"act-as-groups,omitempty"`
+ // ImpersonateUserExtra contains additional information for impersonated user.
+ // +optional
+ ImpersonateUserExtra map[string][]string `json:"act-as-user-extra,omitempty"`
+ // Username is the username for basic authentication to the kubernetes cluster.
+ // +optional
+ Username string `json:"username,omitempty"`
+ // Password is the password for basic authentication to the kubernetes cluster.
+ // +optional
+ Password string `json:"password,omitempty"`
+ // AuthProvider specifies a custom authentication plugin for the kubernetes cluster.
+ // +optional
+ AuthProvider *AuthProviderConfig `json:"auth-provider,omitempty"`
+ // Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
+ // +optional
+ Extensions map[string]runtime.Object `json:"extensions,omitempty"`
+}
+
+// Context is a tuple of references to a cluster (how do I communicate with a kubernetes cluster), a user (how do I identify myself), and a namespace (what subset of resources do I want to work with)
+type Context struct {
+ // LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
+ LocationOfOrigin string
+ // Cluster is the name of the cluster for this context
+ Cluster string `json:"cluster"`
+ // AuthInfo is the name of the authInfo for this context
+ AuthInfo string `json:"user"`
+ // Namespace is the default namespace to use on unspecified requests
+ // +optional
+ Namespace string `json:"namespace,omitempty"`
+ // Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
+ // +optional
+ Extensions map[string]runtime.Object `json:"extensions,omitempty"`
+}
+
+// AuthProviderConfig holds the configuration for a specified auth provider.
+type AuthProviderConfig struct {
+ Name string `json:"name"`
+ // +optional
+ Config map[string]string `json:"config,omitempty"`
+}
+
+// NewConfig is a convenience function that returns a new Config object with non-nil maps
+func NewConfig() *Config {
+ return &Config{
+ Preferences: *NewPreferences(),
+ Clusters: make(map[string]*Cluster),
+ AuthInfos: make(map[string]*AuthInfo),
+ Contexts: make(map[string]*Context),
+ Extensions: make(map[string]runtime.Object),
+ }
+}
+
+// NewContext is a convenience function that returns a new Context
+// object with non-nil maps
+func NewContext() *Context {
+ return &Context{Extensions: make(map[string]runtime.Object)}
+}
+
+// NewCluster is a convenience function that returns a new Cluster
+// object with non-nil maps
+func NewCluster() *Cluster {
+ return &Cluster{Extensions: make(map[string]runtime.Object)}
+}
+
+// NewAuthInfo is a convenience function that returns a new AuthInfo
+// object with non-nil maps
+func NewAuthInfo() *AuthInfo {
+ return &AuthInfo{
+ Extensions: make(map[string]runtime.Object),
+ ImpersonateUserExtra: make(map[string][]string),
+ }
+}
+
+// NewPreferences is a convenience function that returns a new
+// Preferences object with non-nil maps
+func NewPreferences() *Preferences {
+ return &Preferences{Extensions: make(map[string]runtime.Object)}
+}
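+
+// Illustrative sketch: assembling a minimal in-memory kubeconfig with the
+// constructors above. All names, the server URL, and the token are
+// hypothetical placeholders:
+//
+//    config := NewConfig()
+//    config.Clusters["dev"] = &Cluster{Server: "https://dev.example.com:6443"}
+//    config.AuthInfos["alice"] = &AuthInfo{Token: "placeholder-token"}
+//    ctx := NewContext()
+//    ctx.Cluster, ctx.AuthInfo, ctx.Namespace = "dev", "alice", "default"
+//    config.Contexts["dev-alice"] = ctx
+//    config.CurrentContext = "dev-alice"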
diff --git a/vendor/k8s.io/client-go/tools/metrics/metrics.go b/vendor/k8s.io/client-go/tools/metrics/metrics.go
new file mode 100644
index 000000000..a01306c65
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/metrics/metrics.go
@@ -0,0 +1,61 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package metrics provides abstractions for registering which metrics
+// to record.
+package metrics
+
+import (
+ "net/url"
+ "sync"
+ "time"
+)
+
+var registerMetrics sync.Once
+
+// LatencyMetric observes client latency partitioned by verb and url.
+type LatencyMetric interface {
+ Observe(verb string, u url.URL, latency time.Duration)
+}
+
+// ResultMetric counts response codes partitioned by method and host.
+type ResultMetric interface {
+ Increment(code string, method string, host string)
+}
+
+var (
+ // RequestLatency is the latency metric that rest clients will update.
+ RequestLatency LatencyMetric = noopLatency{}
+ // RequestResult is the result metric that rest clients will update.
+ RequestResult ResultMetric = noopResult{}
+)
+
+// Register registers metrics for the rest client to use. This can
+// only be called once.
+func Register(lm LatencyMetric, rm ResultMetric) {
+ registerMetrics.Do(func() {
+ RequestLatency = lm
+ RequestResult = rm
+ })
+}
+
+type noopLatency struct{}
+
+func (noopLatency) Observe(string, url.URL, time.Duration) {}
+
+type noopResult struct{}
+
+func (noopResult) Increment(string, string, string) {}
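+
+// Illustrative sketch of wiring these hooks: any pair of types satisfying the
+// two interfaces can be registered (the log-based sinks below are hypothetical):
+//
+//    type latencySink struct{}
+//
+//    func (latencySink) Observe(verb string, u url.URL, d time.Duration) {
+//        log.Printf("%s %s took %v", verb, u.Path, d)
+//    }
+//
+//    type resultSink struct{}
+//
+//    func (resultSink) Increment(code, method, host string) {
+//        log.Printf("%s %s -> %s", method, host, code)
+//    }
+//
+//    func init() { metrics.Register(latencySink{}, resultSink{}) }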
diff --git a/vendor/k8s.io/client-go/tools/record/doc.go b/vendor/k8s.io/client-go/tools/record/doc.go
new file mode 100644
index 000000000..657ddecbc
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package record has all client logic for recording and reporting events.
+package record // import "k8s.io/client-go/tools/record"
diff --git a/vendor/k8s.io/client-go/tools/record/event.go b/vendor/k8s.io/client-go/tools/record/event.go
new file mode 100644
index 000000000..6b2fad409
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/event.go
@@ -0,0 +1,318 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/pkg/api/v1"
+ "k8s.io/client-go/pkg/api/v1/ref"
+ restclient "k8s.io/client-go/rest"
+
+ "net/http"
+
+ "github.com/golang/glog"
+)
+
+const maxTriesPerEvent = 12
+
+var defaultSleepDuration = 10 * time.Second
+
+const maxQueuedEvents = 1000
+
+// EventSink knows how to store events (client.Client implements it.)
+// EventSink must respect the namespace that will be embedded in 'event'.
+// It is assumed that EventSink will return the same sorts of errors as
+// pkg/client's REST client.
+type EventSink interface {
+ Create(event *v1.Event) (*v1.Event, error)
+ Update(event *v1.Event) (*v1.Event, error)
+ Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
+}
+
+// EventRecorder knows how to record events on behalf of an EventSource.
+type EventRecorder interface {
+ // Event constructs an event from the given information and puts it in the queue for sending.
+ // 'object' is the object this event is about. Event will make a reference to it, or you may
+ // also pass a reference to the object directly.
+ // 'eventtype' is the type of this event, and can be one of Normal or Warning. New types may be added in the future.
+ // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
+ // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
+ // to automate handling of events, so imagine people writing switch statements to handle them.
+ // You want to make that easy.
+ // 'message' is intended to be human readable.
+ //
+ // The resulting event will be created in the same namespace as the reference object.
+ Event(object runtime.Object, eventtype, reason, message string)
+
+ // Eventf is just like Event, but with Sprintf for the message field.
+ Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
+
+ // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
+ PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
+}
+
+// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
+type EventBroadcaster interface {
+ // StartEventWatcher starts sending events received from this EventBroadcaster to the given
+ // event handler function. The return value can be ignored or used to stop recording, if
+ // desired.
+ StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
+
+ // StartRecordingToSink starts sending events received from this EventBroadcaster to the given
+ // sink. The return value can be ignored or used to stop recording, if desired.
+ StartRecordingToSink(sink EventSink) watch.Interface
+
+ // StartLogging starts sending events received from this EventBroadcaster to the given logging
+ // function. The return value can be ignored or used to stop recording, if desired.
+ StartLogging(logf func(format string, args ...interface{})) watch.Interface
+
+ // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
+ // with the event source set to the given event source.
+ NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
+}
+
+// NewBroadcaster creates a new event broadcaster.
+func NewBroadcaster() EventBroadcaster {
+ return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
+}
+
+func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
+ return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
+}
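+
+// Typical wiring sketch for consumers of this package. The clientset, scheme,
+// and pod values are hypothetical, and the event-sink type lives in
+// k8s.io/client-go/kubernetes/typed/core/v1 (method names vary slightly
+// across client-go releases):
+//
+//    broadcaster := record.NewBroadcaster()
+//    broadcaster.StartLogging(glog.Infof)
+//    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: clientset.Core().Events("")})
+//    recorder := broadcaster.NewRecorder(scheme, v1.EventSource{Component: "my-controller"})
+//    recorder.Event(pod, v1.EventTypeNormal, "Scheduled", "successfully assigned to a node")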
+
+type eventBroadcasterImpl struct {
+ *watch.Broadcaster
+ sleepDuration time.Duration
+}
+
+// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
+// The return value can be ignored or used to stop recording, if desired.
+// TODO: make me an object with parameterizable queue length and retry interval
+func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
+ // Create a dedicated Rand for each StartRecordingToSink call rather than
+ // using the package-level math/rand functions, which share a single locked source.
+ randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
+ eventCorrelator := NewEventCorrelator(clock.RealClock{})
+ return eventBroadcaster.StartEventWatcher(
+ func(event *v1.Event) {
+ recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
+ })
+}
+
+func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
+ // Make a copy before modification, because there could be multiple listeners.
+ // Events are safe to copy like this.
+ eventCopy := *event
+ event = &eventCopy
+ result, err := eventCorrelator.EventCorrelate(event)
+ if err != nil {
+ utilruntime.HandleError(err)
+ }
+ if result.Skip {
+ return
+ }
+ tries := 0
+ for {
+ if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
+ break
+ }
+ tries++
+ if tries >= maxTriesPerEvent {
+ glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
+ break
+ }
+ // Randomize the first sleep so that various clients won't all be
+ // synced up if the master goes down.
+ if tries == 1 {
+ time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
+ } else {
+ time.Sleep(sleepDuration)
+ }
+ }
+}
+
+func isKeyNotFoundError(err error) bool {
+ statusErr, _ := err.(*errors.StatusError)
+
+ if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
+ return true
+ }
+
+ return false
+}
+
+// recordEvent attempts to write event to a sink. It returns true if the event
+// was successfully recorded or discarded, false if it should be retried.
+// If updateExistingEvent is false, it creates a new event, otherwise it updates
+// existing event.
+func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
+ var newEvent *v1.Event
+ var err error
+ if updateExistingEvent {
+ newEvent, err = sink.Patch(event, patch)
+ }
+ // Update can fail because the event may have been removed and it no longer exists.
+ if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
+ // Making sure that ResourceVersion is empty on creation
+ event.ResourceVersion = ""
+ newEvent, err = sink.Create(event)
+ }
+ if err == nil {
+ // we need to update our event correlator with the server returned state to handle name/resourceversion
+ eventCorrelator.UpdateState(newEvent)
+ return true
+ }
+
+ // If we can't contact the server, then hold everything while we keep trying.
+ // Otherwise, something about the event is malformed and we should abandon it.
+ switch err.(type) {
+ case *restclient.RequestConstructionError:
+ // We will construct the request the same next time, so don't keep trying.
+ glog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
+ return true
+ case *errors.StatusError:
+ if errors.IsAlreadyExists(err) {
+ glog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
+ } else {
+ glog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
+ }
+ return true
+ case *errors.UnexpectedObjectError:
+ // We don't expect this; it implies the server's response didn't match a
+ // known pattern. Go ahead and retry.
+ default:
+ // This case includes actual http transport errors. Go ahead and retry.
+ }
+ glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
+ return false
+}
+
+// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
+// The return value can be ignored or used to stop recording, if desired.
+func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
+ return eventBroadcaster.StartEventWatcher(
+ func(e *v1.Event) {
+ logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
+ })
+}
+
+// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
+// The return value can be ignored or used to stop recording, if desired.
+func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
+ watcher := eventBroadcaster.Watch()
+ go func() {
+ defer utilruntime.HandleCrash()
+ for {
+ watchEvent, open := <-watcher.ResultChan()
+ if !open {
+ return
+ }
+ event, ok := watchEvent.Object.(*v1.Event)
+ if !ok {
+ // This is all local, so there's no reason this should
+ // ever happen.
+ continue
+ }
+ eventHandler(event)
+ }
+ }()
+ return watcher
+}
+
+// NewRecorder returns an EventRecorder that records events with the given event source.
+func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
+ return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
+}
+
+type recorderImpl struct {
+ scheme *runtime.Scheme
+ source v1.EventSource
+ *watch.Broadcaster
+ clock clock.Clock
+}
+
+func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
+ ref, err := ref.GetReference(recorder.scheme, object)
+ if err != nil {
+ glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
+ return
+ }
+
+ if !validateEventType(eventtype) {
+ glog.Errorf("Unsupported event type: '%v'", eventtype)
+ return
+ }
+
+ event := recorder.makeEvent(ref, eventtype, reason, message)
+ event.Source = recorder.source
+
+ go func() {
+ // NOTE: events should be a non-blocking operation
+ defer utilruntime.HandleCrash()
+ recorder.Action(watch.Added, event)
+ }()
+}
+
+func validateEventType(eventtype string) bool {
+ switch eventtype {
+ case v1.EventTypeNormal, v1.EventTypeWarning:
+ return true
+ }
+ return false
+}
+
+func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
+ recorder.generateEvent(object, metav1.Now(), eventtype, reason, message)
+}
+
+func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
+ recorder.generateEvent(object, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *v1.Event {
+ t := metav1.Time{Time: recorder.clock.Now()}
+ namespace := ref.Namespace
+ if namespace == "" {
+ namespace = metav1.NamespaceDefault
+ }
+ return &v1.Event{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
+ Namespace: namespace,
+ },
+ InvolvedObject: *ref,
+ Reason: reason,
+ Message: message,
+ FirstTimestamp: t,
+ LastTimestamp: t,
+ Count: 1,
+ Type: eventtype,
+ }
+}
diff --git a/vendor/k8s.io/client-go/tools/record/events_cache.go b/vendor/k8s.io/client-go/tools/record/events_cache.go
new file mode 100644
index 000000000..785ec6477
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/events_cache.go
@@ -0,0 +1,377 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/groupcache/lru"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/clock"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/pkg/api/v1"
+)
+
+const (
+ maxLruCacheEntries = 4096
+
+ // if we see the same event that varies only by message
+ // more than 10 times in a 10 minute period, aggregate the event
+ defaultAggregateMaxEvents = 10
+ defaultAggregateIntervalInSeconds = 600
+)
+
+// getEventKey builds a unique event key based on the event's source, involvedObject, reason, and message
+func getEventKey(event *v1.Event) string {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ event.InvolvedObject.FieldPath,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ event.Type,
+ event.Reason,
+ event.Message,
+ },
+ "")
+}
+
+// EventFilterFunc is a function that returns true if the event should be skipped
+type EventFilterFunc func(event *v1.Event) bool
+
+// DefaultEventFilterFunc returns false for all incoming events
+func DefaultEventFilterFunc(event *v1.Event) bool {
+ return false
+}
+
+// EventAggregatorKeyFunc is responsible for grouping events for aggregation.
+// It returns a tuple of the following:
+// aggregateKey - key that identifies the aggregate group this event belongs to
+// localKey - key that makes this event unique within the aggregate group
+type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
+
+// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
+func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ event.Type,
+ event.Reason,
+ },
+ ""), event.Message
+}
+
+// EventAggregatorMessageFunc is responsible for producing an aggregation message
+type EventAggregatorMessageFunc func(event *v1.Event) string
+
+// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
+func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
+ return "(combined from similar events): " + event.Message
+}
+
+// EventAggregator identifies similar events and aggregates them into a single event
+type EventAggregator struct {
+ sync.RWMutex
+
+ // The cache that manages aggregation state
+ cache *lru.Cache
+
+ // The function that groups events for aggregation
+ keyFunc EventAggregatorKeyFunc
+
+ // The function that generates a message for an aggregate event
+ messageFunc EventAggregatorMessageFunc
+
+ // The maximum number of events in the specified interval before aggregation occurs
+ maxEvents uint
+
+ // The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
+ maxIntervalInSeconds uint
+
+ // clock is used to allow for testing over a time interval
+ clock clock.Clock
+}
+
+// NewEventAggregator returns a new instance of an EventAggregator
+func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
+ maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
+ return &EventAggregator{
+ cache: lru.New(lruCacheSize),
+ keyFunc: keyFunc,
+ messageFunc: messageFunc,
+ maxEvents: uint(maxEvents),
+ maxIntervalInSeconds: uint(maxIntervalInSeconds),
+ clock: clock,
+ }
+}
+
+// aggregateRecord holds data used to perform aggregation decisions
+type aggregateRecord struct {
+ // we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
+ // if the size of this set exceeds the max, we know we need to aggregate
+ localKeys sets.String
+ // The last time at which the aggregate was recorded
+ lastTimestamp metav1.Time
+}
+
+// EventAggregate checks if a similar event has been seen according to the
+// aggregation configuration (max events, max interval, etc) and returns:
+//
+// - The (potentially modified) event that should be created
+// - The cache key for the event, for correlation purposes. This will be set to
+//   the full key for normal events, and to the aggregate key (the result of
+//   EventAggregatorKeyFunc) for aggregate events.
+func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
+ now := metav1.NewTime(e.clock.Now())
+ var record aggregateRecord
+ // eventKey is the full cache key for this event
+ eventKey := getEventKey(newEvent)
+ // aggregateKey is for the aggregate event, if one is needed.
+ aggregateKey, localKey := e.keyFunc(newEvent)
+
+ // Do we have a record of similar events in our cache?
+ e.Lock()
+ defer e.Unlock()
+ value, found := e.cache.Get(aggregateKey)
+ if found {
+ record = value.(aggregateRecord)
+ }
+
+ // Is the previous record too old? If so, make a fresh one. Note: if we didn't
+ // find a similar record, its lastTimestamp will be the zero value, so we
+ // create a new one in that case.
+ maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
+ interval := now.Time.Sub(record.lastTimestamp.Time)
+ if interval > maxInterval {
+ record = aggregateRecord{localKeys: sets.NewString()}
+ }
+
+ // Write the new event into the aggregation record and put it on the cache
+ record.localKeys.Insert(localKey)
+ record.lastTimestamp = now
+ e.cache.Add(aggregateKey, record)
+
+ // If we are not yet over the threshold for unique events, don't correlate them
+ if uint(record.localKeys.Len()) < e.maxEvents {
+ return newEvent, eventKey
+ }
+
+ // do not grow our local key set any larger than max
+ record.localKeys.PopAny()
+
+ // create a new aggregate event, and return the aggregateKey as the cache key
+ // (so that it can be overwritten.)
+ eventCopy := &v1.Event{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
+ Namespace: newEvent.Namespace,
+ },
+ Count: 1,
+ FirstTimestamp: now,
+ InvolvedObject: newEvent.InvolvedObject,
+ LastTimestamp: now,
+ Message: e.messageFunc(newEvent),
+ Type: newEvent.Type,
+ Reason: newEvent.Reason,
+ Source: newEvent.Source,
+ }
+ return eventCopy, aggregateKey
+}
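+
+// A behavior sketch under the default configuration (the event variable is
+// assumed): the first ten events that differ only in Message are returned
+// unchanged with their full cache key; a further similar event inside the
+// interval comes back as an aggregate event with a combined message and the
+// aggregateKey as its cache key:
+//
+//	agg := NewEventAggregator(4096, EventAggregatorByReasonFunc,
+//		EventAggregatorByReasonMessageFunc, 10, 600, clock.RealClock{})
+//	aggregated, cacheKey := agg.EventAggregate(event)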
+
+// eventLog records data about when an event was observed
+type eventLog struct {
+ // The number of times the event has occurred since first occurrence.
+ count uint
+
+ // The time at which the event was first recorded.
+ firstTimestamp metav1.Time
+
+ // The unique name of the first occurrence of this event
+ name string
+
+ // Resource version returned from previous interaction with server
+ resourceVersion string
+}
+
+// eventLogger logs occurrences of an event
+type eventLogger struct {
+ sync.RWMutex
+ cache *lru.Cache
+ clock clock.Clock
+}
+
+// newEventLogger observes events and counts their frequencies
+func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
+ return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
+}
+
+// eventObserve records an event, or updates an existing one if key is a cache hit
+func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
+ var (
+ patch []byte
+ err error
+ )
+ eventCopy := *newEvent
+ event := &eventCopy
+
+ e.Lock()
+ defer e.Unlock()
+
+ // Check if there is an existing event we should update
+ lastObservation := e.lastEventObservationFromCache(key)
+
+ // If we found a result, prepare a patch
+ if lastObservation.count > 0 {
+ // update the event based on the last observation so patch will work as desired
+ event.Name = lastObservation.name
+ event.ResourceVersion = lastObservation.resourceVersion
+ event.FirstTimestamp = lastObservation.firstTimestamp
+ event.Count = int32(lastObservation.count) + 1
+
+ eventCopy2 := *event
+ eventCopy2.Count = 0
+ eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
+ eventCopy2.Message = ""
+
+ newData, _ := json.Marshal(event)
+ oldData, _ := json.Marshal(eventCopy2)
+ patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
+ }
+
+ // record our new observation
+ e.cache.Add(
+ key,
+ eventLog{
+ count: uint(event.Count),
+ firstTimestamp: event.FirstTimestamp,
+ name: event.Name,
+ resourceVersion: event.ResourceVersion,
+ },
+ )
+ return event, patch, err
+}
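+
+// For illustration (field values assumed): on a repeat observation of the same
+// key, the fields zeroed in eventCopy2 above are exactly the ones that survive
+// into the patch, so it looks roughly like
+//
+//	{"count":2,"lastTimestamp":"2017-01-01T00:00:00Z","message":"..."}
+//
+// which the sink applies as a strategic merge patch against the stored event.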
+
+// updateState updates its internal tracking information based on latest server state
+func (e *eventLogger) updateState(event *v1.Event) {
+ key := getEventKey(event)
+ e.Lock()
+ defer e.Unlock()
+ // record our new observation
+ e.cache.Add(
+ key,
+ eventLog{
+ count: uint(event.Count),
+ firstTimestamp: event.FirstTimestamp,
+ name: event.Name,
+ resourceVersion: event.ResourceVersion,
+ },
+ )
+}
+
+// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
+func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
+ value, ok := e.cache.Get(key)
+ if ok {
+ observationValue, ok := value.(eventLog)
+ if ok {
+ return observationValue
+ }
+ }
+ return eventLog{}
+}
+
+// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter
+// incoming events to decide whether they should be dropped from further processing. It can aggregate similar events
+// that occur frequently, to protect the system from event spam that is difficult for users to distinguish. It performs
+// de-duplication, so that events observed multiple times are compacted into a single event with an increasing count.
+type EventCorrelator struct {
+ // the function to filter the event
+ filterFunc EventFilterFunc
+ // the object that performs event aggregation
+ aggregator *EventAggregator
+ // the object that observes events as they come through
+ logger *eventLogger
+}
+
+// EventCorrelateResult is the result of a Correlate
+type EventCorrelateResult struct {
+ // the event after correlation
+ Event *v1.Event
+ // if provided, perform a strategic patch when updating the record on the server
+ Patch []byte
+ // if true, do no further processing of the event
+ Skip bool
+}
+
+// NewEventCorrelator returns an EventCorrelator configured with default values.
+//
+// The EventCorrelator is responsible for event filtering, aggregating, and counting
+// prior to interacting with the API server to record the event.
+//
+// The default behavior is as follows:
+// * No events are filtered from being recorded
+// * Aggregation is performed if a similar event is recorded 10 times in a
+//   10 minute rolling interval. A similar event is an event that varies only by
+// the Event.Message field. Rather than recording the precise event, aggregation
+// will create a new event whose message reports that it has combined events with
+// the same reason.
+// * Events are incrementally counted if the exact same event is encountered multiple
+// times.
+func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
+ cacheSize := maxLruCacheEntries
+ return &EventCorrelator{
+ filterFunc: DefaultEventFilterFunc,
+ aggregator: NewEventAggregator(
+ cacheSize,
+ EventAggregatorByReasonFunc,
+ EventAggregatorByReasonMessageFunc,
+ defaultAggregateMaxEvents,
+ defaultAggregateIntervalInSeconds,
+ clock),
+
+ logger: newEventLogger(cacheSize, clock),
+ }
+}
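+
+// A minimal usage sketch (caller code; the event variable is assumed):
+// correlate before writing to the server, then feed the server's response
+// back via UpdateState:
+//
+//	correlator := NewEventCorrelator(clock.RealClock{})
+//	result, err := correlator.EventCorrelate(event)
+//	if err == nil && !result.Skip {
+//		// create result.Event (or patch with result.Patch if non-nil),
+//		// then call correlator.UpdateState with the event the server returned
+//	}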
+
+// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
+func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
+ if c.filterFunc(newEvent) {
+ return &EventCorrelateResult{Skip: true}, nil
+ }
+ aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
+ observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
+ return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
+}
+
+// UpdateState based on the latest observed state from server
+func (c *EventCorrelator) UpdateState(event *v1.Event) {
+ c.logger.updateState(event)
+}
diff --git a/vendor/k8s.io/client-go/tools/record/fake.go b/vendor/k8s.io/client-go/tools/record/fake.go
new file mode 100644
index 000000000..c0e8eedbb
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/record/fake.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package record
+
+import (
+ "fmt"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
+// when created manually rather than with NewFakeRecorder; in that case, however,
+// all events may be thrown away (they are dropped whenever the Events channel is nil).
+type FakeRecorder struct {
+ Events chan string
+}
+
+func (f *FakeRecorder) Event(object runtime.Object, eventtype, reason, message string) {
+ if f.Events != nil {
+ f.Events <- fmt.Sprintf("%s %s %s", eventtype, reason, message)
+ }
+}
+
+func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ if f.Events != nil {
+ f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+messageFmt, args...)
+ }
+}
+
+func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
+}
+
+// NewFakeRecorder creates a new fake event recorder with an event channel
+// buffered to the given size.
+func NewFakeRecorder(bufferSize int) *FakeRecorder {
+ return &FakeRecorder{
+ Events: make(chan string, bufferSize),
+ }
+}
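+
+// A test-usage sketch (the pod variable is assumed): the buffered channel
+// makes assertions straightforward:
+//
+//	recorder := NewFakeRecorder(10)
+//	recorder.Event(pod, "Warning", "Failed", "boom")
+//	got := <-recorder.Events // "Warning Failed boom"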
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/doc.go b/vendor/k8s.io/client-go/tools/remotecommand/doc.go
new file mode 100644
index 000000000..ac06a9cd3
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/doc.go
@@ -0,0 +1,20 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package remotecommand adds support for executing commands in containers,
+// with support for separate stdin, stdout, and stderr streams, as well as
+// TTY.
+package remotecommand // import "k8s.io/client-go/tools/remotecommand"
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/errorstream.go b/vendor/k8s.io/client-go/tools/remotecommand/errorstream.go
new file mode 100644
index 000000000..360276b65
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/errorstream.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+
+ "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// errorStreamDecoder interprets the data on the error channel and creates a go error object from it.
+type errorStreamDecoder interface {
+ decode(message []byte) error
+}
+
+// watchErrorStream watches the errorStream for remote command error data,
+// decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote
+// command exited successfully) to the returned error channel, and closes it.
+// This function returns immediately.
+func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error {
+ errorChan := make(chan error)
+
+ go func() {
+ defer runtime.HandleCrash()
+
+ message, err := ioutil.ReadAll(errorStream)
+ switch {
+ case err != nil && err != io.EOF:
+ errorChan <- fmt.Errorf("error reading from error stream: %s", err)
+ case len(message) > 0:
+ errorChan <- d.decode(message)
+ default:
+ errorChan <- nil
+ }
+ close(errorChan)
+ }()
+
+ return errorChan
+}
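+
+// Usage sketch: callers start the watch before copying the other streams and
+// read the channel once those copies finish (see stream in v2.go):
+//
+//	errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
+//	// ... copy stdin/stdout/stderr ...
+//	return <-errorChan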
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go b/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
new file mode 100644
index 000000000..a90fab1fe
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/util/httpstream"
+ "k8s.io/apimachinery/pkg/util/httpstream/spdy"
+ "k8s.io/apimachinery/pkg/util/remotecommand"
+ restclient "k8s.io/client-go/rest"
+ "k8s.io/client-go/transport"
+)
+
+// StreamOptions holds information pertaining to the current streaming session: supported stream
+// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
+// support terminal resizing.
+type StreamOptions struct {
+ SupportedProtocols []string
+ Stdin io.Reader
+ Stdout io.Writer
+ Stderr io.Writer
+ Tty bool
+ TerminalSizeQueue TerminalSizeQueue
+}
+
+// Executor is an interface for transporting shell-style streams.
+type Executor interface {
+ // Stream initiates the transport of the standard shell streams. It will transport any
+ // non-nil stream to a remote system, and return an error if a problem occurs. If tty
+ // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
+ // stdout stream).
+ Stream(options StreamOptions) error
+}
+
+// StreamExecutor supports the ability to dial an httpstream connection and the ability to
+// run a command line stream protocol over that dialer.
+type StreamExecutor interface {
+ Executor
+ httpstream.Dialer
+}
+
+// streamExecutor handles transporting standard shell streams over an httpstream connection.
+type streamExecutor struct {
+ upgrader httpstream.UpgradeRoundTripper
+ transport http.RoundTripper
+
+ method string
+ url *url.URL
+}
+
+// NewExecutor connects to the provided server and upgrades the connection to
+// multiplexed bidirectional streams. The current implementation uses SPDY,
+// but this could be replaced with HTTP/2, or something else, once available.
+// TODO: the common code between this and portforward could be abstracted.
+func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) {
+ tlsConfig, err := restclient.TLSConfigFor(config)
+ if err != nil {
+ return nil, err
+ }
+
+ upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
+ wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
+ if err != nil {
+ return nil, err
+ }
+
+ return &streamExecutor{
+ upgrader: upgradeRoundTripper,
+ transport: wrapper,
+ method: method,
+ url: url,
+ }, nil
+}
+
+// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
+// streams. This method takes a stream upgrader and an optional function that is invoked
+// to wrap the round tripper. This method may be used by clients that are lower level than
+// Kubernetes clients or need to provide their own upgrade round tripper.
+func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
+ rt := http.RoundTripper(upgrader)
+ if fn != nil {
+ rt = fn(rt)
+ }
+ return &streamExecutor{
+ upgrader: upgrader,
+ transport: rt,
+ method: method,
+ url: url,
+ }, nil
+}
+
+// Dial opens a connection to a remote server and attempts to negotiate a SPDY
+// connection. Upon success, it returns the connection and the protocol
+// selected by the server.
+func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) {
+ rt := transport.DebugWrappers(e.transport)
+
+	// TODO the client probably shouldn't be created here, as this gives
+	// callers no flexibility to configure it.
+ client := &http.Client{Transport: rt}
+
+ req, err := http.NewRequest(e.method, e.url.String(), nil)
+ if err != nil {
+ return nil, "", fmt.Errorf("error creating request: %v", err)
+ }
+ for i := range protocols {
+ req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
+ }
+
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, "", fmt.Errorf("error sending request: %v", err)
+ }
+ defer resp.Body.Close()
+
+ conn, err := e.upgrader.NewConnection(resp)
+ if err != nil {
+ return nil, "", err
+ }
+
+ return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
+}
+
+type streamCreator interface {
+ CreateStream(headers http.Header) (httpstream.Stream, error)
+}
+
+type streamProtocolHandler interface {
+ stream(conn streamCreator) error
+}
+
+// Stream opens a protocol streamer to the server and streams until a client closes
+// the connection or the server disconnects.
+func (e *streamExecutor) Stream(options StreamOptions) error {
+ conn, protocol, err := e.Dial(options.SupportedProtocols...)
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+
+ var streamer streamProtocolHandler
+
+ switch protocol {
+ case remotecommand.StreamProtocolV4Name:
+ streamer = newStreamProtocolV4(options)
+ case remotecommand.StreamProtocolV3Name:
+ streamer = newStreamProtocolV3(options)
+ case remotecommand.StreamProtocolV2Name:
+ streamer = newStreamProtocolV2(options)
+ case "":
+ glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
+ fallthrough
+ case remotecommand.StreamProtocolV1Name:
+ streamer = newStreamProtocolV1(options)
+ }
+
+ return streamer.stream(conn)
+}
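+
+// A minimal caller sketch (config and execURL are assumed; the protocol list
+// comes from the imported remotecommand package):
+//
+//	exec, err := NewExecutor(config, "POST", execURL)
+//	if err != nil {
+//		return err
+//	}
+//	return exec.Stream(StreamOptions{
+//		SupportedProtocols: remotecommand.SupportedStreamingProtocols,
+//		Stdin:              os.Stdin,
+//		Stdout:             os.Stdout,
+//		Stderr:             os.Stderr,
+//	})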
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/resize.go b/vendor/k8s.io/client-go/tools/remotecommand/resize.go
new file mode 100644
index 000000000..c838f21ba
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/resize.go
@@ -0,0 +1,33 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+// TerminalSize and TerminalSizeQueue were part of k8s.io/kubernetes/pkg/util/term
+// and were moved here in order to decouple the client from other term dependencies.
+
+// TerminalSize represents the width and height of a terminal.
+type TerminalSize struct {
+ Width uint16
+ Height uint16
+}
+
+// TerminalSizeQueue is capable of returning terminal resize events as they occur.
+type TerminalSizeQueue interface {
+ // Next returns the new terminal size after the terminal has been resized. It returns nil when
+ // monitoring has been stopped.
+ Next() *TerminalSize
+}
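+
+// A minimal illustrative implementation (not part of this package): a
+// single-shot queue that reports one size and then stops monitoring:
+//
+//	type onceSizeQueue struct{ size *TerminalSize }
+//
+//	func (q *onceSizeQueue) Next() *TerminalSize {
+//		s := q.size
+//		q.size = nil
+//		return s
+//	}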
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v1.go b/vendor/k8s.io/client-go/tools/remotecommand/v1.go
new file mode 100644
index 000000000..1db917c0b
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v1.go
@@ -0,0 +1,160 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/golang/glog"
+ "k8s.io/apimachinery/pkg/util/httpstream"
+ "k8s.io/client-go/pkg/api/v1"
+)
+
+// streamProtocolV1 implements the first version of the streaming exec & attach
+// protocol. This version has some bugs, such as not being able to detect when
+// non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
+// http://issues.k8s.io/13395 for more details.
+type streamProtocolV1 struct {
+ StreamOptions
+
+ errorStream httpstream.Stream
+ remoteStdin httpstream.Stream
+ remoteStdout httpstream.Stream
+ remoteStderr httpstream.Stream
+}
+
+var _ streamProtocolHandler = &streamProtocolV1{}
+
+func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
+ return &streamProtocolV1{
+ StreamOptions: options,
+ }
+}
+
+func (p *streamProtocolV1) stream(conn streamCreator) error {
+ doneChan := make(chan struct{}, 2)
+ errorChan := make(chan error)
+
+ cp := func(s string, dst io.Writer, src io.Reader) {
+ glog.V(6).Infof("Copying %s", s)
+ defer glog.V(6).Infof("Done copying %s", s)
+ if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
+ glog.Errorf("Error copying %s: %v", s, err)
+ }
+ if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr {
+ doneChan <- struct{}{}
+ }
+ }
+
+ // set up all the streams first
+ var err error
+ headers := http.Header{}
+ headers.Set(v1.StreamType, v1.StreamTypeError)
+ p.errorStream, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ defer p.errorStream.Reset()
+
+ // Create all the streams first, then start the copy goroutines. The server doesn't start its copy
+ // goroutines until it's received all of the streams. If the client creates the stdin stream and
+ // immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
+ // spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
+ // getting processed because the server hasn't started its copying, and it won't do that until it
+ // gets all the streams. By creating all the streams first, we ensure that the server is ready to
+ // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
+ if p.Stdin != nil {
+ headers.Set(v1.StreamType, v1.StreamTypeStdin)
+ p.remoteStdin, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ defer p.remoteStdin.Reset()
+ }
+
+ if p.Stdout != nil {
+ headers.Set(v1.StreamType, v1.StreamTypeStdout)
+ p.remoteStdout, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ defer p.remoteStdout.Reset()
+ }
+
+ if p.Stderr != nil && !p.Tty {
+ headers.Set(v1.StreamType, v1.StreamTypeStderr)
+ p.remoteStderr, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ defer p.remoteStderr.Reset()
+ }
+
+ // now that all the streams have been created, proceed with reading & copying
+
+ // always read from errorStream
+ go func() {
+ message, err := ioutil.ReadAll(p.errorStream)
+ if err != nil && err != io.EOF {
+ errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
+ return
+ }
+ if len(message) > 0 {
+ errorChan <- fmt.Errorf("Error executing remote command: %s", message)
+ return
+ }
+ }()
+
+ if p.Stdin != nil {
+ // TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
+ // because stdin is not closed until the process exits. If we try to call
+ // stdin.Close(), it returns no error but doesn't unblock the copy. It will
+ // exit when the process exits, instead.
+ go cp(v1.StreamTypeStdin, p.remoteStdin, p.Stdin)
+ }
+
+ waitCount := 0
+ completedStreams := 0
+
+ if p.Stdout != nil {
+ waitCount++
+ go cp(v1.StreamTypeStdout, p.Stdout, p.remoteStdout)
+ }
+
+ if p.Stderr != nil && !p.Tty {
+ waitCount++
+ go cp(v1.StreamTypeStderr, p.Stderr, p.remoteStderr)
+ }
+
+Loop:
+ for {
+ select {
+ case <-doneChan:
+ completedStreams++
+ if completedStreams == waitCount {
+ break Loop
+ }
+ case err := <-errorChan:
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v2.go b/vendor/k8s.io/client-go/tools/remotecommand/v2.go
new file mode 100644
index 000000000..95346a439
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v2.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "sync"
+
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/client-go/pkg/api/v1"
+)
+
+// streamProtocolV2 implements version 2 of the streaming protocol for attach
+// and exec. The original streaming protocol was unversioned. As a result, this
+// version is referred to as version 2, even though it is the first actual
+// numbered version.
+type streamProtocolV2 struct {
+ StreamOptions
+
+ errorStream io.Reader
+ remoteStdin io.ReadWriteCloser
+ remoteStdout io.Reader
+ remoteStderr io.Reader
+}
+
+var _ streamProtocolHandler = &streamProtocolV2{}
+
+func newStreamProtocolV2(options StreamOptions) streamProtocolHandler {
+ return &streamProtocolV2{
+ StreamOptions: options,
+ }
+}
+
+func (p *streamProtocolV2) createStreams(conn streamCreator) error {
+ var err error
+ headers := http.Header{}
+
+ // set up error stream
+ headers.Set(v1.StreamType, v1.StreamTypeError)
+ p.errorStream, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+
+ // set up stdin stream
+ if p.Stdin != nil {
+ headers.Set(v1.StreamType, v1.StreamTypeStdin)
+ p.remoteStdin, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ }
+
+ // set up stdout stream
+ if p.Stdout != nil {
+ headers.Set(v1.StreamType, v1.StreamTypeStdout)
+ p.remoteStdout, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ }
+
+ // set up stderr stream
+ if p.Stderr != nil && !p.Tty {
+ headers.Set(v1.StreamType, v1.StreamTypeStderr)
+ p.remoteStderr, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (p *streamProtocolV2) copyStdin() {
+ if p.Stdin != nil {
+ var once sync.Once
+
+ // copy from client's stdin to container's stdin
+ go func() {
+ defer runtime.HandleCrash()
+
+		// if p.Stdin is noninteractive, e.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
+ // we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
+ // the executed command will remain running.
+ defer once.Do(func() { p.remoteStdin.Close() })
+
+ if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil {
+ runtime.HandleError(err)
+ }
+ }()
+
+ // read from remoteStdin until the stream is closed. this is essential to
+ // be able to exit interactive sessions cleanly and not leak goroutines or
+ // hang the client's terminal.
+ //
+ // TODO we aren't using go-dockerclient any more; revisit this to determine if it's still
+ // required by engine-api.
+ //
+ // go-dockerclient's current hijack implementation
+ // (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564)
+ // waits for all three streams (stdin/stdout/stderr) to finish copying
+ // before returning. When hijack finishes copying stdout/stderr, it calls
+ // Close() on its side of remoteStdin, which allows this copy to complete.
+ // When that happens, we must Close() on our side of remoteStdin, to
+ // allow the copy in hijack to complete, and hijack to return.
+ go func() {
+ defer runtime.HandleCrash()
+ defer once.Do(func() { p.remoteStdin.Close() })
+
+ // this "copy" doesn't actually read anything - it's just here to wait for
+ // the server to close remoteStdin.
+ if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil {
+ runtime.HandleError(err)
+ }
+ }()
+ }
+}
+
+func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
+ if p.Stdout == nil {
+ return
+ }
+
+ wg.Add(1)
+ go func() {
+ defer runtime.HandleCrash()
+ defer wg.Done()
+
+ if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
+ runtime.HandleError(err)
+ }
+ }()
+}
+
+func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
+ if p.Stderr == nil || p.Tty {
+ return
+ }
+
+ wg.Add(1)
+ go func() {
+ defer runtime.HandleCrash()
+ defer wg.Done()
+
+ if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
+ runtime.HandleError(err)
+ }
+ }()
+}
+
+func (p *streamProtocolV2) stream(conn streamCreator) error {
+ if err := p.createStreams(conn); err != nil {
+ return err
+ }
+
+ // now that all the streams have been created, proceed with reading & copying
+
+ errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
+
+ p.copyStdin()
+
+ var wg sync.WaitGroup
+ p.copyStdout(&wg)
+ p.copyStderr(&wg)
+
+ // we're waiting for stdout/stderr to finish copying
+ wg.Wait()
+
+ // waits for errorStream to finish reading with an error or nil
+ return <-errorChan
+}
+
+// errorDecoderV2 interprets the error channel data as plain text.
+type errorDecoderV2 struct{}
+
+func (d *errorDecoderV2) decode(message []byte) error {
+ return fmt.Errorf("error executing remote command: %s", message)
+}
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v3.go b/vendor/k8s.io/client-go/tools/remotecommand/v3.go
new file mode 100644
index 000000000..03b9e2a68
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v3.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "sync"
+
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/client-go/pkg/api/v1"
+)
+
+// streamProtocolV3 implements version 3 of the streaming protocol for attach
+// and exec. This version adds support for resizing the container's terminal.
+type streamProtocolV3 struct {
+ *streamProtocolV2
+
+ resizeStream io.Writer
+}
+
+var _ streamProtocolHandler = &streamProtocolV3{}
+
+func newStreamProtocolV3(options StreamOptions) streamProtocolHandler {
+ return &streamProtocolV3{
+ streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2),
+ }
+}
+
+func (p *streamProtocolV3) createStreams(conn streamCreator) error {
+ // set up the streams from v2
+ if err := p.streamProtocolV2.createStreams(conn); err != nil {
+ return err
+ }
+
+ // set up resize stream
+ if p.Tty {
+ headers := http.Header{}
+ headers.Set(v1.StreamType, v1.StreamTypeResize)
+ var err error
+ p.resizeStream, err = conn.CreateStream(headers)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (p *streamProtocolV3) handleResizes() {
+ if p.resizeStream == nil || p.TerminalSizeQueue == nil {
+ return
+ }
+ go func() {
+ defer runtime.HandleCrash()
+
+ encoder := json.NewEncoder(p.resizeStream)
+ for {
+ size := p.TerminalSizeQueue.Next()
+ if size == nil {
+ return
+ }
+ if err := encoder.Encode(&size); err != nil {
+ runtime.HandleError(err)
+ }
+ }
+ }()
+}
+
+func (p *streamProtocolV3) stream(conn streamCreator) error {
+ if err := p.createStreams(conn); err != nil {
+ return err
+ }
+
+ // now that all the streams have been created, proceed with reading & copying
+
+ errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
+
+ p.handleResizes()
+
+ p.copyStdin()
+
+ var wg sync.WaitGroup
+ p.copyStdout(&wg)
+ p.copyStderr(&wg)
+
+ // we're waiting for stdout/stderr to finish copying
+ wg.Wait()
+
+ // waits for errorStream to finish reading with an error or nil
+ return <-errorChan
+}
+
+type errorDecoderV3 struct {
+ errorDecoderV2
+}
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v4.go b/vendor/k8s.io/client-go/tools/remotecommand/v4.go
new file mode 100644
index 000000000..69ca934a0
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v4.go
@@ -0,0 +1,119 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strconv"
+ "sync"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/remotecommand"
+ "k8s.io/client-go/util/exec"
+)
+
+// streamProtocolV4 implements version 4 of the streaming protocol for attach
+// and exec. This version adds support for exit codes on the error stream through
+// the use of metav1.Status instead of plain text messages.
+type streamProtocolV4 struct {
+ *streamProtocolV3
+}
+
+var _ streamProtocolHandler = &streamProtocolV4{}
+
+func newStreamProtocolV4(options StreamOptions) streamProtocolHandler {
+ return &streamProtocolV4{
+ streamProtocolV3: newStreamProtocolV3(options).(*streamProtocolV3),
+ }
+}
+
+func (p *streamProtocolV4) createStreams(conn streamCreator) error {
+ return p.streamProtocolV3.createStreams(conn)
+}
+
+func (p *streamProtocolV4) handleResizes() {
+ p.streamProtocolV3.handleResizes()
+}
+
+func (p *streamProtocolV4) stream(conn streamCreator) error {
+ if err := p.createStreams(conn); err != nil {
+ return err
+ }
+
+ // now that all the streams have been created, proceed with reading & copying
+
+ errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})
+
+ p.handleResizes()
+
+ p.copyStdin()
+
+ var wg sync.WaitGroup
+ p.copyStdout(&wg)
+ p.copyStderr(&wg)
+
+ // we're waiting for stdout/stderr to finish copying
+ wg.Wait()
+
+ // waits for errorStream to finish reading with an error or nil
+ return <-errorChan
+}
+
+// errorDecoderV4 interprets the json-marshaled metav1.Status on the error channel
+// and creates an exec.CodeExitError from it.
+type errorDecoderV4 struct{}
+
+func (d *errorDecoderV4) decode(message []byte) error {
+ status := metav1.Status{}
+ err := json.Unmarshal(message, &status)
+ if err != nil {
+ return fmt.Errorf("error stream protocol error: %v in %q", err, string(message))
+ }
+ switch status.Status {
+ case metav1.StatusSuccess:
+ return nil
+ case metav1.StatusFailure:
+ if status.Reason == remotecommand.NonZeroExitCodeReason {
+ if status.Details == nil {
+ return errors.New("error stream protocol error: details must be set")
+ }
+ for i := range status.Details.Causes {
+ c := &status.Details.Causes[i]
+ if c.Type != remotecommand.ExitCodeCauseType {
+ continue
+ }
+
+ rc, err := strconv.ParseUint(c.Message, 10, 8)
+ if err != nil {
+ return fmt.Errorf("error stream protocol error: invalid exit code value %q", c.Message)
+ }
+ return exec.CodeExitError{
+ Err: fmt.Errorf("command terminated with exit code %d", rc),
+ Code: int(rc),
+ }
+ }
+
+ return fmt.Errorf("error stream protocol error: no %s cause given", remotecommand.ExitCodeCauseType)
+ }
+ default:
+ return errors.New("error stream protocol error: unknown error")
+ }
+
+	// status.Message is not a format string; use errors.New to avoid
+	// misinterpreting any % characters it may contain.
+	return errors.New(status.Message)
+}
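+
+// For illustration (payload shape inferred from metav1.Status JSON tags): a
+// non-zero-exit message on the error stream looks roughly like
+//
+//	{"status":"Failure","reason":"NonZeroExitCode",
+//	 "details":{"causes":[{"reason":"ExitCode","message":"42"}]}}
+//
+// which decode turns into an exec.CodeExitError with Code 42.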