author    Daniel J Walsh <dwalsh@redhat.com>    2018-03-26 18:26:55 -0400
committer Atomic Bot <atomic-devel@projectatomic.io>    2018-03-27 18:09:12 +0000
commit    af64e10400f8533a0c48ecdf5ab9b7fbf329e14e (patch)
tree      59160e3841b440dd35189c724bbb4375a7be173b /vendor/k8s.io/client-go/tools
parent    26d7e3c7b85e28c4e42998c90fdcc14079f13eef (diff)
Vendor in lots of kubernetes stuff to shrink image size
Signed-off-by: Daniel J Walsh <dwalsh@redhat.com>
Closes: #554
Approved by: mheon
Diffstat (limited to 'vendor/k8s.io/client-go/tools')
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/controller.go                     |    5
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/delta_fifo.go                     |    4
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/fifo.go                           |    4
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/heap.go                           |  323
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/listwatch.go                      |   32
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/mutation_cache.go                 |    2
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/mutation_detector.go              |   34
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/reflector.go                      |   70
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/reflector_metrics.go              |  119
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/shared_informer.go                |  221
-rw-r--r--  vendor/k8s.io/client-go/tools/cache/thread_safe_store.go              |   10
-rw-r--r--  vendor/k8s.io/client-go/tools/clientcmd/api/doc.go                    |   18
-rw-r--r--  vendor/k8s.io/client-go/tools/clientcmd/api/types.go                  |    1
-rw-r--r--  vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go  |  270
-rw-r--r--  vendor/k8s.io/client-go/tools/pager/pager.go                          |  118
-rw-r--r--  vendor/k8s.io/client-go/tools/record/event.go                         |    4
-rw-r--r--  vendor/k8s.io/client-go/tools/record/events_cache.go                  |  100
-rw-r--r--  vendor/k8s.io/client-go/tools/reference/ref.go                        |  122
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go          |  132
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v1.go                     |    2
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v2.go                     |    2
-rw-r--r--  vendor/k8s.io/client-go/tools/remotecommand/v3.go                     |    2
22 files changed, 1345 insertions, 250 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/controller.go b/vendor/k8s.io/client-go/tools/cache/controller.go
index 2c97b8658..e7b98befa 100644
--- a/vendor/k8s.io/client-go/tools/cache/controller.go
+++ b/vendor/k8s.io/client-go/tools/cache/controller.go
@@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflector = r
c.reflectorMutex.Unlock()
- r.RunUntil(stopCh)
+ var wg wait.Group
+ defer wg.Wait()
+
+ wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
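
Note: the controller now owns the reflector goroutine through wait.Group instead of the fire-and-forget RunUntil. A minimal sketch of the StartWithChannel/Wait semantics, assuming only k8s.io/apimachinery is vendored (the worker function is illustrative):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    var wg wait.Group
    // StartWithChannel runs the function in its own goroutine and hands it stopCh.
    wg.StartWithChannel(stopCh, func(stop <-chan struct{}) {
        <-stop // run until asked to stop
        fmt.Println("reflector-style worker exiting")
    })
    close(stopCh) // signal shutdown
    wg.Wait()     // unlike the old RunUntil, the caller can now wait for the goroutine to finish
}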
diff --git a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
index a71db6048..f06d1c5b1 100644
--- a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
+++ b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
@@ -539,6 +539,10 @@ func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
+ if f.knownObjects == nil {
+ return nil
+ }
+
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
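
Note: Resync is now a no-op when the DeltaFIFO was built without a knownObjects store. A hedged sketch of the case this guards, assuming the two-argument NewDeltaFIFO of this client-go vintage:

// A DeltaFIFO constructed with a nil KeyListerGetter has knownObjects == nil;
// Resync used to dereference it and panic, now it simply returns nil.
f := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)
if err := f.Resync(); err != nil {
    panic(err) // unreachable here: the nil guard returns early
}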
diff --git a/vendor/k8s.io/client-go/tools/cache/fifo.go b/vendor/k8s.io/client-go/tools/cache/fifo.go
index 3f6e2a948..e05c01ee2 100644
--- a/vendor/k8s.io/client-go/tools/cache/fifo.go
+++ b/vendor/k8s.io/client-go/tools/cache/fifo.go
@@ -59,7 +59,7 @@ type Queue interface {
// has since been added.
AddIfNotPresent(interface{}) error
- // Return true if the first batch of items has been popped
+ // HasSynced returns true if the first batch of items has been popped
HasSynced() bool
// Close queue
@@ -169,7 +169,7 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error {
return nil
}
-// addIfNotPresent assumes the fifo lock is already held and adds the the provided
+// addIfNotPresent assumes the fifo lock is already held and adds the provided
// item to the queue under id if it does not already exist.
func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
f.populated = true
diff --git a/vendor/k8s.io/client-go/tools/cache/heap.go b/vendor/k8s.io/client-go/tools/cache/heap.go
new file mode 100644
index 000000000..78e492455
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/heap.go
@@ -0,0 +1,323 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This file implements a heap data structure.
+
+package cache
+
+import (
+ "container/heap"
+ "fmt"
+ "sync"
+)
+
+const (
+ closedMsg = "heap is closed"
+)
+
+type LessFunc func(interface{}, interface{}) bool
+type heapItem struct {
+ obj interface{} // The object which is stored in the heap.
+ index int // The index of the object's key in the Heap.queue.
+}
+
+type itemKeyValue struct {
+ key string
+ obj interface{}
+}
+
+// heapData is an internal struct that implements the standard heap interface
+// and keeps the data stored in the heap.
+type heapData struct {
+ // items is a map from key of the objects to the objects and their index.
+ // We depend on the property that items in the map are in the queue and vice versa.
+ items map[string]*heapItem
+ // queue implements a heap data structure and keeps the order of elements
+ // according to the heap invariant. The queue keeps the keys of objects stored
+ // in "items".
+ queue []string
+
+ // keyFunc is used to make the key used for queued item insertion and retrieval, and
+ // should be deterministic.
+ keyFunc KeyFunc
+ // lessFunc is used to compare two objects in the heap.
+ lessFunc LessFunc
+}
+
+var (
+ _ = heap.Interface(&heapData{}) // heapData is a standard heap
+)
+
+// Less compares two objects and returns true if the first one should go
+// in front of the second one in the heap.
+func (h *heapData) Less(i, j int) bool {
+ if i >= len(h.queue) || j >= len(h.queue) {
+ return false
+ }
+ itemi, ok := h.items[h.queue[i]]
+ if !ok {
+ return false
+ }
+ itemj, ok := h.items[h.queue[j]]
+ if !ok {
+ return false
+ }
+ return h.lessFunc(itemi.obj, itemj.obj)
+}
+
+// Len returns the number of items in the Heap.
+func (h *heapData) Len() int { return len(h.queue) }
+
+// Swap implements swapping of two elements in the heap. This is a part of standard
+// heap interface and should never be called directly.
+func (h *heapData) Swap(i, j int) {
+ h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
+ item := h.items[h.queue[i]]
+ item.index = i
+ item = h.items[h.queue[j]]
+ item.index = j
+}
+
+// Push is supposed to be called by heap.Push only.
+func (h *heapData) Push(kv interface{}) {
+ keyValue := kv.(*itemKeyValue)
+ n := len(h.queue)
+ h.items[keyValue.key] = &heapItem{keyValue.obj, n}
+ h.queue = append(h.queue, keyValue.key)
+}
+
+// Pop is supposed to be called by heap.Pop only.
+func (h *heapData) Pop() interface{} {
+ key := h.queue[len(h.queue)-1]
+ h.queue = h.queue[0 : len(h.queue)-1]
+ item, ok := h.items[key]
+ if !ok {
+ // This should not happen: every key in queue must also be in items.
+ return nil
+ }
+ delete(h.items, key)
+ return item.obj
+}
+
+// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
+// It can be used to implement priority queues and similar data structures.
+type Heap struct {
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // data stores objects and has a queue that keeps their ordering according
+ // to the heap invariant.
+ data *heapData
+
+ // closed indicates that the queue is closed.
+ // It is mainly used to let Pop() exit its control loop while waiting for an item.
+ closed bool
+}
+
+// Close closes the Heap and signals condition variables that may be waiting to pop
+// items from the heap.
+func (h *Heap) Close() {
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ h.closed = true
+ h.cond.Broadcast()
+}
+
+// Add inserts an item, and puts it in the queue. The item is updated if it
+// already exists.
+func (h *Heap) Add(obj interface{}) error {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if h.closed {
+ return fmt.Errorf(closedMsg)
+ }
+ if _, exists := h.data.items[key]; exists {
+ h.data.items[key].obj = obj
+ heap.Fix(h.data, h.data.items[key].index)
+ } else {
+ h.addIfNotPresentLocked(key, obj)
+ }
+ h.cond.Broadcast()
+ return nil
+}
+
+// BulkAdd adds all the items in the list to the queue and then signals the condition
+// variable. It is useful when the caller would like to add all of the items
+// to the queue before the consumer starts processing them.
+func (h *Heap) BulkAdd(list []interface{}) error {
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if h.closed {
+ return fmt.Errorf(closedMsg)
+ }
+ for _, obj := range list {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ if _, exists := h.data.items[key]; exists {
+ h.data.items[key].obj = obj
+ heap.Fix(h.data, h.data.items[key].index)
+ } else {
+ h.addIfNotPresentLocked(key, obj)
+ }
+ }
+ h.cond.Broadcast()
+ return nil
+}
+
+// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
+// the key is present in the map, no change is made to the item.
+//
+// This is useful in a single producer/consumer scenario so that the consumer can
+// safely retry items without contending with the producer and potentially enqueueing
+// stale items.
+func (h *Heap) AddIfNotPresent(obj interface{}) error {
+ id, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if h.closed {
+ return fmt.Errorf(closedMsg)
+ }
+ h.addIfNotPresentLocked(id, obj)
+ h.cond.Broadcast()
+ return nil
+}
+
+// addIfNotPresentLocked assumes the lock is already held and adds the provided
+// item to the queue if it does not already exist.
+func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
+ if _, exists := h.data.items[key]; exists {
+ return
+ }
+ heap.Push(h.data, &itemKeyValue{key, obj})
+}
+
+// Update is the same as Add in this implementation. When the item does not
+// exist, it is added.
+func (h *Heap) Update(obj interface{}) error {
+ return h.Add(obj)
+}
+
+// Delete removes an item.
+func (h *Heap) Delete(obj interface{}) error {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if item, ok := h.data.items[key]; ok {
+ heap.Remove(h.data, item.index)
+ return nil
+ }
+ return fmt.Errorf("object not found")
+}
+
+// Pop waits until an item is ready. If multiple items are
+// ready, they are returned in the order given by Heap.data.lessFunc.
+func (h *Heap) Pop() (interface{}, error) {
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ for len(h.data.queue) == 0 {
+ // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
+ // When Close() is called, the h.closed is set and the condition is broadcast,
+ // which causes this loop to continue and return from the Pop().
+ if h.closed {
+ return nil, fmt.Errorf("heap is closed")
+ }
+ h.cond.Wait()
+ }
+ obj := heap.Pop(h.data)
+ if obj != nil {
+ return obj, nil
+ } else {
+ return nil, fmt.Errorf("object was removed from heap data")
+ }
+}
+
+// List returns a list of all the items.
+func (h *Heap) List() []interface{} {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ list := make([]interface{}, 0, len(h.data.items))
+ for _, item := range h.data.items {
+ list = append(list, item.obj)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently in the Heap.
+func (h *Heap) ListKeys() []string {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ list := make([]string, 0, len(h.data.items))
+ for key := range h.data.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+// Get returns the requested item, or sets exists=false.
+func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return h.GetByKey(key)
+}
+
+// GetByKey returns the requested item, or sets exists=false.
+func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ item, exists := h.data.items[key]
+ if !exists {
+ return nil, false, nil
+ }
+ return item.obj, true, nil
+}
+
+// IsClosed returns true if the queue is closed.
+func (h *Heap) IsClosed() bool {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ if h.closed {
+ return true
+ }
+ return false
+}
+
+// NewHeap returns a Heap which can be used to queue up items to process.
+func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
+ h := &Heap{
+ data: &heapData{
+ items: map[string]*heapItem{},
+ queue: []string{},
+ keyFunc: keyFn,
+ lessFunc: lessFn,
+ },
+ }
+ h.cond.L = &h.lock
+ return h
+}
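
Note: a short usage sketch for the new Heap; the task type is illustrative, and the two closures are the KeyFunc and LessFunc types defined above:

type task struct {
    name     string
    priority int
}

func demoHeap() {
    h := cache.NewHeap(
        func(obj interface{}) (string, error) { return obj.(*task).name, nil },         // KeyFunc
        func(a, b interface{}) bool { return a.(*task).priority < b.(*task).priority }, // LessFunc
    )
    _ = h.Add(&task{name: "compact", priority: 2})
    _ = h.Add(&task{name: "flush", priority: 1})
    obj, _ := h.Pop() // blocks while empty; "flush" pops first (lowest priority value)
    fmt.Println(obj.(*task).name)
    h.Close() // wakes any Pop blocked on the condition variable with a "heap is closed" error
}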
diff --git a/vendor/k8s.io/client-go/tools/cache/listwatch.go b/vendor/k8s.io/client-go/tools/cache/listwatch.go
index af01d4745..06657a3b0 100644
--- a/vendor/k8s.io/client-go/tools/cache/listwatch.go
+++ b/vendor/k8s.io/client-go/tools/cache/listwatch.go
@@ -19,12 +19,16 @@ package cache
import (
"time"
+ "golang.org/x/net/context"
+
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/pager"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
@@ -48,6 +52,8 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
+ // DisableChunking requests no chunking for this list watcher.
+ DisableChunking bool
}
// Getter interface knows how to access Get method from RESTClient.
@@ -57,22 +63,32 @@ type Getter interface {
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
+ optionsModifier := func(options *metav1.ListOptions) {
+ options.FieldSelector = fieldSelector.String()
+ }
+ return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
+}
+
+// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
+// The option modifier is a function that takes a ListOptions and mutates it before each request. Provide a customized
+// modifier to apply a field selector, a label selector, or any other desired options.
+func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
+ optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
- FieldsSelectorParam(fieldSelector).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
+ optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
- FieldsSelectorParam(fieldSelector).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
@@ -87,6 +103,9 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
+ if !lw.DisableChunking {
+ return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
+ }
return lw.ListFunc(options)
}
@@ -95,6 +114,8 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
return lw.WatchFunc(options)
}
+// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
+// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
@@ -158,5 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
return nil, err
}
- return watch.Until(timeout, watchInterface, remainingConditions...)
+ evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
+ if err == watch.ErrWatchClosed {
+ // present a consistent error interface to callers
+ err = wait.ErrWaitTimeout
+ }
+ return evt, err
}
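
Note: with the field selector folded into an options modifier, a label selector (or any other ListOptions field) can be injected the same way. A sketch, assuming a typed clientset built elsewhere:

// clientset.CoreV1().RESTClient() satisfies the Getter interface.
lw := cache.NewFilteredListWatchFromClient(
    clientset.CoreV1().RESTClient(),
    "pods", metav1.NamespaceAll,
    func(options *metav1.ListOptions) {
        options.LabelSelector = "app=web" // applied to both List and Watch requests
    },
)
obj, err := lw.List(metav1.ListOptions{}) // now chunked through the pager unless lw.DisableChunking is set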
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_cache.go b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
index 0fa06bf77..cbb6434eb 100644
--- a/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
@@ -156,7 +156,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
}
elements, err := fn(updated)
if err != nil {
- glog.V(4).Info("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
+ glog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
continue
}
for _, inIndex := range elements {
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_detector.go b/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
index cc6094ce4..8e6338a1b 100644
--- a/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
@@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
- "k8s.io/client-go/kubernetes/scheme"
)
var mutationDetectionEnabled = false
@@ -79,17 +78,15 @@ type cacheObj struct {
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
// we DON'T want protection from panics. If we're running this code, we want to die
- go func() {
- for {
- d.CompareObjects()
-
- select {
- case <-stopCh:
- return
- case <-time.After(d.period):
- }
+ for {
+ d.CompareObjects()
+
+ select {
+ case <-stopCh:
+ return
+ case <-time.After(d.period):
}
- }()
+ }
}
// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
@@ -98,18 +95,13 @@ func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
if _, ok := obj.(DeletedFinalStateUnknown); ok {
return
}
- if _, ok := obj.(runtime.Object); !ok {
- return
- }
+ if obj, ok := obj.(runtime.Object); ok {
+ copiedObj := obj.DeepCopyObject()
- copiedObj, err := scheme.Scheme.Copy(obj.(runtime.Object))
- if err != nil {
- return
+ d.lock.Lock()
+ defer d.lock.Unlock()
+ d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
}
-
- d.lock.Lock()
- defer d.lock.Unlock()
- d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
}
func (d *defaultCacheMutationDetector) CompareObjects() {
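
Note: the generated DeepCopyObject replaces the reflection-based scheme.Scheme.Copy, so the copy can no longer fail. A sketch of the pattern, with fetchFromCache as a hypothetical source of objects:

obj := fetchFromCache()
if ro, ok := obj.(runtime.Object); ok {
    copied := ro.DeepCopyObject() // generated deep copy; no scheme lookup and no error path
    _ = copied
}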
diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go
index 9a730610c..054a7373c 100644
--- a/vendor/k8s.io/client-go/tools/cache/reflector.go
+++ b/vendor/k8s.io/client-go/tools/cache/reflector.go
@@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"time"
@@ -48,6 +49,8 @@ import (
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
+ // metrics tracks basic metric information about the reflector
+ metrics *reflectorMetrics
// The type of object we expect to place in the store.
expectedType reflect.Type
@@ -96,10 +99,17 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}
+// reflectorDisambiguator is used to disambiguate started reflectors.
+// initialized to an unstable value to ensure meaning isn't attributed to the suffix.
+var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
+
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
+ reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
r := &Reflector{
- name: name,
+ name: name,
+ // we need this to be unique per process (some names are still the same) while keeping it obvious which reflector it belongs to
+ metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
@@ -110,6 +120,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
return r
}
+func makeValidPrometheusMetricLabel(in string) string {
+ // this isn't perfect, but it removes our common characters
+ return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
+}
+
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
@@ -182,21 +197,10 @@ func extractStackCreator() (string, int, bool) {
}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
-// Run starts a goroutine and returns immediately.
-func (r *Reflector) Run() {
+// Run will exit when stopCh is closed.
+func (r *Reflector) Run(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
- go wait.Until(func() {
- if err := r.ListAndWatch(wait.NeverStop); err != nil {
- utilruntime.HandleError(err)
- }
- }, r.period, wait.NeverStop)
-}
-
-// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
-// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
-func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
- glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
- go wait.Until(func() {
+ wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
@@ -235,17 +239,18 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
- resyncCh, cleanup := r.resyncChan()
- defer cleanup()
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
+ r.metrics.numberOfLists.Inc()
+ start := r.clock.Now()
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
+ r.metrics.listDuration.Observe(time.Since(start).Seconds())
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
@@ -255,6 +260,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
+ r.metrics.numberOfItemsInList.Observe(float64(len(items)))
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
@@ -264,6 +270,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
+ resyncCh, cleanup := r.resyncChan()
+ defer func() {
+ cleanup() // Call the last one written into cleanup
+ }()
for {
select {
case <-resyncCh:
@@ -285,14 +295,22 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}()
for {
- timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+ // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
+ select {
+ case <-stopCh:
+ return nil
+ default:
+ }
+
+ timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
- TimeoutSeconds: &timemoutseconds,
+ TimeoutSeconds: &timeoutSeconds,
}
+ r.metrics.numberOfWatches.Inc()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
@@ -344,6 +362,11 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
+ // update metrics
+ defer func() {
+ r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
+ r.metrics.watchDuration.Observe(time.Since(start).Seconds())
+ }()
loop:
for {
@@ -399,8 +422,8 @@ loop:
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
- glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
- return errors.New("very short watch")
+ r.metrics.numberOfShortWatches.Inc()
+ return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
return nil
@@ -418,4 +441,9 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
+
+ rv, err := strconv.Atoi(v)
+ if err == nil {
+ r.metrics.lastResourceVersion.Set(float64(rv))
+ }
}
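
Note: Run now blocks until stopCh closes instead of spawning its own goroutine, so the caller picks the lifecycle. A sketch mirroring the controller.go change above; lw and store are assumed to be built elsewhere:

r := cache.NewNamedReflector("example-reflector", lw, &v1.Pod{}, store, 30*time.Second)
stopCh := make(chan struct{})
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run) // r.Run returns only after stopCh closes
// ... later, during shutdown:
close(stopCh)
wg.Wait()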
diff --git a/vendor/k8s.io/client-go/tools/cache/reflector_metrics.go b/vendor/k8s.io/client-go/tools/cache/reflector_metrics.go
new file mode 100644
index 000000000..0945e5c3a
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/reflector_metrics.go
@@ -0,0 +1,119 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This file provides abstractions for setting the provider (e.g., prometheus)
+// of metrics.
+
+package cache
+
+import (
+ "sync"
+)
+
+// GaugeMetric represents a single numerical value that can arbitrarily go up
+// and down.
+type GaugeMetric interface {
+ Set(float64)
+}
+
+// CounterMetric represents a single numerical value that only ever
+// goes up.
+type CounterMetric interface {
+ Inc()
+}
+
+// SummaryMetric captures individual observations.
+type SummaryMetric interface {
+ Observe(float64)
+}
+
+type noopMetric struct{}
+
+func (noopMetric) Inc() {}
+func (noopMetric) Dec() {}
+func (noopMetric) Observe(float64) {}
+func (noopMetric) Set(float64) {}
+
+type reflectorMetrics struct {
+ numberOfLists CounterMetric
+ listDuration SummaryMetric
+ numberOfItemsInList SummaryMetric
+
+ numberOfWatches CounterMetric
+ numberOfShortWatches CounterMetric
+ watchDuration SummaryMetric
+ numberOfItemsInWatch SummaryMetric
+
+ lastResourceVersion GaugeMetric
+}
+
+// MetricsProvider generates various metrics used by the reflector.
+type MetricsProvider interface {
+ NewListsMetric(name string) CounterMetric
+ NewListDurationMetric(name string) SummaryMetric
+ NewItemsInListMetric(name string) SummaryMetric
+
+ NewWatchesMetric(name string) CounterMetric
+ NewShortWatchesMetric(name string) CounterMetric
+ NewWatchDurationMetric(name string) SummaryMetric
+ NewItemsInWatchMetric(name string) SummaryMetric
+
+ NewLastResourceVersionMetric(name string) GaugeMetric
+}
+
+type noopMetricsProvider struct{}
+
+func (noopMetricsProvider) NewListsMetric(name string) CounterMetric { return noopMetric{} }
+func (noopMetricsProvider) NewListDurationMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewItemsInListMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewWatchesMetric(name string) CounterMetric { return noopMetric{} }
+func (noopMetricsProvider) NewShortWatchesMetric(name string) CounterMetric { return noopMetric{} }
+func (noopMetricsProvider) NewWatchDurationMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewItemsInWatchMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewLastResourceVersionMetric(name string) GaugeMetric {
+ return noopMetric{}
+}
+
+var metricsFactory = struct {
+ metricsProvider MetricsProvider
+ setProviders sync.Once
+}{
+ metricsProvider: noopMetricsProvider{},
+}
+
+func newReflectorMetrics(name string) *reflectorMetrics {
+ var ret *reflectorMetrics
+ if len(name) == 0 {
+ return ret
+ }
+ return &reflectorMetrics{
+ numberOfLists: metricsFactory.metricsProvider.NewListsMetric(name),
+ listDuration: metricsFactory.metricsProvider.NewListDurationMetric(name),
+ numberOfItemsInList: metricsFactory.metricsProvider.NewItemsInListMetric(name),
+ numberOfWatches: metricsFactory.metricsProvider.NewWatchesMetric(name),
+ numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name),
+ watchDuration: metricsFactory.metricsProvider.NewWatchDurationMetric(name),
+ numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name),
+ lastResourceVersion: metricsFactory.metricsProvider.NewLastResourceVersionMetric(name),
+ }
+}
+
+// SetReflectorMetricsProvider sets the metrics provider
+func SetReflectorMetricsProvider(metricsProvider MetricsProvider) {
+ metricsFactory.setProviders.Do(func() {
+ metricsFactory.metricsProvider = metricsProvider
+ })
+}
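
Note: a minimal sketch of a MetricsProvider implementation; the counter type and package name are illustrative. SetReflectorMetricsProvider honors only the first call, so register the provider before any reflectors start.

package metricsexample

import (
    "sync/atomic"

    "k8s.io/client-go/tools/cache"
)

// counter satisfies CounterMetric, SummaryMetric, and GaugeMetric with one type.
type counter struct{ n int64 }

func (c *counter) Inc()            { atomic.AddInt64(&c.n, 1) }
func (c *counter) Observe(float64) { atomic.AddInt64(&c.n, 1) }
func (c *counter) Set(v float64)   { atomic.StoreInt64(&c.n, int64(v)) }

type provider struct{}

func (provider) NewListsMetric(name string) cache.CounterMetric             { return &counter{} }
func (provider) NewListDurationMetric(name string) cache.SummaryMetric      { return &counter{} }
func (provider) NewItemsInListMetric(name string) cache.SummaryMetric       { return &counter{} }
func (provider) NewWatchesMetric(name string) cache.CounterMetric           { return &counter{} }
func (provider) NewShortWatchesMetric(name string) cache.CounterMetric      { return &counter{} }
func (provider) NewWatchDurationMetric(name string) cache.SummaryMetric     { return &counter{} }
func (provider) NewItemsInWatchMetric(name string) cache.SummaryMetric      { return &counter{} }
func (provider) NewLastResourceVersionMetric(name string) cache.GaugeMetric { return &counter{} }

func init() {
    cache.SetReflectorMetricsProvider(provider{}) // later calls are silently ignored
}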
diff --git a/vendor/k8s.io/client-go/tools/cache/shared_informer.go b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
index a0dbbb697..f6ce07f7a 100644
--- a/vendor/k8s.io/client-go/tools/cache/shared_informer.go
+++ b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
@@ -25,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/util/buffer"
+ "k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
@@ -92,8 +94,13 @@ func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEve
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool
-// syncedPollPeriod controls how often you look at the status of your sync funcs
-const syncedPollPeriod = 100 * time.Millisecond
+const (
+ // syncedPollPeriod controls how often you look at the status of your sync funcs
+ syncedPollPeriod = 100 * time.Millisecond
+
+ // initialBufferSize is the initial number of event notifications that can be buffered.
+ initialBufferSize = 1024
+)
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
@@ -138,15 +145,12 @@ type sharedIndexInformer struct {
// clock allows for testability
clock clock.Clock
- started bool
- startedLock sync.Mutex
+ started, stopped bool
+ startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
- // stopCh is the channel used to stop the main Run process. We have to track it so that
- // late joiners can have a proper stop
- stopCh <-chan struct{}
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -207,16 +211,20 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
s.started = true
}()
- s.stopCh = stopCh
- s.cacheMutationDetector.Run(stopCh)
- s.processor.run(stopCh)
- s.controller.Run(stopCh)
-}
+ // Separate stop channel because Processor should be stopped strictly after controller
+ processorStopCh := make(chan struct{})
+ var wg wait.Group
+ defer wg.Wait() // Wait for Processor to stop
+ defer close(processorStopCh) // Tell Processor to stop
+ wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
+ wg.StartWithChannel(processorStopCh, s.processor.run)
-func (s *sharedIndexInformer) isStarted() bool {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- return s.started
+ defer func() {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+ s.stopped = true // Don't want any new listeners
+ }()
+ s.controller.Run(stopCh)
}
func (s *sharedIndexInformer) HasSynced() bool {
@@ -287,6 +295,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.startedLock.Lock()
defer s.startedLock.Unlock()
+ if s.stopped {
+ glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
+ return
+ }
+
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
@@ -307,7 +320,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}
- listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
+ listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started {
s.processor.addListener(listener)
@@ -322,14 +335,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
- s.processor.addListener(listener)
-
- go listener.run(s.stopCh)
- go listener.pop(s.stopCh)
-
- items := s.indexer.List()
- for i := range items {
- listener.add(addNotification{newObj: items[i]})
+ s.processor.addAndStartListener(listener)
+ for _, item := range s.indexer.List() {
+ listener.add(addNotification{newObj: item})
}
}
@@ -369,12 +377,26 @@ type sharedProcessor struct {
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
+ wg wait.Group
+}
+
+func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
+ p.listenersLock.Lock()
+ defer p.listenersLock.Unlock()
+
+ p.addListenerLocked(listener)
+ p.wg.Start(listener.run)
+ p.wg.Start(listener.pop)
}
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
+ p.addListenerLocked(listener)
+}
+
+func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
@@ -395,13 +417,21 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+ func() {
+ p.listenersLock.RLock()
+ defer p.listenersLock.RUnlock()
+ for _, listener := range p.listeners {
+ p.wg.Start(listener.run)
+ p.wg.Start(listener.pop)
+ }
+ }()
+ <-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
-
for _, listener := range p.listeners {
- go listener.run(stopCh)
- go listener.pop(stopCh)
+ close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
+ p.wg.Wait() // Wait for all .pop() and .run() to stop
}
// shouldResync queries every listener to determine if any of them need a resync, based on each
@@ -437,21 +467,18 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
}
type processorListener struct {
- // lock/cond protects access to 'pendingNotifications'.
- lock sync.RWMutex
- cond sync.Cond
-
- // pendingNotifications is an unbounded slice that holds all notifications not yet distributed
- // there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
- // added until we OOM.
- // TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
- // we should try to do something better
- pendingNotifications []interface{}
-
nextCh chan interface{}
+ addCh chan interface{}
handler ResourceEventHandler
+ // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
+ // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
+ // added until we OOM.
+ // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
+ // we should try to do something better.
+ pendingNotifications buffer.RingGrowing
+
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
@@ -464,93 +491,85 @@ type processorListener struct {
resyncLock sync.Mutex
}
-func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
+func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
- pendingNotifications: []interface{}{},
nextCh: make(chan interface{}),
+ addCh: make(chan interface{}),
handler: handler,
+ pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
- ret.cond.L = &ret.lock
-
ret.determineNextResync(now)
return ret
}
func (p *processorListener) add(notification interface{}) {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- p.pendingNotifications = append(p.pendingNotifications, notification)
- p.cond.Broadcast()
+ p.addCh <- notification
}
-func (p *processorListener) pop(stopCh <-chan struct{}) {
+func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
+ defer close(p.nextCh) // Tell .run() to stop
+ var nextCh chan<- interface{}
+ var notification interface{}
for {
- blockingGet := func() (interface{}, bool) {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- for len(p.pendingNotifications) == 0 {
- // check if we're shutdown
- select {
- case <-stopCh:
- return nil, true
- default:
- }
- p.cond.Wait()
- }
-
- nt := p.pendingNotifications[0]
- p.pendingNotifications = p.pendingNotifications[1:]
- return nt, false
- }
-
- notification, stopped := blockingGet()
- if stopped {
- return
- }
-
select {
- case <-stopCh:
- return
- case p.nextCh <- notification:
+ case nextCh <- notification:
+ // Notification dispatched
+ var ok bool
+ notification, ok = p.pendingNotifications.ReadOne()
+ if !ok { // Nothing to pop
+ nextCh = nil // Disable this select case
+ }
+ case notificationToAdd, ok := <-p.addCh:
+ if !ok {
+ return
+ }
+ if notification == nil { // No notification to pop (and pendingNotifications is empty)
+ // Optimize the case - skip adding to pendingNotifications
+ notification = notificationToAdd
+ nextCh = p.nextCh
+ } else { // There is already a notification waiting to be dispatched
+ p.pendingNotifications.WriteOne(notificationToAdd)
+ }
}
}
}
-func (p *processorListener) run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
-
- for {
- var next interface{}
- select {
- case <-stopCh:
- func() {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.cond.Broadcast()
- }()
- return
- case next = <-p.nextCh:
- }
+func (p *processorListener) run() {
+ // this call blocks until the channel is closed. When a panic happens during the notification
+ // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
+ // the next notification will be attempted. This is usually better than the alternative of never
+ // delivering again.
+ stopCh := make(chan struct{})
+ wait.Until(func() {
+ // this gives us a few quick retries before a long pause and then a few more quick retries
+ err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+ for next := range p.nextCh {
+ switch notification := next.(type) {
+ case updateNotification:
+ p.handler.OnUpdate(notification.oldObj, notification.newObj)
+ case addNotification:
+ p.handler.OnAdd(notification.newObj)
+ case deleteNotification:
+ p.handler.OnDelete(notification.oldObj)
+ default:
+ utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ }
+ }
+ // the only way to get here is if the p.nextCh is empty and closed
+ return true, nil
+ })
- switch notification := next.(type) {
- case updateNotification:
- p.handler.OnUpdate(notification.oldObj, notification.newObj)
- case addNotification:
- p.handler.OnAdd(notification.newObj)
- case deleteNotification:
- p.handler.OnDelete(notification.oldObj)
- default:
- utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ // the only way to get here is if the p.nextCh is empty and closed
+ if err == nil {
+ close(stopCh)
}
- }
+ }, 1*time.Minute, stopCh)
}
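
Note: Run remains the blocking entry point, but shutdown is now ordered: the controller stops first, then the processor and its listeners drain. A usage sketch, assuming lw is a ListerWatcher for pods:

informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 0, cache.Indexers{})
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    func(obj interface{}) { /* enqueue work */ },
    DeleteFunc: func(obj interface{}) { /* clean up */ },
})
stopCh := make(chan struct{})
go informer.Run(stopCh) // returns only after every listener has drained
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
    return // stopCh closed before the first full sync
}
// handlers registered after shutdown are now rejected instead of leaking goroutines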
// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
diff --git a/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go b/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
index 4eb350c43..1c201efb6 100644
--- a/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
+++ b/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
@@ -241,7 +241,7 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
// updateIndices modifies the object's location in the managed indexes; if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
-func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
+func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
@@ -249,7 +249,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(newObj)
if err != nil {
- return err
+ panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
@@ -266,16 +266,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
set.Insert(key)
}
}
- return nil
}
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
-func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
+func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(obj)
if err != nil {
- return err
+ panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
@@ -289,7 +288,6 @@ func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
}
}
}
- return nil
}
func (c *threadSafeMap) Resync() error {
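
Note: index-function failures now panic inside the store instead of returning an error most callers ignored, so index functions must handle every object they can see. A sketch of a defensive Indexers entry:

indexers := cache.Indexers{
    "byNamespace": func(obj interface{}) ([]string, error) {
        pod, ok := obj.(*v1.Pod)
        if !ok {
            // this error now surfaces as a panic in updateIndices/deleteFromIndices
            return nil, fmt.Errorf("expected *v1.Pod, got %T", obj)
        }
        return []string{pod.Namespace}, nil
    },
}
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)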
diff --git a/vendor/k8s.io/client-go/tools/clientcmd/api/doc.go b/vendor/k8s.io/client-go/tools/clientcmd/api/doc.go
new file mode 100644
index 000000000..0a081871a
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/clientcmd/api/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// +k8s:deepcopy-gen=package
+package api
diff --git a/vendor/k8s.io/client-go/tools/clientcmd/api/types.go b/vendor/k8s.io/client-go/tools/clientcmd/api/types.go
index 76090c6f5..407dec83a 100644
--- a/vendor/k8s.io/client-go/tools/clientcmd/api/types.go
+++ b/vendor/k8s.io/client-go/tools/clientcmd/api/types.go
@@ -25,6 +25,7 @@ import (
// Config holds the information needed to connect to remote kubernetes clusters as a given user
// IMPORTANT if you add fields to this struct, please update IsConfigEmpty()
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Config struct {
// Legacy field from pkg/api/types.go TypeMeta.
// TODO(jlowdermilk): remove this after eliminating downstream dependencies.
diff --git a/vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go b/vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go
new file mode 100644
index 000000000..e575b23d7
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go
@@ -0,0 +1,270 @@
+// +build !ignore_autogenerated
+
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This file was autogenerated by deepcopy-gen. Do not edit it manually!
+
+package api
+
+import (
+ runtime "k8s.io/apimachinery/pkg/runtime"
+)
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *AuthInfo) DeepCopyInto(out *AuthInfo) {
+ *out = *in
+ if in.ClientCertificateData != nil {
+ in, out := &in.ClientCertificateData, &out.ClientCertificateData
+ *out = make([]byte, len(*in))
+ copy(*out, *in)
+ }
+ if in.ClientKeyData != nil {
+ in, out := &in.ClientKeyData, &out.ClientKeyData
+ *out = make([]byte, len(*in))
+ copy(*out, *in)
+ }
+ if in.ImpersonateGroups != nil {
+ in, out := &in.ImpersonateGroups, &out.ImpersonateGroups
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
+ if in.ImpersonateUserExtra != nil {
+ in, out := &in.ImpersonateUserExtra, &out.ImpersonateUserExtra
+ *out = make(map[string][]string, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = make([]string, len(val))
+ copy((*out)[key], val)
+ }
+ }
+ }
+ if in.AuthProvider != nil {
+ in, out := &in.AuthProvider, &out.AuthProvider
+ if *in == nil {
+ *out = nil
+ } else {
+ *out = new(AuthProviderConfig)
+ (*in).DeepCopyInto(*out)
+ }
+ }
+ if in.Extensions != nil {
+ in, out := &in.Extensions, &out.Extensions
+ *out = make(map[string]runtime.Object, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = val.DeepCopyObject()
+ }
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthInfo.
+func (in *AuthInfo) DeepCopy() *AuthInfo {
+ if in == nil {
+ return nil
+ }
+ out := new(AuthInfo)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *AuthProviderConfig) DeepCopyInto(out *AuthProviderConfig) {
+ *out = *in
+ if in.Config != nil {
+ in, out := &in.Config, &out.Config
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthProviderConfig.
+func (in *AuthProviderConfig) DeepCopy() *AuthProviderConfig {
+ if in == nil {
+ return nil
+ }
+ out := new(AuthProviderConfig)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Cluster) DeepCopyInto(out *Cluster) {
+ *out = *in
+ if in.CertificateAuthorityData != nil {
+ in, out := &in.CertificateAuthorityData, &out.CertificateAuthorityData
+ *out = make([]byte, len(*in))
+ copy(*out, *in)
+ }
+ if in.Extensions != nil {
+ in, out := &in.Extensions, &out.Extensions
+ *out = make(map[string]runtime.Object, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = val.DeepCopyObject()
+ }
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Cluster.
+func (in *Cluster) DeepCopy() *Cluster {
+ if in == nil {
+ return nil
+ }
+ out := new(Cluster)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Config) DeepCopyInto(out *Config) {
+ *out = *in
+ in.Preferences.DeepCopyInto(&out.Preferences)
+ if in.Clusters != nil {
+ in, out := &in.Clusters, &out.Clusters
+ *out = make(map[string]*Cluster, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = new(Cluster)
+ val.DeepCopyInto((*out)[key])
+ }
+ }
+ }
+ if in.AuthInfos != nil {
+ in, out := &in.AuthInfos, &out.AuthInfos
+ *out = make(map[string]*AuthInfo, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = new(AuthInfo)
+ val.DeepCopyInto((*out)[key])
+ }
+ }
+ }
+ if in.Contexts != nil {
+ in, out := &in.Contexts, &out.Contexts
+ *out = make(map[string]*Context, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = new(Context)
+ val.DeepCopyInto((*out)[key])
+ }
+ }
+ }
+ if in.Extensions != nil {
+ in, out := &in.Extensions, &out.Extensions
+ *out = make(map[string]runtime.Object, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = val.DeepCopyObject()
+ }
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Config.
+func (in *Config) DeepCopy() *Config {
+ if in == nil {
+ return nil
+ }
+ out := new(Config)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *Config) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ } else {
+ return nil
+ }
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Context) DeepCopyInto(out *Context) {
+ *out = *in
+ if in.Extensions != nil {
+ in, out := &in.Extensions, &out.Extensions
+ *out = make(map[string]runtime.Object, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = val.DeepCopyObject()
+ }
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Context.
+func (in *Context) DeepCopy() *Context {
+ if in == nil {
+ return nil
+ }
+ out := new(Context)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Preferences) DeepCopyInto(out *Preferences) {
+ *out = *in
+ if in.Extensions != nil {
+ in, out := &in.Extensions, &out.Extensions
+ *out = make(map[string]runtime.Object, len(*in))
+ for key, val := range *in {
+ if val == nil {
+ (*out)[key] = nil
+ } else {
+ (*out)[key] = val.DeepCopyObject()
+ }
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Preferences.
+func (in *Preferences) DeepCopy() *Preferences {
+ if in == nil {
+ return nil
+ }
+ out := new(Preferences)
+ in.DeepCopyInto(out)
+ return out
+}
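
Note: the generated methods are what let Config satisfy runtime.Object. A short usage sketch:

cfg := api.NewConfig()
clone := cfg.DeepCopy()                       // independent copy; mutating clone leaves cfg untouched
var obj runtime.Object = cfg.DeepCopyObject() // Config can now travel anywhere a runtime.Object is required
_ = clone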
diff --git a/vendor/k8s.io/client-go/tools/pager/pager.go b/vendor/k8s.io/client-go/tools/pager/pager.go
new file mode 100644
index 000000000..2e0874e0e
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/pager/pager.go
@@ -0,0 +1,118 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pager
+
+import (
+ "fmt"
+
+ "golang.org/x/net/context"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+const defaultPageSize = 500
+
+// ListPageFunc returns a list object for the given list options.
+type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
+
+// SimplePageFunc adapts a context-less list function into one that accepts a context.
+func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc {
+ return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+ return fn(opts)
+ }
+}
+
+// ListPager assists client code in breaking large list queries into multiple
+// smaller chunks of PageSize or smaller. PageFn is expected to accept a
+// metav1.ListOptions that supports paging and return a list. The pager does
+// not alter the field or label selectors on the initial options list.
+type ListPager struct {
+ PageSize int64
+ PageFn ListPageFunc
+
+ FullListIfExpired bool
+}
+
+// New creates a new pager from the provided pager function using the default
+// options. It will fall back to a full list if an expiration error is encountered
+// as a last resort.
+func New(fn ListPageFunc) *ListPager {
+ return &ListPager{
+ PageSize: defaultPageSize,
+ PageFn: fn,
+ FullListIfExpired: true,
+ }
+}
+
+// TODO: introduce other types of paging functions - such as those that retrieve from a list
+// of namespaces.
+
+// List returns a single list object, but attempts to retrieve smaller chunks from the
+// server to reduce the impact on the server. If the chunk attempt fails, it will load
+// the full list instead. The Limit field on options, if unset, will default to the page size.
+func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
+ if options.Limit == 0 {
+ options.Limit = p.PageSize
+ }
+ var list *metainternalversion.List
+ for {
+ obj, err := p.PageFn(ctx, options)
+ if err != nil {
+ if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
+ return nil, err
+ }
+ // the list expired while we were processing, fall back to a full list
+ options.Limit = 0
+ options.Continue = ""
+ return p.PageFn(ctx, options)
+ }
+ m, err := meta.ListAccessor(obj)
+ if err != nil {
+ return nil, fmt.Errorf("returned object must be a list: %v", err)
+ }
+
+ // exit early and return the object we got if we haven't processed any pages
+ if len(m.GetContinue()) == 0 && list == nil {
+ return obj, nil
+ }
+
+ // initialize the list and fill its contents
+ if list == nil {
+ list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
+ list.ResourceVersion = m.GetResourceVersion()
+ list.SelfLink = m.GetSelfLink()
+ }
+ if err := meta.EachListItem(obj, func(obj runtime.Object) error {
+ list.Items = append(list.Items, obj)
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+
+ // if we have no more items, return the list
+ if len(m.GetContinue()) == 0 {
+ return list, nil
+ }
+
+ // set the next loop up
+ options.Continue = m.GetContinue()
+ }
+}
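
To ground the new pager, here is a minimal sketch of how it might be driven from calling code. The kubeconfig loading, clientset, and pod lister are assumptions for illustration and are not part of this commit; only pager.New, pager.SimplePageFunc, and ListPager.List come from the file above.

    package main

    import (
        "log"

        "golang.org/x/net/context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/tools/pager"
    )

    func main() {
        // assumed setup: a reachable cluster and a default kubeconfig
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            log.Fatal(err)
        }
        client, err := kubernetes.NewForConfig(config)
        if err != nil {
            log.Fatal(err)
        }

        // SimplePageFunc adapts the context-less List call; each iteration of
        // ListPager.List then requests one chunk of at most PageSize items
        p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Pods(metav1.NamespaceAll).List(opts)
        }))
        obj, err := p.List(context.Background(), metav1.ListOptions{})
        if err != nil {
            log.Fatal(err)
        }
        _ = obj // the first page if there was no continue token, else an aggregated list
    }

Note the early return in List above: when the server never hands back a continue token, the caller gets the original typed list, not a metainternalversion.List.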
diff --git a/vendor/k8s.io/client-go/tools/record/event.go b/vendor/k8s.io/client-go/tools/record/event.go
index 6b2fad409..b5ec44650 100644
--- a/vendor/k8s.io/client-go/tools/record/event.go
+++ b/vendor/k8s.io/client-go/tools/record/event.go
@@ -21,15 +21,15 @@ import (
"math/rand"
"time"
+ "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/pkg/api/v1"
- "k8s.io/client-go/pkg/api/v1/ref"
restclient "k8s.io/client-go/rest"
+ ref "k8s.io/client-go/tools/reference"
"net/http"
diff --git a/vendor/k8s.io/client-go/tools/record/events_cache.go b/vendor/k8s.io/client-go/tools/record/events_cache.go
index 785ec6477..6ac767c9f 100644
--- a/vendor/k8s.io/client-go/tools/record/events_cache.go
+++ b/vendor/k8s.io/client-go/tools/record/events_cache.go
@@ -25,11 +25,12 @@ import (
"github.com/golang/groupcache/lru"
+ "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
- "k8s.io/client-go/pkg/api/v1"
+ "k8s.io/client-go/util/flowcontrol"
)
const (
@@ -39,6 +40,13 @@ const (
// more than 10 times in a 10 minute period, aggregate the event
defaultAggregateMaxEvents = 10
defaultAggregateIntervalInSeconds = 600
+
+ // by default, allow a source to send 25 events about an object,
+ // but control the refill rate to 1 new event every 5 minutes.
+ // this helps control the long tail of events for things that are
+ // always unhealthy.
+ defaultSpamBurst = 25
+ defaultSpamQPS = 1. / 300.
)
// getEventKey builds unique event key based on source, involvedObject, reason, message
@@ -59,6 +67,20 @@ func getEventKey(event *v1.Event) string {
"")
}
+// getSpamKey builds unique event key based on source, involvedObject
+func getSpamKey(event *v1.Event) string {
+ return strings.Join([]string{
+ event.Source.Component,
+ event.Source.Host,
+ event.InvolvedObject.Kind,
+ event.InvolvedObject.Namespace,
+ event.InvolvedObject.Name,
+ string(event.InvolvedObject.UID),
+ event.InvolvedObject.APIVersion,
+ },
+ "")
+}
+
// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *v1.Event) bool
@@ -67,6 +89,69 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
return false
}
+// EventSourceObjectSpamFilter is responsible for throttling
+// the amount of events a source and object can produce.
+type EventSourceObjectSpamFilter struct {
+ sync.RWMutex
+
+ // the cache that manages last synced state
+ cache *lru.Cache
+
+ // burst is the amount of events we allow per source + object
+ burst int
+
+ // qps is the refill rate of the token bucket in queries per second
+ qps float32
+
+ // clock is used to allow for testing over a time interval
+ clock clock.Clock
+}
+
+// NewEventSourceObjectSpamFilter allows bursts of events from a source about an object, refilling at the specified qps.
+func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
+ return &EventSourceObjectSpamFilter{
+ cache: lru.New(lruCacheSize),
+ burst: burst,
+ qps: qps,
+ clock: clock,
+ }
+}
+
+// spamRecord holds data used to perform spam filtering decisions.
+type spamRecord struct {
+ // rateLimiter controls the rate of events about this object
+ rateLimiter flowcontrol.RateLimiter
+}
+
+// Filter ensures that a given source+object pair does not exceed the allowed rate.
+func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
+ var record spamRecord
+
+ // controls our cached information about this event (source+object)
+ eventKey := getSpamKey(event)
+
+ // do we have a record of similar events in our cache?
+ f.Lock()
+ defer f.Unlock()
+ value, found := f.cache.Get(eventKey)
+ if found {
+ record = value.(spamRecord)
+ }
+
+ // verify we have a rate limiter for this record
+ if record.rateLimiter == nil {
+ record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
+ }
+
+ // ensure we have available rate
+ filter := !record.rateLimiter.TryAccept()
+
+ // update the cache
+ f.cache.Add(eventKey, record)
+
+ return filter
+}
+
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
@@ -337,7 +422,6 @@ type EventCorrelateResult struct {
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
-// * No events are filtered from being recorded
// * Aggregation is performed if a similar event is recorded 10 times in a
// 10 minute rolling interval. A similar event is an event that varies only by
// the Event.Message field. Rather than recording the precise event, aggregation
@@ -345,10 +429,13 @@ type EventCorrelateResult struct {
// the same reason.
// * Events are incrementally counted if the exact same event is encountered multiple
// times.
+// * A source may burst 25 events about an object, but has a refill rate budget
+// per object of 1 event every 5 minutes to control the long tail of spam.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
cacheSize := maxLruCacheEntries
+ spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
return &EventCorrelator{
- filterFunc: DefaultEventFilterFunc,
+ filterFunc: spamFilter.Filter,
aggregator: NewEventAggregator(
cacheSize,
EventAggregatorByReasonFunc,
@@ -363,11 +450,14 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
- if c.filterFunc(newEvent) {
- return &EventCorrelateResult{Skip: true}, nil
+ if newEvent == nil {
+ return nil, fmt.Errorf("event is nil")
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
+ if c.filterFunc(observedEvent) {
+ return &EventCorrelateResult{Skip: true}, nil
+ }
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
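
Putting numbers on the new defaults: a burst of 25 with qps = 1/300 means a source can emit 25 events about a single object back to back, after which one token refills every 300 seconds. Below is a sketch of exercising the exported filter directly with a fake clock; the fake clock, cache size of 4096, and the event field values are assumptions for the example, while the constructor and Filter come from this file.

    package main

    import (
        "fmt"
        "time"

        "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/clock"
        "k8s.io/client-go/tools/record"
    )

    func main() {
        fakeClock := clock.NewFakeClock(time.Now())
        f := record.NewEventSourceObjectSpamFilter(4096, 25, 1./300., fakeClock)

        ev := &v1.Event{}
        ev.Source.Component = "kubelet" // hypothetical source
        ev.InvolvedObject.Kind = "Pod"  // hypothetical object
        ev.InvolvedObject.Name = "web-0"

        for i := 0; i < 25; i++ {
            fmt.Println(f.Filter(ev)) // false: still within the 25-event burst
        }
        fmt.Println(f.Filter(ev)) // true: burst exhausted, the event is dropped
        fakeClock.Step(5 * time.Minute)
        fmt.Println(f.Filter(ev)) // false: one token should have refilled
    }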
diff --git a/vendor/k8s.io/client-go/tools/reference/ref.go b/vendor/k8s.io/client-go/tools/reference/ref.go
new file mode 100644
index 000000000..58b60fd5d
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/reference/ref.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package reference
+
+import (
+ "errors"
+ "fmt"
+ "net/url"
+ "strings"
+
+ "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+var (
+ // Errors that could be returned by GetReference.
+ ErrNilObject = errors.New("can't reference a nil object")
+ ErrNoSelfLink = errors.New("selfLink was empty, can't make reference")
+)
+
+// GetReference returns an ObjectReference which refers to the given
+// object, or an error if the object doesn't follow the conventions
+// that would allow this.
+// TODO: should take a meta.Interface see http://issue.k8s.io/7127
+func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReference, error) {
+ if obj == nil {
+ return nil, ErrNilObject
+ }
+ if ref, ok := obj.(*v1.ObjectReference); ok {
+ // Don't make a reference to a reference.
+ return ref, nil
+ }
+
+ gvk := obj.GetObjectKind().GroupVersionKind()
+
+ // if the object referenced is actually persisted, we can just get kind from meta
+ // if we are building an object reference to something not yet persisted, we should fallback to scheme
+ kind := gvk.Kind
+ if len(kind) == 0 {
+ // TODO: this is wrong
+ gvks, _, err := scheme.ObjectKinds(obj)
+ if err != nil {
+ return nil, err
+ }
+ kind = gvks[0].Kind
+ }
+
+ // An object that implements only List has enough metadata to build a reference
+ var listMeta metav1.Common
+ objectMeta, err := meta.Accessor(obj)
+ if err != nil {
+ listMeta, err = meta.CommonAccessor(obj)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ listMeta = objectMeta
+ }
+
+ // if the object referenced is actually persisted, we can also get version from meta
+ version := gvk.GroupVersion().String()
+ if len(version) == 0 {
+ selfLink := listMeta.GetSelfLink()
+ if len(selfLink) == 0 {
+ return nil, ErrNoSelfLink
+ }
+ selfLinkUrl, err := url.Parse(selfLink)
+ if err != nil {
+ return nil, err
+ }
+ // example paths: /<prefix>/<version>/*
+ parts := strings.Split(selfLinkUrl.Path, "/")
+ if len(parts) < 3 {
+ return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", selfLink, version)
+ }
+ version = parts[2]
+ }
+
+ // only has list metadata
+ if objectMeta == nil {
+ return &v1.ObjectReference{
+ Kind: kind,
+ APIVersion: version,
+ ResourceVersion: listMeta.GetResourceVersion(),
+ }, nil
+ }
+
+ return &v1.ObjectReference{
+ Kind: kind,
+ APIVersion: version,
+ Name: objectMeta.GetName(),
+ Namespace: objectMeta.GetNamespace(),
+ UID: objectMeta.GetUID(),
+ ResourceVersion: objectMeta.GetResourceVersion(),
+ }, nil
+}
+
+// GetPartialReference is exactly like GetReference, but allows you to set the FieldPath.
+func GetPartialReference(scheme *runtime.Scheme, obj runtime.Object, fieldPath string) (*v1.ObjectReference, error) {
+ ref, err := GetReference(scheme, obj)
+ if err != nil {
+ return nil, err
+ }
+ ref.FieldPath = fieldPath
+ return ref, nil
+}
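
A short sketch of the new reference helper in use. The scheme comes from k8s.io/client-go/kubernetes/scheme, which is an assumption of the example rather than something this file requires; any runtime.Scheme that knows the object's kinds would do.

    package main

    import (
        "fmt"
        "log"

        "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes/scheme"
        ref "k8s.io/client-go/tools/reference"
    )

    func main() {
        pod := &v1.Pod{
            // objects read back from the API server carry TypeMeta; without it
            // (and without a selfLink) GetReference returns ErrNoSelfLink,
            // since the version fallback still needs a parsable selfLink
            TypeMeta:   metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
            ObjectMeta: metav1.ObjectMeta{Name: "web-0", Namespace: "default"}, // hypothetical
        }
        r, err := ref.GetReference(scheme.Scheme, pod)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%s %s/%s (%s)\n", r.Kind, r.Namespace, r.Name, r.APIVersion)
    }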
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go b/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
index a90fab1fe..6b69f366e 100644
--- a/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
+++ b/vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
@@ -25,22 +25,20 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/httpstream"
- "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/transport"
+ spdy "k8s.io/client-go/transport/spdy"
)
// StreamOptions holds information pertaining to the current streaming session: input/output
// streams, if the client is requesting a TTY, and a terminal size queue to support terminal
// resizing.
type StreamOptions struct {
- SupportedProtocols []string
- Stdin io.Reader
- Stdout io.Writer
- Stderr io.Writer
- Tty bool
- TerminalSizeQueue TerminalSizeQueue
+ Stdin io.Reader
+ Stdout io.Writer
+ Stderr io.Writer
+ Tty bool
+ TerminalSizeQueue TerminalSizeQueue
}
// Executor is an interface for transporting shell-style streams.
@@ -52,107 +50,73 @@ type Executor interface {
Stream(options StreamOptions) error
}
-// StreamExecutor supports the ability to dial an httpstream connection and the ability to
-// run a command line stream protocol over that dialer.
-type StreamExecutor interface {
- Executor
- httpstream.Dialer
+type streamCreator interface {
+ CreateStream(headers http.Header) (httpstream.Stream, error)
+}
+
+type streamProtocolHandler interface {
+ stream(conn streamCreator) error
}
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
- upgrader httpstream.UpgradeRoundTripper
+ upgrader spdy.Upgrader
transport http.RoundTripper
- method string
- url *url.URL
+ method string
+ url *url.URL
+ protocols []string
}
-// NewExecutor connects to the provided server and upgrades the connection to
-// multiplexed bidirectional streams. The current implementation uses SPDY,
-// but this could be replaced with HTTP/2 once it's available, or something else.
-// TODO: the common code between this and portforward could be abstracted.
-func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) {
- tlsConfig, err := restclient.TLSConfigFor(config)
- if err != nil {
- return nil, err
- }
-
- upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
- wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
+// NewSPDYExecutor connects to the provided server and upgrades the connection to
+// multiplexed bidirectional streams.
+func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
+ wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
if err != nil {
return nil, err
}
+ return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
+}
- return &streamExecutor{
- upgrader: upgradeRoundTripper,
- transport: wrapper,
- method: method,
- url: url,
- }, nil
+// NewSPDYExecutorForTransports connects to the provided server using the given transport and
+// upgrades the response with the given upgrader to multiplexed bidirectional streams.
+func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
+ return NewSPDYExecutorForProtocols(
+ transport, upgrader, method, url,
+ remotecommand.StreamProtocolV4Name,
+ remotecommand.StreamProtocolV3Name,
+ remotecommand.StreamProtocolV2Name,
+ remotecommand.StreamProtocolV1Name,
+ )
}
-// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
-// streams. This method takes a stream upgrader and an optional function that is invoked
-// to wrap the round tripper. This method may be used by clients that are lower level than
-// Kubernetes clients or need to provide their own upgrade round tripper.
-func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
- rt := http.RoundTripper(upgrader)
- if fn != nil {
- rt = fn(rt)
- }
+// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
+// multiplexed bidirectional streams using only the provided protocols. Exposed for testing; most
+// callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
+func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
return &streamExecutor{
upgrader: upgrader,
- transport: rt,
+ transport: transport,
method: method,
url: url,
+ protocols: protocols,
}, nil
}
-// Dial opens a connection to a remote server and attempts to negotiate a SPDY
-// connection. Upon success, it returns the connection and the protocol
-// selected by the server.
-func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) {
- rt := transport.DebugWrappers(e.transport)
-
- // TODO the client probably shouldn't be created here, as it doesn't allow
- // flexibility to allow callers to configure it.
- client := &http.Client{Transport: rt}
-
+// Stream opens a protocol streamer to the server and streams until a client closes
+// the connection or the server disconnects.
+func (e *streamExecutor) Stream(options StreamOptions) error {
req, err := http.NewRequest(e.method, e.url.String(), nil)
if err != nil {
- return nil, "", fmt.Errorf("error creating request: %v", err)
+ return fmt.Errorf("error creating request: %v", err)
}
- for i := range protocols {
- req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
- }
-
- resp, err := client.Do(req)
- if err != nil {
- return nil, "", fmt.Errorf("error sending request: %v", err)
- }
- defer resp.Body.Close()
-
- conn, err := e.upgrader.NewConnection(resp)
- if err != nil {
- return nil, "", err
- }
-
- return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
-}
-
-type streamCreator interface {
- CreateStream(headers http.Header) (httpstream.Stream, error)
-}
-
-type streamProtocolHandler interface {
- stream(conn streamCreator) error
-}
-// Stream opens a protocol streamer to the server and streams until a client closes
-// the connection or the server disconnects.
-func (e *streamExecutor) Stream(options StreamOptions) error {
- conn, protocol, err := e.Dial(options.SupportedProtocols...)
+ conn, protocol, err := spdy.Negotiate(
+ e.upgrader,
+ &http.Client{Transport: e.transport},
+ req,
+ e.protocols...,
+ )
if err != nil {
return err
}
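
For callers, the migration from NewExecutor is mechanical: protocol selection moves out of StreamOptions and into the constructor, which defaults to negotiating V4 down through V1. A sketch follows, where the exec URL is a hypothetical hand-built path; real callers usually build it with a RESTClient request against the pod's exec subresource.

    package main

    import (
        "log"
        "net/url"
        "os"

        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/tools/remotecommand"
    )

    func main() {
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            log.Fatal(err)
        }

        // hypothetical exec URL for illustration only
        u, err := url.Parse(config.Host + "/api/v1/namespaces/default/pods/web-0/exec?command=ls&stdout=true&stderr=true")
        if err != nil {
            log.Fatal(err)
        }

        exec, err := remotecommand.NewSPDYExecutor(config, "POST", u)
        if err != nil {
            log.Fatal(err)
        }
        // no SupportedProtocols field anymore; negotiation happens inside Stream
        if err := exec.Stream(remotecommand.StreamOptions{
            Stdout: os.Stdout,
            Stderr: os.Stderr,
        }); err != nil {
            log.Fatal(err)
        }
    }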
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v1.go b/vendor/k8s.io/client-go/tools/remotecommand/v1.go
index 1db917c0b..92dad727f 100644
--- a/vendor/k8s.io/client-go/tools/remotecommand/v1.go
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v1.go
@@ -23,8 +23,8 @@ import (
"net/http"
"github.com/golang/glog"
+ "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
- "k8s.io/client-go/pkg/api/v1"
)
// streamProtocolV1 implements the first version of the streaming exec & attach
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v2.go b/vendor/k8s.io/client-go/tools/remotecommand/v2.go
index 95346a439..b74ae8de2 100644
--- a/vendor/k8s.io/client-go/tools/remotecommand/v2.go
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v2.go
@@ -23,8 +23,8 @@ import (
"net/http"
"sync"
+ "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/client-go/pkg/api/v1"
)
// streamProtocolV2 implements version 2 of the streaming protocol for attach
diff --git a/vendor/k8s.io/client-go/tools/remotecommand/v3.go b/vendor/k8s.io/client-go/tools/remotecommand/v3.go
index 03b9e2a68..846dd24a5 100644
--- a/vendor/k8s.io/client-go/tools/remotecommand/v3.go
+++ b/vendor/k8s.io/client-go/tools/remotecommand/v3.go
@@ -22,8 +22,8 @@ import (
"net/http"
"sync"
+ "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/client-go/pkg/api/v1"
)
// streamProtocolV3 implements version 3 of the streaming protocol for attach