Diffstat (limited to 'vendor/k8s.io/client-go/tools/cache')
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/controller.go          5
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/delta_fifo.go          4
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/fifo.go                4
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/heap.go              323
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/listwatch.go          32
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/mutation_cache.go      2
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/mutation_detector.go  34
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/reflector.go          70
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/reflector_metrics.go 119
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/shared_informer.go   221
 -rw-r--r--  vendor/k8s.io/client-go/tools/cache/thread_safe_store.go  10
 11 files changed, 668 insertions(+), 156 deletions(-)
diff --git a/vendor/k8s.io/client-go/tools/cache/controller.go b/vendor/k8s.io/client-go/tools/cache/controller.go
index 2c97b8658..e7b98befa 100644
--- a/vendor/k8s.io/client-go/tools/cache/controller.go
+++ b/vendor/k8s.io/client-go/tools/cache/controller.go
@@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflector = r
c.reflectorMutex.Unlock()
- r.RunUntil(stopCh)
+ var wg wait.Group
+ defer wg.Wait()
+
+ wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
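The controller now owns the reflector goroutine's lifecycle instead of relying on the old fire-and-forget RunUntil. A minimal, self-contained sketch of the wait.Group pattern this hunk (and several below) adopts; the worker function is hypothetical, standing in for Reflector.Run:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// worker is a hypothetical stand-in for Reflector.Run: it loops until
// stopCh closes, matching the func(<-chan struct{}) signature that
// wait.Group.StartWithChannel expects.
func worker(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("tick")
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	var wg wait.Group
	wg.StartWithChannel(stopCh, worker) // runs worker(stopCh) in a goroutine
	time.Sleep(300 * time.Millisecond)
	close(stopCh) // ask the worker to exit...
	wg.Wait()     // ...and block until it actually has
}
```

The deferred wg.Wait() in the hunk gives Run a guarantee RunUntil never made: when Run returns, the reflector goroutine has fully exited.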
diff --git a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
index a71db6048..f06d1c5b1 100644
--- a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
+++ b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go
@@ -539,6 +539,10 @@ func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
+ if f.knownObjects == nil {
+ return nil
+ }
+
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
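The guard matters because DeltaFIFO's knownObjects collaborator is optional and may be nil. A standalone sketch of the same nil-guard shape (the types here are illustrative, not the client-go ones):

```go
package example

// keyLister is an illustrative stand-in for the KeyListerGetter that
// backs DeltaFIFO.knownObjects; it may legitimately be absent.
type keyLister interface {
	ListKeys() []string
}

type fifo struct {
	knownObjects keyLister // optional; may be nil
}

// Resync mirrors the guarded shape above: with no knownObjects store
// there is nothing to resync, so return early instead of calling
// ListKeys through a nil interface.
func (f *fifo) Resync() error {
	if f.knownObjects == nil {
		return nil
	}
	for _, k := range f.knownObjects.ListKeys() {
		_ = k // the real code calls f.syncKeyLocked(k) here
	}
	return nil
}
```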
diff --git a/vendor/k8s.io/client-go/tools/cache/fifo.go b/vendor/k8s.io/client-go/tools/cache/fifo.go
index 3f6e2a948..e05c01ee2 100644
--- a/vendor/k8s.io/client-go/tools/cache/fifo.go
+++ b/vendor/k8s.io/client-go/tools/cache/fifo.go
@@ -59,7 +59,7 @@ type Queue interface {
// has since been added.
AddIfNotPresent(interface{}) error
- // Return true if the first batch of items has been popped
+ // HasSynced returns true if the first batch of items has been popped
HasSynced() bool
// Close queue
@@ -169,7 +169,7 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error {
return nil
}
-// addIfNotPresent assumes the fifo lock is already held and adds the the provided
+// addIfNotPresent assumes the fifo lock is already held and adds the provided
// item to the queue under id if it does not already exist.
func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
f.populated = true
diff --git a/vendor/k8s.io/client-go/tools/cache/heap.go b/vendor/k8s.io/client-go/tools/cache/heap.go
new file mode 100644
index 000000000..78e492455
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/heap.go
@@ -0,0 +1,323 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This file implements a heap data structure.
+
+package cache
+
+import (
+ "container/heap"
+ "fmt"
+ "sync"
+)
+
+const (
+ closedMsg = "heap is closed"
+)
+
+type LessFunc func(interface{}, interface{}) bool
+type heapItem struct {
+ obj interface{} // The object which is stored in the heap.
+ index int // The index of the object's key in the Heap.queue.
+}
+
+type itemKeyValue struct {
+ key string
+ obj interface{}
+}
+
+// heapData is an internal struct that implements the standard heap interface
+// and keeps the data stored in the heap.
+type heapData struct {
+ // items is a map from key of the objects to the objects and their index.
+ // We depend on the property that items in the map are in the queue and vice versa.
+ items map[string]*heapItem
+ // queue implements a heap data structure and keeps the order of elements
+ // according to the heap invariant. The queue keeps the keys of objects stored
+ // in "items".
+ queue []string
+
+ // keyFunc is used to make the key used for queued item insertion and retrieval, and
+ // should be deterministic.
+ keyFunc KeyFunc
+ // lessFunc is used to compare two objects in the heap.
+ lessFunc LessFunc
+}
+
+var (
+ _ = heap.Interface(&heapData{}) // heapData is a standard heap
+)
+
+// Less compares two objects and returns true if the first one should go
+// in front of the second one in the heap.
+func (h *heapData) Less(i, j int) bool {
+ if i >= len(h.queue) || j >= len(h.queue) {
+ return false
+ }
+ itemi, ok := h.items[h.queue[i]]
+ if !ok {
+ return false
+ }
+ itemj, ok := h.items[h.queue[j]]
+ if !ok {
+ return false
+ }
+ return h.lessFunc(itemi.obj, itemj.obj)
+}
+
+// Len returns the number of items in the Heap.
+func (h *heapData) Len() int { return len(h.queue) }
+
+// Swap implements swapping of two elements in the heap. This is a part of standard
+// heap interface and should never be called directly.
+func (h *heapData) Swap(i, j int) {
+ h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
+ item := h.items[h.queue[i]]
+ item.index = i
+ item = h.items[h.queue[j]]
+ item.index = j
+}
+
+// Push is supposed to be called by heap.Push only.
+func (h *heapData) Push(kv interface{}) {
+ keyValue := kv.(*itemKeyValue)
+ n := len(h.queue)
+ h.items[keyValue.key] = &heapItem{keyValue.obj, n}
+ h.queue = append(h.queue, keyValue.key)
+}
+
+// Pop is supposed to be called by heap.Pop only.
+func (h *heapData) Pop() interface{} {
+ key := h.queue[len(h.queue)-1]
+ h.queue = h.queue[0 : len(h.queue)-1]
+ item, ok := h.items[key]
+ if !ok {
+ // This should never happen: every key in the queue has an entry in items.
+ return nil
+ }
+ delete(h.items, key)
+ return item.obj
+}
+
+// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
+// It can be used to implement priority queues and similar data structures.
+type Heap struct {
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // data stores objects and has a queue that keeps their ordering according
+ // to the heap invariant.
+ data *heapData
+
+ // closed indicates that the queue is closed.
+ // It is mainly used to let Pop() exit its control loop while waiting for an item.
+ closed bool
+}
+
+// Close closes the Heap and signals any condition variables that may be waiting
+// to pop items from the heap.
+func (h *Heap) Close() {
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ h.closed = true
+ h.cond.Broadcast()
+}
+
+// Add inserts an item, and puts it in the queue. The item is updated if it
+// already exists.
+func (h *Heap) Add(obj interface{}) error {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if h.closed {
+ return fmt.Errorf(closedMsg)
+ }
+ if _, exists := h.data.items[key]; exists {
+ h.data.items[key].obj = obj
+ heap.Fix(h.data, h.data.items[key].index)
+ } else {
+ h.addIfNotPresentLocked(key, obj)
+ }
+ h.cond.Broadcast()
+ return nil
+}
+
+// BulkAdd adds all the items in the list to the queue and then signals the condition
+// variable. It is useful when the caller would like to add all of the items
+// to the queue before the consumer starts processing them.
+func (h *Heap) BulkAdd(list []interface{}) error {
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if h.closed {
+ return fmt.Errorf(closedMsg)
+ }
+ for _, obj := range list {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ if _, exists := h.data.items[key]; exists {
+ h.data.items[key].obj = obj
+ heap.Fix(h.data, h.data.items[key].index)
+ } else {
+ h.addIfNotPresentLocked(key, obj)
+ }
+ }
+ h.cond.Broadcast()
+ return nil
+}
+
+// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
+// the key is present in the map, no change is made to the item.
+//
+// This is useful in a single producer/consumer scenario so that the consumer can
+// safely retry items without contending with the producer and potentially enqueueing
+// stale items.
+func (h *Heap) AddIfNotPresent(obj interface{}) error {
+ id, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if h.closed {
+ return fmt.Errorf(closedMsg)
+ }
+ h.addIfNotPresentLocked(id, obj)
+ h.cond.Broadcast()
+ return nil
+}
+
+// addIfNotPresentLocked assumes the lock is already held and adds the provided
+// item to the queue if it does not already exist.
+func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
+ if _, exists := h.data.items[key]; exists {
+ return
+ }
+ heap.Push(h.data, &itemKeyValue{key, obj})
+}
+
+// Update is the same as Add in this implementation. When the item does not
+// exist, it is added.
+func (h *Heap) Update(obj interface{}) error {
+ return h.Add(obj)
+}
+
+// Delete removes an item.
+func (h *Heap) Delete(obj interface{}) error {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ if item, ok := h.data.items[key]; ok {
+ heap.Remove(h.data, item.index)
+ return nil
+ }
+ return fmt.Errorf("object not found")
+}
+
+// Pop waits until an item is ready. If multiple items are
+// ready, they are returned in the order given by Heap.data.lessFunc.
+func (h *Heap) Pop() (interface{}, error) {
+ h.lock.Lock()
+ defer h.lock.Unlock()
+ for len(h.data.queue) == 0 {
+ // When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
+ // When Close() is called, the h.closed is set and the condition is broadcast,
+ // which causes this loop to continue and return from the Pop().
+ if h.closed {
+ return nil, fmt.Errorf("heap is closed")
+ }
+ h.cond.Wait()
+ }
+ obj := heap.Pop(h.data)
+ if obj != nil {
+ return obj, nil
+ } else {
+ return nil, fmt.Errorf("object was removed from heap data")
+ }
+}
+
+// List returns a list of all the items.
+func (h *Heap) List() []interface{} {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ list := make([]interface{}, 0, len(h.data.items))
+ for _, item := range h.data.items {
+ list = append(list, item.obj)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently in the Heap.
+func (h *Heap) ListKeys() []string {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ list := make([]string, 0, len(h.data.items))
+ for key := range h.data.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+// Get returns the requested item, or sets exists=false.
+func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
+ key, err := h.data.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return h.GetByKey(key)
+}
+
+// GetByKey returns the requested item, or sets exists=false.
+func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ item, exists := h.data.items[key]
+ if !exists {
+ return nil, false, nil
+ }
+ return item.obj, true, nil
+}
+
+// IsClosed returns true if the queue is closed.
+func (h *Heap) IsClosed() bool {
+ h.lock.RLock()
+ defer h.lock.RUnlock()
+ if h.closed {
+ return true
+ }
+ return false
+}
+
+// NewHeap returns a Heap which can be used to queue up items to process.
+func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
+ h := &Heap{
+ data: &heapData{
+ items: map[string]*heapItem{},
+ queue: []string{},
+ keyFunc: keyFn,
+ lessFunc: lessFn,
+ },
+ }
+ h.cond.L = &h.lock
+ return h
+}
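The new Heap behaves like a keyed, blocking priority queue. A brief usage sketch; the task type and its key/less functions are invented for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

type task struct {
	name     string
	priority int
}

func main() {
	// keyFunc identifies items for Add/Update/Delete; lessFunc gives the
	// heap order (a min-heap on priority here).
	h := cache.NewHeap(
		func(obj interface{}) (string, error) { return obj.(*task).name, nil },
		func(a, b interface{}) bool { return a.(*task).priority < b.(*task).priority },
	)
	_ = h.Add(&task{name: "compact", priority: 2})
	_ = h.Add(&task{name: "flush", priority: 1})

	obj, err := h.Pop() // blocks while empty; errors after Close()
	if err == nil {
		fmt.Println(obj.(*task).name) // "flush": smallest priority pops first
	}
	h.Close()
}
```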
diff --git a/vendor/k8s.io/client-go/tools/cache/listwatch.go b/vendor/k8s.io/client-go/tools/cache/listwatch.go
index af01d4745..06657a3b0 100644
--- a/vendor/k8s.io/client-go/tools/cache/listwatch.go
+++ b/vendor/k8s.io/client-go/tools/cache/listwatch.go
@@ -19,12 +19,16 @@ package cache
import (
"time"
+ "golang.org/x/net/context"
+
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/pager"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
@@ -48,6 +52,8 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
+ // DisableChunking requests no chunking for this list watcher.
+ DisableChunking bool
}
// Getter interface knows how to access Get method from RESTClient.
@@ -57,22 +63,32 @@ type Getter interface {
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
+ optionsModifier := func(options *metav1.ListOptions) {
+ options.FieldSelector = fieldSelector.String()
+ }
+ return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
+}
+
+// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
+// The options modifier is a function that takes a ListOptions and modifies it in place. Provide a customized
+// modifier function to apply a field selector, a label selector, or any other desired options.
+func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
+ optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
- FieldsSelectorParam(fieldSelector).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
+ optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
- FieldsSelectorParam(fieldSelector).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
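A hedged sketch of the new constructor with a label-selector modifier; the clientset plumbing is assumed, not part of this diff:

```go
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// newFrontendPodListWatch is illustrative: unlike the old constructor,
// the modifier can set any ListOptions field, not just a field selector.
func newFrontendPodListWatch(clientset *kubernetes.Clientset) *cache.ListWatch {
	return cache.NewFilteredListWatchFromClient(
		clientset.CoreV1().RESTClient(),
		"pods",
		metav1.NamespaceAll,
		func(options *metav1.ListOptions) {
			options.LabelSelector = "app=frontend"
		},
	)
}
```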
@@ -87,6 +103,9 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
+ if !lw.DisableChunking {
+ return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
+ }
return lw.ListFunc(options)
}
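Chunked LISTs via the pager are now the default; a caller that needs the old single-request behavior flips the new field. A short sketch (the helper name is invented):

```go
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/tools/cache"
)

// newUnchunkedPodListWatch is illustrative: it opts out of the
// pager-based chunked LIST and issues one unpaginated request.
func newUnchunkedPodListWatch(c cache.Getter) *cache.ListWatch {
	lw := cache.NewListWatchFromClient(c, "pods", metav1.NamespaceDefault, fields.Everything())
	lw.DisableChunking = true
	return lw
}
```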
@@ -95,6 +114,8 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
return lw.WatchFunc(options)
}
+// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
+// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
@@ -158,5 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
return nil, err
}
- return watch.Until(timeout, watchInterface, remainingConditions...)
+ evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
+ if err == watch.ErrWatchClosed {
+ // present a consistent error interface to callers
+ err = wait.ErrWaitTimeout
+ }
+ return evt, err
}
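Callers can now branch on a single sentinel for both a plain timeout and a server-closed watch. A sketch; the condition and helper are illustrative:

```go
package example

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

// waitForFirstAdd is illustrative: wait up to 30s for any Added event.
func waitForFirstAdd(lw cache.ListerWatcher) error {
	_, err := cache.ListWatchUntil(30*time.Second, lw, func(e watch.Event) (bool, error) {
		return e.Type == watch.Added, nil
	})
	if err == wait.ErrWaitTimeout {
		// a watch the server closed early lands here too, thanks to the
		// ErrWatchClosed mapping above
	}
	return err
}
```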
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_cache.go b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
index 0fa06bf77..cbb6434eb 100644
--- a/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
@@ -156,7 +156,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
}
elements, err := fn(updated)
if err != nil {
- glog.V(4).Info("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
+ glog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
continue
}
for _, inIndex := range elements {
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_detector.go b/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
index cc6094ce4..8e6338a1b 100644
--- a/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_detector.go
@@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
- "k8s.io/client-go/kubernetes/scheme"
)
var mutationDetectionEnabled = false
@@ -79,17 +78,15 @@ type cacheObj struct {
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
// we DON'T want protection from panics. If we're running this code, we want to die
- go func() {
- for {
- d.CompareObjects()
-
- select {
- case <-stopCh:
- return
- case <-time.After(d.period):
- }
+ for {
+ d.CompareObjects()
+
+ select {
+ case <-stopCh:
+ return
+ case <-time.After(d.period):
}
- }()
+ }
}
// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
@@ -98,18 +95,13 @@ func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
if _, ok := obj.(DeletedFinalStateUnknown); ok {
return
}
- if _, ok := obj.(runtime.Object); !ok {
- return
- }
+ if obj, ok := obj.(runtime.Object); ok {
+ copiedObj := obj.DeepCopyObject()
- copiedObj, err := scheme.Scheme.Copy(obj.(runtime.Object))
- if err != nil {
- return
+ d.lock.Lock()
+ defer d.lock.Unlock()
+ d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
}
-
- d.lock.Lock()
- defer d.lock.Unlock()
- d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
}
func (d *defaultCacheMutationDetector) CompareObjects() {
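The scheme-based copy gives way to the DeepCopyObject method that every runtime.Object carries as of this revision. A small sketch of the semantics; the pod values are illustrative:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	pod := &v1.Pod{}
	pod.Name = "original"

	// DeepCopyObject needs no scheme registration and cannot fail,
	// which is why AddObject above lost its error path.
	copied := pod.DeepCopyObject().(*v1.Pod)
	copied.Name = "mutated"

	fmt.Println(pod.Name, copied.Name) // original mutated: fully independent
}
```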
diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go
index 9a730610c..054a7373c 100644
--- a/vendor/k8s.io/client-go/tools/cache/reflector.go
+++ b/vendor/k8s.io/client-go/tools/cache/reflector.go
@@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"time"
@@ -48,6 +49,8 @@ import (
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
+ // metrics tracks basic metric information about the reflector
+ metrics *reflectorMetrics
// The type of object we expect to place in the store.
expectedType reflect.Type
@@ -96,10 +99,17 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}
+// reflectorDisambiguator is used to disambiguate started reflectors.
+// initialized to an unstable value to ensure meaning isn't attributed to the suffix.
+var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
+
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
+ reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
r := &Reflector{
- name: name,
+ name: name,
+ // we need this to be unique per process (some names are still the same) but obvious who it belongs to
+ metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
@@ -110,6 +120,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
return r
}
+func makeValidPrometheusMetricLabel(in string) string {
+ // this isn't perfect, but it removes our common characters
+ return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
+}
+
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
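As a quick illustration of the sanitizer above (the input name is invented):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	in := "reflector_k8s.io/client-go/informers_42"
	out := strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
	fmt.Println(out) // reflector_k8s_io_client_go_informers_42
}
```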
@@ -182,21 +197,10 @@ func extractStackCreator() (string, int, bool) {
}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
-// Run starts a goroutine and returns immediately.
-func (r *Reflector) Run() {
+// Run will exit when stopCh is closed.
+func (r *Reflector) Run(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
- go wait.Until(func() {
- if err := r.ListAndWatch(wait.NeverStop); err != nil {
- utilruntime.HandleError(err)
- }
- }, r.period, wait.NeverStop)
-}
-
-// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
-// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
-func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
- glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
- go wait.Until(func() {
+ wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
@@ -235,17 +239,18 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
- resyncCh, cleanup := r.resyncChan()
- defer cleanup()
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
+ r.metrics.numberOfLists.Inc()
+ start := r.clock.Now()
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
+ r.metrics.listDuration.Observe(time.Since(start).Seconds())
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
@@ -255,6 +260,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
+ r.metrics.numberOfItemsInList.Observe(float64(len(items)))
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
@@ -264,6 +270,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
+ resyncCh, cleanup := r.resyncChan()
+ defer func() {
+ cleanup() // Call the last one written into cleanup
+ }()
for {
select {
case <-resyncCh:
@@ -285,14 +295,22 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}()
for {
- timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+ // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
+ select {
+ case <-stopCh:
+ return nil
+ default:
+ }
+
+ timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
- TimeoutSeconds: &timemoutseconds,
+ TimeoutSeconds: &timeoutSeconds,
}
+ r.metrics.numberOfWatches.Inc()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
@@ -344,6 +362,11 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
+ // update metrics
+ defer func() {
+ r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
+ r.metrics.watchDuration.Observe(time.Since(start).Seconds())
+ }()
loop:
for {
@@ -399,8 +422,8 @@ loop:
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
- glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
- return errors.New("very short watch")
+ r.metrics.numberOfShortWatches.Inc()
+ return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
return nil
@@ -418,4 +441,9 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
+
+ rv, err := strconv.Atoi(v)
+ if err == nil {
+ r.metrics.lastResourceVersion.Set(float64(rv))
+ }
}
diff --git a/vendor/k8s.io/client-go/tools/cache/reflector_metrics.go b/vendor/k8s.io/client-go/tools/cache/reflector_metrics.go
new file mode 100644
index 000000000..0945e5c3a
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/reflector_metrics.go
@@ -0,0 +1,119 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This file provides abstractions for setting the provider (e.g., prometheus)
+// of metrics.
+
+package cache
+
+import (
+ "sync"
+)
+
+// GaugeMetric represents a single numerical value that can arbitrarily go up
+// and down.
+type GaugeMetric interface {
+ Set(float64)
+}
+
+// CounterMetric represents a single numerical value that only ever
+// goes up.
+type CounterMetric interface {
+ Inc()
+}
+
+// SummaryMetric captures individual observations.
+type SummaryMetric interface {
+ Observe(float64)
+}
+
+type noopMetric struct{}
+
+func (noopMetric) Inc() {}
+func (noopMetric) Dec() {}
+func (noopMetric) Observe(float64) {}
+func (noopMetric) Set(float64) {}
+
+type reflectorMetrics struct {
+ numberOfLists CounterMetric
+ listDuration SummaryMetric
+ numberOfItemsInList SummaryMetric
+
+ numberOfWatches CounterMetric
+ numberOfShortWatches CounterMetric
+ watchDuration SummaryMetric
+ numberOfItemsInWatch SummaryMetric
+
+ lastResourceVersion GaugeMetric
+}
+
+// MetricsProvider generates various metrics used by the reflector.
+type MetricsProvider interface {
+ NewListsMetric(name string) CounterMetric
+ NewListDurationMetric(name string) SummaryMetric
+ NewItemsInListMetric(name string) SummaryMetric
+
+ NewWatchesMetric(name string) CounterMetric
+ NewShortWatchesMetric(name string) CounterMetric
+ NewWatchDurationMetric(name string) SummaryMetric
+ NewItemsInWatchMetric(name string) SummaryMetric
+
+ NewLastResourceVersionMetric(name string) GaugeMetric
+}
+
+type noopMetricsProvider struct{}
+
+func (noopMetricsProvider) NewListsMetric(name string) CounterMetric { return noopMetric{} }
+func (noopMetricsProvider) NewListDurationMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewItemsInListMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewWatchesMetric(name string) CounterMetric { return noopMetric{} }
+func (noopMetricsProvider) NewShortWatchesMetric(name string) CounterMetric { return noopMetric{} }
+func (noopMetricsProvider) NewWatchDurationMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewItemsInWatchMetric(name string) SummaryMetric { return noopMetric{} }
+func (noopMetricsProvider) NewLastResourceVersionMetric(name string) GaugeMetric {
+ return noopMetric{}
+}
+
+var metricsFactory = struct {
+ metricsProvider MetricsProvider
+ setProviders sync.Once
+}{
+ metricsProvider: noopMetricsProvider{},
+}
+
+func newReflectorMetrics(name string) *reflectorMetrics {
+ var ret *reflectorMetrics
+ if len(name) == 0 {
+ return ret
+ }
+ return &reflectorMetrics{
+ numberOfLists: metricsFactory.metricsProvider.NewListsMetric(name),
+ listDuration: metricsFactory.metricsProvider.NewListDurationMetric(name),
+ numberOfItemsInList: metricsFactory.metricsProvider.NewItemsInListMetric(name),
+ numberOfWatches: metricsFactory.metricsProvider.NewWatchesMetric(name),
+ numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name),
+ watchDuration: metricsFactory.metricsProvider.NewWatchDurationMetric(name),
+ numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name),
+ lastResourceVersion: metricsFactory.metricsProvider.NewLastResourceVersionMetric(name),
+ }
+}
+
+// SetReflectorMetricsProvider sets the metrics provider
+func SetReflectorMetricsProvider(metricsProvider MetricsProvider) {
+ metricsFactory.setProviders.Do(func() {
+ metricsFactory.metricsProvider = metricsProvider
+ })
+}
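A hedged sketch of wiring these interfaces to Prometheus; it assumes github.com/prometheus/client_golang, whose Counter, Summary, and Gauge types already expose the Inc/Observe/Set methods the cache interfaces require (metric names are illustrative):

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/client-go/tools/cache"
)

type prometheusProvider struct{}

func counter(name string) cache.CounterMetric {
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: name})
	prometheus.MustRegister(c)
	return c
}

func summary(name string) cache.SummaryMetric {
	s := prometheus.NewSummary(prometheus.SummaryOpts{Name: name})
	prometheus.MustRegister(s)
	return s
}

func (prometheusProvider) NewListsMetric(name string) cache.CounterMetric {
	return counter(name + "_lists_total")
}
func (prometheusProvider) NewListDurationMetric(name string) cache.SummaryMetric {
	return summary(name + "_list_duration_seconds")
}
func (prometheusProvider) NewItemsInListMetric(name string) cache.SummaryMetric {
	return summary(name + "_items_per_list")
}
func (prometheusProvider) NewWatchesMetric(name string) cache.CounterMetric {
	return counter(name + "_watches_total")
}
func (prometheusProvider) NewShortWatchesMetric(name string) cache.CounterMetric {
	return counter(name + "_short_watches_total")
}
func (prometheusProvider) NewWatchDurationMetric(name string) cache.SummaryMetric {
	return summary(name + "_watch_duration_seconds")
}
func (prometheusProvider) NewItemsInWatchMetric(name string) cache.SummaryMetric {
	return summary(name + "_items_per_watch")
}
func (prometheusProvider) NewLastResourceVersionMetric(name string) cache.GaugeMetric {
	g := prometheus.NewGauge(prometheus.GaugeOpts{Name: name + "_last_resource_version"})
	prometheus.MustRegister(g)
	return g
}

func init() {
	// The sync.Once in SetReflectorMetricsProvider means only the first
	// caller's provider takes effect process-wide.
	cache.SetReflectorMetricsProvider(prometheusProvider{})
}
```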
diff --git a/vendor/k8s.io/client-go/tools/cache/shared_informer.go b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
index a0dbbb697..f6ce07f7a 100644
--- a/vendor/k8s.io/client-go/tools/cache/shared_informer.go
+++ b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
@@ -25,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/util/buffer"
+ "k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
@@ -92,8 +94,13 @@ func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEve
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool
-// syncedPollPeriod controls how often you look at the status of your sync funcs
-const syncedPollPeriod = 100 * time.Millisecond
+const (
+ // syncedPollPeriod controls how often you look at the status of your sync funcs
+ syncedPollPeriod = 100 * time.Millisecond
+
+ // initialBufferSize is the initial number of event notifications that can be buffered.
+ initialBufferSize = 1024
+)
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
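A typical caller-side check driven by that syncedPollPeriod polling; the informer and stop channel are assumed from the surrounding program:

```go
package example

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// runController is illustrative: informer and stopCh come from the caller.
func runController(informer cache.SharedIndexInformer, stopCh <-chan struct{}) {
	go informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	// the local store is now a complete view; safe to start workers
}
```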
@@ -138,15 +145,12 @@ type sharedIndexInformer struct {
// clock allows for testability
clock clock.Clock
- started bool
- startedLock sync.Mutex
+ started, stopped bool
+ startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
- // stopCh is the channel used to stop the main Run process. We have to track it so that
- // late joiners can have a proper stop
- stopCh <-chan struct{}
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -207,16 +211,20 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
s.started = true
}()
- s.stopCh = stopCh
- s.cacheMutationDetector.Run(stopCh)
- s.processor.run(stopCh)
- s.controller.Run(stopCh)
-}
+ // Separate stop channel because Processor should be stopped strictly after controller
+ processorStopCh := make(chan struct{})
+ var wg wait.Group
+ defer wg.Wait() // Wait for Processor to stop
+ defer close(processorStopCh) // Tell Processor to stop
+ wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
+ wg.StartWithChannel(processorStopCh, s.processor.run)
-func (s *sharedIndexInformer) isStarted() bool {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- return s.started
+ defer func() {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+ s.stopped = true // Don't want any new listeners
+ }()
+ s.controller.Run(stopCh)
}
func (s *sharedIndexInformer) HasSynced() bool {
@@ -287,6 +295,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.startedLock.Lock()
defer s.startedLock.Unlock()
+ if s.stopped {
+ glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
+ return
+ }
+
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
@@ -307,7 +320,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}
- listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
+ listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started {
s.processor.addListener(listener)
@@ -322,14 +335,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
- s.processor.addListener(listener)
-
- go listener.run(s.stopCh)
- go listener.pop(s.stopCh)
-
- items := s.indexer.List()
- for i := range items {
- listener.add(addNotification{newObj: items[i]})
+ s.processor.addAndStartListener(listener)
+ for _, item := range s.indexer.List() {
+ listener.add(addNotification{newObj: item})
}
}
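A late-joining handler now receives a synthetic add for every object already in the indexer before live events resume; a caller-side sketch (the informer is assumed to be running already):

```go
package example

import "k8s.io/client-go/tools/cache"

// addLateHandler is illustrative: it joins an informer mid-flight.
func addLateHandler(informer cache.SharedIndexInformer) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// Called once per object replayed from the indexer (while
			// blockDeltas holds off new deltas), then for live adds.
		},
	})
}
```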
@@ -369,12 +377,26 @@ type sharedProcessor struct {
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
+ wg wait.Group
+}
+
+func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
+ p.listenersLock.Lock()
+ defer p.listenersLock.Unlock()
+
+ p.addListenerLocked(listener)
+ p.wg.Start(listener.run)
+ p.wg.Start(listener.pop)
}
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
+ p.addListenerLocked(listener)
+}
+
+func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
@@ -395,13 +417,21 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+ func() {
+ p.listenersLock.RLock()
+ defer p.listenersLock.RUnlock()
+ for _, listener := range p.listeners {
+ p.wg.Start(listener.run)
+ p.wg.Start(listener.pop)
+ }
+ }()
+ <-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
-
for _, listener := range p.listeners {
- go listener.run(stopCh)
- go listener.pop(stopCh)
+ close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
+ p.wg.Wait() // Wait for all .pop() and .run() to stop
}
// shouldResync queries every listener to determine if any of them need a resync, based on each
@@ -437,21 +467,18 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
}
type processorListener struct {
- // lock/cond protects access to 'pendingNotifications'.
- lock sync.RWMutex
- cond sync.Cond
-
- // pendingNotifications is an unbounded slice that holds all notifications not yet distributed
- // there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
- // added until we OOM.
- // TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
- // we should try to do something better
- pendingNotifications []interface{}
-
nextCh chan interface{}
+ addCh chan interface{}
handler ResourceEventHandler
+ // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
+ // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
+ // added until we OOM.
+ // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
+ // we should try to do something better.
+ pendingNotifications buffer.RingGrowing
+
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
@@ -464,93 +491,85 @@ type processorListener struct {
resyncLock sync.Mutex
}
-func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
+func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
- pendingNotifications: []interface{}{},
nextCh: make(chan interface{}),
+ addCh: make(chan interface{}),
handler: handler,
+ pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
- ret.cond.L = &ret.lock
-
ret.determineNextResync(now)
return ret
}
func (p *processorListener) add(notification interface{}) {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- p.pendingNotifications = append(p.pendingNotifications, notification)
- p.cond.Broadcast()
+ p.addCh <- notification
}
-func (p *processorListener) pop(stopCh <-chan struct{}) {
+func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
+ defer close(p.nextCh) // Tell .run() to stop
+ var nextCh chan<- interface{}
+ var notification interface{}
for {
- blockingGet := func() (interface{}, bool) {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- for len(p.pendingNotifications) == 0 {
- // check if we're shutdown
- select {
- case <-stopCh:
- return nil, true
- default:
- }
- p.cond.Wait()
- }
-
- nt := p.pendingNotifications[0]
- p.pendingNotifications = p.pendingNotifications[1:]
- return nt, false
- }
-
- notification, stopped := blockingGet()
- if stopped {
- return
- }
-
select {
- case <-stopCh:
- return
- case p.nextCh <- notification:
+ case nextCh <- notification:
+ // Notification dispatched
+ var ok bool
+ notification, ok = p.pendingNotifications.ReadOne()
+ if !ok { // Nothing to pop
+ nextCh = nil // Disable this select case
+ }
+ case notificationToAdd, ok := <-p.addCh:
+ if !ok {
+ return
+ }
+ if notification == nil { // No notification to pop (and pendingNotifications is empty)
+ // Optimize the case - skip adding to pendingNotifications
+ notification = notificationToAdd
+ nextCh = p.nextCh
+ } else { // There is already a notification waiting to be dispatched
+ p.pendingNotifications.WriteOne(notificationToAdd)
+ }
}
}
}
-func (p *processorListener) run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
-
- for {
- var next interface{}
- select {
- case <-stopCh:
- func() {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.cond.Broadcast()
- }()
- return
- case next = <-p.nextCh:
- }
+func (p *processorListener) run() {
+ // this call blocks until the channel is closed. When a panic happens during the notification
+ // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
+ // the next notification will be attempted. This is usually better than the alternative of never
+ // delivering again.
+ stopCh := make(chan struct{})
+ wait.Until(func() {
+ // this gives us a few quick retries before a long pause and then a few more quick retries
+ err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+ for next := range p.nextCh {
+ switch notification := next.(type) {
+ case updateNotification:
+ p.handler.OnUpdate(notification.oldObj, notification.newObj)
+ case addNotification:
+ p.handler.OnAdd(notification.newObj)
+ case deleteNotification:
+ p.handler.OnDelete(notification.oldObj)
+ default:
+ utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ }
+ }
+ // the only way to get here is if the p.nextCh is empty and closed
+ return true, nil
+ })
- switch notification := next.(type) {
- case updateNotification:
- p.handler.OnUpdate(notification.oldObj, notification.newObj)
- case addNotification:
- p.handler.OnAdd(notification.newObj)
- case deleteNotification:
- p.handler.OnDelete(notification.oldObj)
- default:
- utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ // the only way to get here is if the p.nextCh is empty and closed
+ if err == nil {
+ close(stopCh)
}
- }
+ }, 1*time.Minute, stopCh)
}
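The rewritten pop() leans on the nil-channel select idiom: a send case on a nil channel never fires, which disables dispatch whenever nothing is buffered. A standalone sketch of the idiom, independent of the informer types (unlike pop(), it flushes the buffer on close so the demo output is deterministic):

```go
package main

import "fmt"

func main() {
	src := make(chan int) // stands in for p.addCh
	out := make(chan int) // stands in for p.nextCh

	go func() {
		for i := 1; i <= 3; i++ {
			src <- i
		}
		close(src)
	}()

	go func() {
		defer close(out)
		var pending []int // stands in for the RingGrowing buffer
		var send chan int // nil disables the dispatch case below
		var next int
		for {
			select {
			case send <- next: // never fires while send == nil
				if len(pending) == 0 {
					send = nil // drained: disable dispatch again
				} else {
					next, pending = pending[0], pending[1:]
				}
			case v, ok := <-src:
				if !ok { // producer closed: flush, then stop
					if send != nil {
						out <- next
						for _, p := range pending {
							out <- p
						}
					}
					return
				}
				if send == nil {
					next, send = v, out // fast path: nothing buffered
				} else {
					pending = append(pending, v)
				}
			}
		}
	}()

	for v := range out {
		fmt.Println(v) // 1 2 3: FIFO order, unbounded buffering in between
	}
}
```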
// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
diff --git a/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go b/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
index 4eb350c43..1c201efb6 100644
--- a/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
+++ b/vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
@@ -241,7 +241,7 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
-func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
+func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
@@ -249,7 +249,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(newObj)
if err != nil {
- return err
+ panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
@@ -266,16 +266,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
set.Insert(key)
}
}
- return nil
}
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
-func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
+func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(obj)
if err != nil {
- return err
+ panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
@@ -289,7 +288,6 @@ func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
}
}
}
- return nil
}
func (c *threadSafeMap) Resync() error {
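With errors replaced by panics, an IndexFunc failure during Add/Update now takes down the process instead of being silently dropped. A cautionary sketch; the indexer and its index function are invented for illustration:

```go
package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// newPodIndexer is illustrative. Returning an error from the IndexFunc
// now panics inside updateIndices, so index functions should tolerate
// foreign types rather than erroring on them.
func newPodIndexer() cache.Indexer {
	return cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"by-app": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok {
				// This error would now panic the process on Add/Update;
				// returning an empty slice is the tolerant alternative.
				return nil, fmt.Errorf("expected *v1.Pod, got %T", obj)
			}
			return []string{pod.Labels["app"]}, nil
		},
	})
}
```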