diff options
Diffstat (limited to 'vendor/k8s.io/client-go/tools/cache/reflector.go')
-rw-r--r-- | vendor/k8s.io/client-go/tools/cache/reflector.go | 449 |
1 files changed, 0 insertions, 449 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go deleted file mode 100644 index 054a7373c..000000000 --- a/vendor/k8s.io/client-go/tools/cache/reflector.go +++ /dev/null @@ -1,449 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "errors" - "fmt" - "io" - "math/rand" - "net" - "net/url" - "reflect" - "regexp" - goruntime "runtime" - "runtime/debug" - "strconv" - "strings" - "sync" - "sync/atomic" - "syscall" - "time" - - "github.com/golang/glog" - apierrs "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/clock" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" -) - -// Reflector watches a specified resource and causes all changes to be reflected in the given store. -type Reflector struct { - // name identifies this reflector. By default it will be a file:line if possible. - name string - // metrics tracks basic metric information about the reflector - metrics *reflectorMetrics - - // The type of object we expect to place in the store. - expectedType reflect.Type - // The destination to sync up with the watch source - store Store - // listerWatcher is used to perform lists and watches. - listerWatcher ListerWatcher - // period controls timing between one watch ending and - // the beginning of the next one. - period time.Duration - resyncPeriod time.Duration - ShouldResync func() bool - // clock allows tests to manipulate time - clock clock.Clock - // lastSyncResourceVersion is the resource version token last - // observed when doing a sync with the underlying store - // it is thread safe, but not synchronized with the underlying store - lastSyncResourceVersion string - // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion - lastSyncResourceVersionMutex sync.RWMutex -} - -var ( - // We try to spread the load on apiserver by setting timeouts for - // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. - // However, it can be modified to avoid periodic resync to break the - // TCP connection. - minWatchTimeout = 5 * time.Minute -) - -// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector -// The indexer is configured to key on namespace -func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { - indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc}) - reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) - return indexer, reflector -} - -// NewReflector creates a new Reflector object which will keep the given store up to -// date with the server's contents for the given resource. Reflector promises to -// only put things in the store that have the type of expectedType, unless expectedType -// is nil. If resyncPeriod is non-zero, then lists will be executed after every -// resyncPeriod, so that you can use reflectors to periodically process everything as -// well as incrementally processing the things that change. -func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { - return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod) -} - -// reflectorDisambiguator is used to disambiguate started reflectors. -// initialized to an unstable value to ensure meaning isn't attributed to the suffix. -var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345) - -// NewNamedReflector same as NewReflector, but with a specified name for logging -func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { - reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) - r := &Reflector{ - name: name, - // we need this to be unique per process (some names are still the same) but obvious who it belongs to - metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))), - listerWatcher: lw, - store: store, - expectedType: reflect.TypeOf(expectedType), - period: time.Second, - resyncPeriod: resyncPeriod, - clock: &clock.RealClock{}, - } - return r -} - -func makeValidPrometheusMetricLabel(in string) string { - // this isn't perfect, but it removes our common characters - return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in) -} - -// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common -// call chains to NewReflector, so they'd be low entropy names for reflectors -var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"} - -// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages -// it returns back a shortpath/filename:line to aid in identification of this reflector when it starts logging -func getDefaultReflectorName(ignoredPackages ...string) string { - name := "????" - const maxStack = 10 - for i := 1; i < maxStack; i++ { - _, file, line, ok := goruntime.Caller(i) - if !ok { - file, line, ok = extractStackCreator() - if !ok { - break - } - i += maxStack - } - if hasPackage(file, ignoredPackages) { - continue - } - - file = trimPackagePrefix(file) - name = fmt.Sprintf("%s:%d", file, line) - break - } - return name -} - -// hasPackage returns true if the file is in one of the ignored packages. -func hasPackage(file string, ignoredPackages []string) bool { - for _, ignoredPackage := range ignoredPackages { - if strings.Contains(file, ignoredPackage) { - return true - } - } - return false -} - -// trimPackagePrefix reduces duplicate values off the front of a package name. -func trimPackagePrefix(file string) string { - if l := strings.LastIndex(file, "k8s.io/client-go/pkg/"); l >= 0 { - return file[l+len("k8s.io/client-go/"):] - } - if l := strings.LastIndex(file, "/src/"); l >= 0 { - return file[l+5:] - } - if l := strings.LastIndex(file, "/pkg/"); l >= 0 { - return file[l+1:] - } - return file -} - -var stackCreator = regexp.MustCompile(`(?m)^created by (.*)\n\s+(.*):(\d+) \+0x[[:xdigit:]]+$`) - -// extractStackCreator retrieves the goroutine file and line that launched this stack. Returns false -// if the creator cannot be located. -// TODO: Go does not expose this via runtime https://github.com/golang/go/issues/11440 -func extractStackCreator() (string, int, bool) { - stack := debug.Stack() - matches := stackCreator.FindStringSubmatch(string(stack)) - if matches == nil || len(matches) != 4 { - return "", 0, false - } - line, err := strconv.Atoi(matches[3]) - if err != nil { - return "", 0, false - } - return matches[2], line, true -} - -// Run starts a watch and handles watch events. Will restart the watch if it is closed. -// Run will exit when stopCh is closed. -func (r *Reflector) Run(stopCh <-chan struct{}) { - glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) - wait.Until(func() { - if err := r.ListAndWatch(stopCh); err != nil { - utilruntime.HandleError(err) - } - }, r.period, stopCh) -} - -var ( - // nothing will ever be sent down this channel - neverExitWatch <-chan time.Time = make(chan time.Time) - - // Used to indicate that watching stopped so that a resync could happen. - errorResyncRequested = errors.New("resync channel fired") - - // Used to indicate that watching stopped because of a signal from the stop - // channel passed in from a client of the reflector. - errorStopRequested = errors.New("Stop requested") -) - -// resyncChan returns a channel which will receive something when a resync is -// required, and a cleanup function. -func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { - if r.resyncPeriod == 0 { - return neverExitWatch, func() bool { return false } - } - // The cleanup function is required: imagine the scenario where watches - // always fail so we end up listing frequently. Then, if we don't - // manually stop the timer, we could end up with many timers active - // concurrently. - t := r.clock.NewTimer(r.resyncPeriod) - return t.C(), t.Stop -} - -// ListAndWatch first lists all items and get the resource version at the moment of call, -// and then use the resource version to watch. -// It returns error if ListAndWatch didn't even try to initialize watch. -func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { - glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) - var resourceVersion string - - // Explicitly set "0" as resource version - it's fine for the List() - // to be served from cache and potentially be delayed relative to - // etcd contents. Reflector framework will catch up via Watch() eventually. - options := metav1.ListOptions{ResourceVersion: "0"} - r.metrics.numberOfLists.Inc() - start := r.clock.Now() - list, err := r.listerWatcher.List(options) - if err != nil { - return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) - } - r.metrics.listDuration.Observe(time.Since(start).Seconds()) - listMetaInterface, err := meta.ListAccessor(list) - if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) - } - resourceVersion = listMetaInterface.GetResourceVersion() - items, err := meta.ExtractList(list) - if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) - } - r.metrics.numberOfItemsInList.Observe(float64(len(items))) - if err := r.syncWith(items, resourceVersion); err != nil { - return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) - } - r.setLastSyncResourceVersion(resourceVersion) - - resyncerrc := make(chan error, 1) - cancelCh := make(chan struct{}) - defer close(cancelCh) - go func() { - resyncCh, cleanup := r.resyncChan() - defer func() { - cleanup() // Call the last one written into cleanup - }() - for { - select { - case <-resyncCh: - case <-stopCh: - return - case <-cancelCh: - return - } - if r.ShouldResync == nil || r.ShouldResync() { - glog.V(4).Infof("%s: forcing resync", r.name) - if err := r.store.Resync(); err != nil { - resyncerrc <- err - return - } - } - cleanup() - resyncCh, cleanup = r.resyncChan() - } - }() - - for { - // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors - select { - case <-stopCh: - return nil - default: - } - - timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) - options = metav1.ListOptions{ - ResourceVersion: resourceVersion, - // We want to avoid situations of hanging watchers. Stop any wachers that do not - // receive any events within the timeout window. - TimeoutSeconds: &timeoutSeconds, - } - - r.metrics.numberOfWatches.Inc() - w, err := r.listerWatcher.Watch(options) - if err != nil { - switch err { - case io.EOF: - // watch closed normally - case io.ErrUnexpectedEOF: - glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) - default: - utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) - } - // If this is "connection refused" error, it means that most likely apiserver is not responsive. - // It doesn't make sense to re-list all objects because most likely we will be able to restart - // watch where we ended. - // If that's the case wait and resend watch request. - if urlError, ok := err.(*url.Error); ok { - if opError, ok := urlError.Err.(*net.OpError); ok { - if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { - time.Sleep(time.Second) - continue - } - } - } - return nil - } - - if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { - if err != errorStopRequested { - glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) - } - return nil - } - } -} - -// syncWith replaces the store's items with the given list. -func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { - found := make([]interface{}, 0, len(items)) - for _, item := range items { - found = append(found, item) - } - return r.store.Replace(found, resourceVersion) -} - -// watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { - start := r.clock.Now() - eventCount := 0 - - // Stopping the watcher should be idempotent and if we return from this function there's no way - // we're coming back in with the same watch interface. - defer w.Stop() - // update metrics - defer func() { - r.metrics.numberOfItemsInWatch.Observe(float64(eventCount)) - r.metrics.watchDuration.Observe(time.Since(start).Seconds()) - }() - -loop: - for { - select { - case <-stopCh: - return errorStopRequested - case err := <-errc: - return err - case event, ok := <-w.ResultChan(): - if !ok { - break loop - } - if event.Type == watch.Error { - return apierrs.FromObject(event.Object) - } - if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { - utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) - continue - } - meta, err := meta.Accessor(event.Object) - if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) - continue - } - newResourceVersion := meta.GetResourceVersion() - switch event.Type { - case watch.Added: - err := r.store.Add(event.Object) - if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) - } - case watch.Modified: - err := r.store.Update(event.Object) - if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) - } - case watch.Deleted: - // TODO: Will any consumers need access to the "last known - // state", which is passed in event.Object? If so, may need - // to change this. - err := r.store.Delete(event.Object) - if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) - } - default: - utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) - } - *resourceVersion = newResourceVersion - r.setLastSyncResourceVersion(newResourceVersion) - eventCount++ - } - } - - watchDuration := r.clock.Now().Sub(start) - if watchDuration < 1*time.Second && eventCount == 0 { - r.metrics.numberOfShortWatches.Inc() - return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) - } - glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) - return nil -} - -// LastSyncResourceVersion is the resource version observed when last sync with the underlying store -// The value returned is not synchronized with access to the underlying store and is not thread-safe -func (r *Reflector) LastSyncResourceVersion() string { - r.lastSyncResourceVersionMutex.RLock() - defer r.lastSyncResourceVersionMutex.RUnlock() - return r.lastSyncResourceVersion -} - -func (r *Reflector) setLastSyncResourceVersion(v string) { - r.lastSyncResourceVersionMutex.Lock() - defer r.lastSyncResourceVersionMutex.Unlock() - r.lastSyncResourceVersion = v - - rv, err := strconv.Atoi(v) - if err == nil { - r.metrics.lastResourceVersion.Set(float64(rv)) - } -} |