summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/tools/cache/shared_informer.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/tools/cache/shared_informer.go')
-rw-r--r--vendor/k8s.io/client-go/tools/cache/shared_informer.go600
1 files changed, 0 insertions, 600 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/shared_informer.go b/vendor/k8s.io/client-go/tools/cache/shared_informer.go
deleted file mode 100644
index f6ce07f7a..000000000
--- a/vendor/k8s.io/client-go/tools/cache/shared_informer.go
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package cache
-
-import (
- "fmt"
- "sync"
- "time"
-
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/util/clock"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/util/buffer"
- "k8s.io/client-go/util/retry"
-
- "github.com/golang/glog"
-)
-
-// SharedInformer has a shared data cache and is capable of distributing notifications for changes
-// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
-// one behavior change compared to a standard Informer. When you receive a notification, the cache
-// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
-// on the contents of the cache exactly matching the notification you've received in handler
-// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
-// has advantages over the broadcaster since it allows us to share a common cache across many
-// controllers. Extending the broadcaster would have required us keep duplicate caches for each
-// watch.
-type SharedInformer interface {
- // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
- // period. Events to a single handler are delivered sequentially, but there is no coordination
- // between different handlers.
- AddEventHandler(handler ResourceEventHandler)
- // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
- // specified resync period. Events to a single handler are delivered sequentially, but there is
- // no coordination between different handlers.
- AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
- // GetStore returns the Store.
- GetStore() Store
- // GetController gives back a synthetic interface that "votes" to start the informer
- GetController() Controller
- // Run starts the shared informer, which will be stopped when stopCh is closed.
- Run(stopCh <-chan struct{})
- // HasSynced returns true if the shared informer's store has synced.
- HasSynced() bool
- // LastSyncResourceVersion is the resource version observed when last synced with the underlying
- // store. The value returned is not synchronized with access to the underlying store and is not
- // thread-safe.
- LastSyncResourceVersion() string
-}
-
-type SharedIndexInformer interface {
- SharedInformer
- // AddIndexers add indexers to the informer before it starts.
- AddIndexers(indexers Indexers) error
- GetIndexer() Indexer
-}
-
-// NewSharedInformer creates a new instance for the listwatcher.
-func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
- return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
-}
-
-// NewSharedIndexInformer creates a new instance for the listwatcher.
-func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
- realClock := &clock.RealClock{}
- sharedIndexInformer := &sharedIndexInformer{
- processor: &sharedProcessor{clock: realClock},
- indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
- listerWatcher: lw,
- objectType: objType,
- resyncCheckPeriod: defaultEventHandlerResyncPeriod,
- defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
- cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
- clock: realClock,
- }
- return sharedIndexInformer
-}
-
-// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
-type InformerSynced func() bool
-
-const (
- // syncedPollPeriod controls how often you look at the status of your sync funcs
- syncedPollPeriod = 100 * time.Millisecond
-
- // initialBufferSize is the initial number of event notifications that can be buffered.
- initialBufferSize = 1024
-)
-
-// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
-// if the controller should shutdown
-func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
- err := wait.PollUntil(syncedPollPeriod,
- func() (bool, error) {
- for _, syncFunc := range cacheSyncs {
- if !syncFunc() {
- return false, nil
- }
- }
- return true, nil
- },
- stopCh)
- if err != nil {
- glog.V(2).Infof("stop requested")
- return false
- }
-
- glog.V(4).Infof("caches populated")
- return true
-}
-
-type sharedIndexInformer struct {
- indexer Indexer
- controller Controller
-
- processor *sharedProcessor
- cacheMutationDetector CacheMutationDetector
-
- // This block is tracked to handle late initialization of the controller
- listerWatcher ListerWatcher
- objectType runtime.Object
-
- // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
- // shouldResync to check if any of our listeners need a resync.
- resyncCheckPeriod time.Duration
- // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
- // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
- // value).
- defaultEventHandlerResyncPeriod time.Duration
- // clock allows for testability
- clock clock.Clock
-
- started, stopped bool
- startedLock sync.Mutex
-
- // blockDeltas gives a way to stop all event distribution so that a late event handler
- // can safely join the shared informer.
- blockDeltas sync.Mutex
-}
-
-// dummyController hides the fact that a SharedInformer is different from a dedicated one
-// where a caller can `Run`. The run method is disconnected in this case, because higher
-// level logic will decide when to start the SharedInformer and related controller.
-// Because returning information back is always asynchronous, the legacy callers shouldn't
-// notice any change in behavior.
-type dummyController struct {
- informer *sharedIndexInformer
-}
-
-func (v *dummyController) Run(stopCh <-chan struct{}) {
-}
-
-func (v *dummyController) HasSynced() bool {
- return v.informer.HasSynced()
-}
-
-func (c *dummyController) LastSyncResourceVersion() string {
- return ""
-}
-
-type updateNotification struct {
- oldObj interface{}
- newObj interface{}
-}
-
-type addNotification struct {
- newObj interface{}
-}
-
-type deleteNotification struct {
- oldObj interface{}
-}
-
-func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
-
- fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
-
- cfg := &Config{
- Queue: fifo,
- ListerWatcher: s.listerWatcher,
- ObjectType: s.objectType,
- FullResyncPeriod: s.resyncCheckPeriod,
- RetryOnError: false,
- ShouldResync: s.processor.shouldResync,
-
- Process: s.HandleDeltas,
- }
-
- func() {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
-
- s.controller = New(cfg)
- s.controller.(*controller).clock = s.clock
- s.started = true
- }()
-
- // Separate stop channel because Processor should be stopped strictly after controller
- processorStopCh := make(chan struct{})
- var wg wait.Group
- defer wg.Wait() // Wait for Processor to stop
- defer close(processorStopCh) // Tell Processor to stop
- wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
- wg.StartWithChannel(processorStopCh, s.processor.run)
-
- defer func() {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- s.stopped = true // Don't want any new listeners
- }()
- s.controller.Run(stopCh)
-}
-
-func (s *sharedIndexInformer) HasSynced() bool {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
-
- if s.controller == nil {
- return false
- }
- return s.controller.HasSynced()
-}
-
-func (s *sharedIndexInformer) LastSyncResourceVersion() string {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
-
- if s.controller == nil {
- return ""
- }
- return s.controller.LastSyncResourceVersion()
-}
-
-func (s *sharedIndexInformer) GetStore() Store {
- return s.indexer
-}
-
-func (s *sharedIndexInformer) GetIndexer() Indexer {
- return s.indexer
-}
-
-func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
-
- if s.started {
- return fmt.Errorf("informer has already started")
- }
-
- return s.indexer.AddIndexers(indexers)
-}
-
-func (s *sharedIndexInformer) GetController() Controller {
- return &dummyController{informer: s}
-}
-
-func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
- s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
-}
-
-func determineResyncPeriod(desired, check time.Duration) time.Duration {
- if desired == 0 {
- return desired
- }
- if check == 0 {
- glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
- return 0
- }
- if desired < check {
- glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
- return check
- }
- return desired
-}
-
-const minimumResyncPeriod = 1 * time.Second
-
-func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
-
- if s.stopped {
- glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
- return
- }
-
- if resyncPeriod > 0 {
- if resyncPeriod < minimumResyncPeriod {
- glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
- resyncPeriod = minimumResyncPeriod
- }
-
- if resyncPeriod < s.resyncCheckPeriod {
- if s.started {
- glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
- resyncPeriod = s.resyncCheckPeriod
- } else {
- // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
- // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
- // accordingly
- s.resyncCheckPeriod = resyncPeriod
- s.processor.resyncCheckPeriodChanged(resyncPeriod)
- }
- }
- }
-
- listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
-
- if !s.started {
- s.processor.addListener(listener)
- return
- }
-
- // in order to safely join, we have to
- // 1. stop sending add/update/delete notifications
- // 2. do a list against the store
- // 3. send synthetic "Add" events to the new handler
- // 4. unblock
- s.blockDeltas.Lock()
- defer s.blockDeltas.Unlock()
-
- s.processor.addAndStartListener(listener)
- for _, item := range s.indexer.List() {
- listener.add(addNotification{newObj: item})
- }
-}
-
-func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
- s.blockDeltas.Lock()
- defer s.blockDeltas.Unlock()
-
- // from oldest to newest
- for _, d := range obj.(Deltas) {
- switch d.Type {
- case Sync, Added, Updated:
- isSync := d.Type == Sync
- s.cacheMutationDetector.AddObject(d.Object)
- if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
- if err := s.indexer.Update(d.Object); err != nil {
- return err
- }
- s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
- } else {
- if err := s.indexer.Add(d.Object); err != nil {
- return err
- }
- s.processor.distribute(addNotification{newObj: d.Object}, isSync)
- }
- case Deleted:
- if err := s.indexer.Delete(d.Object); err != nil {
- return err
- }
- s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
- }
- }
- return nil
-}
-
-type sharedProcessor struct {
- listenersLock sync.RWMutex
- listeners []*processorListener
- syncingListeners []*processorListener
- clock clock.Clock
- wg wait.Group
-}
-
-func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
-
- p.addListenerLocked(listener)
- p.wg.Start(listener.run)
- p.wg.Start(listener.pop)
-}
-
-func (p *sharedProcessor) addListener(listener *processorListener) {
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
-
- p.addListenerLocked(listener)
-}
-
-func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
- p.listeners = append(p.listeners, listener)
- p.syncingListeners = append(p.syncingListeners, listener)
-}
-
-func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
-
- if sync {
- for _, listener := range p.syncingListeners {
- listener.add(obj)
- }
- } else {
- for _, listener := range p.listeners {
- listener.add(obj)
- }
- }
-}
-
-func (p *sharedProcessor) run(stopCh <-chan struct{}) {
- func() {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
- for _, listener := range p.listeners {
- p.wg.Start(listener.run)
- p.wg.Start(listener.pop)
- }
- }()
- <-stopCh
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
- for _, listener := range p.listeners {
- close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
- }
- p.wg.Wait() // Wait for all .pop() and .run() to stop
-}
-
-// shouldResync queries every listener to determine if any of them need a resync, based on each
-// listener's resyncPeriod.
-func (p *sharedProcessor) shouldResync() bool {
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
-
- p.syncingListeners = []*processorListener{}
-
- resyncNeeded := false
- now := p.clock.Now()
- for _, listener := range p.listeners {
- // need to loop through all the listeners to see if they need to resync so we can prepare any
- // listeners that are going to be resyncing.
- if listener.shouldResync(now) {
- resyncNeeded = true
- p.syncingListeners = append(p.syncingListeners, listener)
- listener.determineNextResync(now)
- }
- }
- return resyncNeeded
-}
-
-func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
-
- for _, listener := range p.listeners {
- resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
- listener.setResyncPeriod(resyncPeriod)
- }
-}
-
-type processorListener struct {
- nextCh chan interface{}
- addCh chan interface{}
-
- handler ResourceEventHandler
-
- // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
- // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
- // added until we OOM.
- // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
- // we should try to do something better.
- pendingNotifications buffer.RingGrowing
-
- // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
- requestedResyncPeriod time.Duration
- // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
- // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
- // informer's overall resync check period.
- resyncPeriod time.Duration
- // nextResync is the earliest time the listener should get a full resync
- nextResync time.Time
- // resyncLock guards access to resyncPeriod and nextResync
- resyncLock sync.Mutex
-}
-
-func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
- ret := &processorListener{
- nextCh: make(chan interface{}),
- addCh: make(chan interface{}),
- handler: handler,
- pendingNotifications: *buffer.NewRingGrowing(bufferSize),
- requestedResyncPeriod: requestedResyncPeriod,
- resyncPeriod: resyncPeriod,
- }
-
- ret.determineNextResync(now)
-
- return ret
-}
-
-func (p *processorListener) add(notification interface{}) {
- p.addCh <- notification
-}
-
-func (p *processorListener) pop() {
- defer utilruntime.HandleCrash()
- defer close(p.nextCh) // Tell .run() to stop
-
- var nextCh chan<- interface{}
- var notification interface{}
- for {
- select {
- case nextCh <- notification:
- // Notification dispatched
- var ok bool
- notification, ok = p.pendingNotifications.ReadOne()
- if !ok { // Nothing to pop
- nextCh = nil // Disable this select case
- }
- case notificationToAdd, ok := <-p.addCh:
- if !ok {
- return
- }
- if notification == nil { // No notification to pop (and pendingNotifications is empty)
- // Optimize the case - skip adding to pendingNotifications
- notification = notificationToAdd
- nextCh = p.nextCh
- } else { // There is already a notification waiting to be dispatched
- p.pendingNotifications.WriteOne(notificationToAdd)
- }
- }
- }
-}
-
-func (p *processorListener) run() {
- // this call blocks until the channel is closed. When a panic happens during the notification
- // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
- // the next notification will be attempted. This is usually better than the alternative of never
- // delivering again.
- stopCh := make(chan struct{})
- wait.Until(func() {
- // this gives us a few quick retries before a long pause and then a few more quick retries
- err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
- for next := range p.nextCh {
- switch notification := next.(type) {
- case updateNotification:
- p.handler.OnUpdate(notification.oldObj, notification.newObj)
- case addNotification:
- p.handler.OnAdd(notification.newObj)
- case deleteNotification:
- p.handler.OnDelete(notification.oldObj)
- default:
- utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
- }
- }
- // the only way to get here is if the p.nextCh is empty and closed
- return true, nil
- })
-
- // the only way to get here is if the p.nextCh is empty and closed
- if err == nil {
- close(stopCh)
- }
- }, 1*time.Minute, stopCh)
-}
-
-// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
-// this always returns false.
-func (p *processorListener) shouldResync(now time.Time) bool {
- p.resyncLock.Lock()
- defer p.resyncLock.Unlock()
-
- if p.resyncPeriod == 0 {
- return false
- }
-
- return now.After(p.nextResync) || now.Equal(p.nextResync)
-}
-
-func (p *processorListener) determineNextResync(now time.Time) {
- p.resyncLock.Lock()
- defer p.resyncLock.Unlock()
-
- p.nextResync = now.Add(p.resyncPeriod)
-}
-
-func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
- p.resyncLock.Lock()
- defer p.resyncLock.Unlock()
-
- p.resyncPeriod = resyncPeriod
-}