diff options
Diffstat (limited to 'vendor/k8s.io/client-go/tools/cache/shared_informer.go')
-rw-r--r-- | vendor/k8s.io/client-go/tools/cache/shared_informer.go | 600 |
1 files changed, 0 insertions, 600 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/shared_informer.go b/vendor/k8s.io/client-go/tools/cache/shared_informer.go deleted file mode 100644 index f6ce07f7a..000000000 --- a/vendor/k8s.io/client-go/tools/cache/shared_informer.go +++ /dev/null @@ -1,600 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "fmt" - "sync" - "time" - - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/clock" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/buffer" - "k8s.io/client-go/util/retry" - - "github.com/golang/glog" -) - -// SharedInformer has a shared data cache and is capable of distributing notifications for changes -// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is -// one behavior change compared to a standard Informer. When you receive a notification, the cache -// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend -// on the contents of the cache exactly matching the notification you've received in handler -// functions. If there was a create, followed by a delete, the cache may NOT have your item. This -// has advantages over the broadcaster since it allows us to share a common cache across many -// controllers. Extending the broadcaster would have required us keep duplicate caches for each -// watch. -type SharedInformer interface { - // AddEventHandler adds an event handler to the shared informer using the shared informer's resync - // period. Events to a single handler are delivered sequentially, but there is no coordination - // between different handlers. - AddEventHandler(handler ResourceEventHandler) - // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the - // specified resync period. Events to a single handler are delivered sequentially, but there is - // no coordination between different handlers. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) - // GetStore returns the Store. - GetStore() Store - // GetController gives back a synthetic interface that "votes" to start the informer - GetController() Controller - // Run starts the shared informer, which will be stopped when stopCh is closed. - Run(stopCh <-chan struct{}) - // HasSynced returns true if the shared informer's store has synced. - HasSynced() bool - // LastSyncResourceVersion is the resource version observed when last synced with the underlying - // store. The value returned is not synchronized with access to the underlying store and is not - // thread-safe. - LastSyncResourceVersion() string -} - -type SharedIndexInformer interface { - SharedInformer - // AddIndexers add indexers to the informer before it starts. - AddIndexers(indexers Indexers) error - GetIndexer() Indexer -} - -// NewSharedInformer creates a new instance for the listwatcher. -func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { - return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{}) -} - -// NewSharedIndexInformer creates a new instance for the listwatcher. -func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { - realClock := &clock.RealClock{} - sharedIndexInformer := &sharedIndexInformer{ - processor: &sharedProcessor{clock: realClock}, - indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), - listerWatcher: lw, - objectType: objType, - resyncCheckPeriod: defaultEventHandlerResyncPeriod, - defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, - cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), - clock: realClock, - } - return sharedIndexInformer -} - -// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. -type InformerSynced func() bool - -const ( - // syncedPollPeriod controls how often you look at the status of your sync funcs - syncedPollPeriod = 100 * time.Millisecond - - // initialBufferSize is the initial number of event notifications that can be buffered. - initialBufferSize = 1024 -) - -// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false -// if the controller should shutdown -func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { - err := wait.PollUntil(syncedPollPeriod, - func() (bool, error) { - for _, syncFunc := range cacheSyncs { - if !syncFunc() { - return false, nil - } - } - return true, nil - }, - stopCh) - if err != nil { - glog.V(2).Infof("stop requested") - return false - } - - glog.V(4).Infof("caches populated") - return true -} - -type sharedIndexInformer struct { - indexer Indexer - controller Controller - - processor *sharedProcessor - cacheMutationDetector CacheMutationDetector - - // This block is tracked to handle late initialization of the controller - listerWatcher ListerWatcher - objectType runtime.Object - - // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call - // shouldResync to check if any of our listeners need a resync. - resyncCheckPeriod time.Duration - // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via - // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default - // value). - defaultEventHandlerResyncPeriod time.Duration - // clock allows for testability - clock clock.Clock - - started, stopped bool - startedLock sync.Mutex - - // blockDeltas gives a way to stop all event distribution so that a late event handler - // can safely join the shared informer. - blockDeltas sync.Mutex -} - -// dummyController hides the fact that a SharedInformer is different from a dedicated one -// where a caller can `Run`. The run method is disconnected in this case, because higher -// level logic will decide when to start the SharedInformer and related controller. -// Because returning information back is always asynchronous, the legacy callers shouldn't -// notice any change in behavior. -type dummyController struct { - informer *sharedIndexInformer -} - -func (v *dummyController) Run(stopCh <-chan struct{}) { -} - -func (v *dummyController) HasSynced() bool { - return v.informer.HasSynced() -} - -func (c *dummyController) LastSyncResourceVersion() string { - return "" -} - -type updateNotification struct { - oldObj interface{} - newObj interface{} -} - -type addNotification struct { - newObj interface{} -} - -type deleteNotification struct { - oldObj interface{} -} - -func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: s.listerWatcher, - ObjectType: s.objectType, - FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, - ShouldResync: s.processor.shouldResync, - - Process: s.HandleDeltas, - } - - func() { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - s.controller = New(cfg) - s.controller.(*controller).clock = s.clock - s.started = true - }() - - // Separate stop channel because Processor should be stopped strictly after controller - processorStopCh := make(chan struct{}) - var wg wait.Group - defer wg.Wait() // Wait for Processor to stop - defer close(processorStopCh) // Tell Processor to stop - wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) - wg.StartWithChannel(processorStopCh, s.processor.run) - - defer func() { - s.startedLock.Lock() - defer s.startedLock.Unlock() - s.stopped = true // Don't want any new listeners - }() - s.controller.Run(stopCh) -} - -func (s *sharedIndexInformer) HasSynced() bool { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.controller == nil { - return false - } - return s.controller.HasSynced() -} - -func (s *sharedIndexInformer) LastSyncResourceVersion() string { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.controller == nil { - return "" - } - return s.controller.LastSyncResourceVersion() -} - -func (s *sharedIndexInformer) GetStore() Store { - return s.indexer -} - -func (s *sharedIndexInformer) GetIndexer() Indexer { - return s.indexer -} - -func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.started { - return fmt.Errorf("informer has already started") - } - - return s.indexer.AddIndexers(indexers) -} - -func (s *sharedIndexInformer) GetController() Controller { - return &dummyController{informer: s} -} - -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { - s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) -} - -func determineResyncPeriod(desired, check time.Duration) time.Duration { - if desired == 0 { - return desired - } - if check == 0 { - glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) - return 0 - } - if desired < check { - glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) - return check - } - return desired -} - -const minimumResyncPeriod = 1 * time.Second - -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.stopped { - glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) - return - } - - if resyncPeriod > 0 { - if resyncPeriod < minimumResyncPeriod { - glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod) - resyncPeriod = minimumResyncPeriod - } - - if resyncPeriod < s.resyncCheckPeriod { - if s.started { - glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) - resyncPeriod = s.resyncCheckPeriod - } else { - // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update - // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners - // accordingly - s.resyncCheckPeriod = resyncPeriod - s.processor.resyncCheckPeriodChanged(resyncPeriod) - } - } - } - - listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) - - if !s.started { - s.processor.addListener(listener) - return - } - - // in order to safely join, we have to - // 1. stop sending add/update/delete notifications - // 2. do a list against the store - // 3. send synthetic "Add" events to the new handler - // 4. unblock - s.blockDeltas.Lock() - defer s.blockDeltas.Unlock() - - s.processor.addAndStartListener(listener) - for _, item := range s.indexer.List() { - listener.add(addNotification{newObj: item}) - } -} - -func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { - s.blockDeltas.Lock() - defer s.blockDeltas.Unlock() - - // from oldest to newest - for _, d := range obj.(Deltas) { - switch d.Type { - case Sync, Added, Updated: - isSync := d.Type == Sync - s.cacheMutationDetector.AddObject(d.Object) - if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { - if err := s.indexer.Update(d.Object); err != nil { - return err - } - s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) - } else { - if err := s.indexer.Add(d.Object); err != nil { - return err - } - s.processor.distribute(addNotification{newObj: d.Object}, isSync) - } - case Deleted: - if err := s.indexer.Delete(d.Object); err != nil { - return err - } - s.processor.distribute(deleteNotification{oldObj: d.Object}, false) - } - } - return nil -} - -type sharedProcessor struct { - listenersLock sync.RWMutex - listeners []*processorListener - syncingListeners []*processorListener - clock clock.Clock - wg wait.Group -} - -func (p *sharedProcessor) addAndStartListener(listener *processorListener) { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - - p.addListenerLocked(listener) - p.wg.Start(listener.run) - p.wg.Start(listener.pop) -} - -func (p *sharedProcessor) addListener(listener *processorListener) { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - - p.addListenerLocked(listener) -} - -func (p *sharedProcessor) addListenerLocked(listener *processorListener) { - p.listeners = append(p.listeners, listener) - p.syncingListeners = append(p.syncingListeners, listener) -} - -func (p *sharedProcessor) distribute(obj interface{}, sync bool) { - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - - if sync { - for _, listener := range p.syncingListeners { - listener.add(obj) - } - } else { - for _, listener := range p.listeners { - listener.add(obj) - } - } -} - -func (p *sharedProcessor) run(stopCh <-chan struct{}) { - func() { - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { - p.wg.Start(listener.run) - p.wg.Start(listener.pop) - } - }() - <-stopCh - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { - close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop - } - p.wg.Wait() // Wait for all .pop() and .run() to stop -} - -// shouldResync queries every listener to determine if any of them need a resync, based on each -// listener's resyncPeriod. -func (p *sharedProcessor) shouldResync() bool { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - - p.syncingListeners = []*processorListener{} - - resyncNeeded := false - now := p.clock.Now() - for _, listener := range p.listeners { - // need to loop through all the listeners to see if they need to resync so we can prepare any - // listeners that are going to be resyncing. - if listener.shouldResync(now) { - resyncNeeded = true - p.syncingListeners = append(p.syncingListeners, listener) - listener.determineNextResync(now) - } - } - return resyncNeeded -} - -func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - - for _, listener := range p.listeners { - resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) - listener.setResyncPeriod(resyncPeriod) - } -} - -type processorListener struct { - nextCh chan interface{} - addCh chan interface{} - - handler ResourceEventHandler - - // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. - // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications - // added until we OOM. - // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but - // we should try to do something better. - pendingNotifications buffer.RingGrowing - - // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer - requestedResyncPeriod time.Duration - // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This - // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the - // informer's overall resync check period. - resyncPeriod time.Duration - // nextResync is the earliest time the listener should get a full resync - nextResync time.Time - // resyncLock guards access to resyncPeriod and nextResync - resyncLock sync.Mutex -} - -func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { - ret := &processorListener{ - nextCh: make(chan interface{}), - addCh: make(chan interface{}), - handler: handler, - pendingNotifications: *buffer.NewRingGrowing(bufferSize), - requestedResyncPeriod: requestedResyncPeriod, - resyncPeriod: resyncPeriod, - } - - ret.determineNextResync(now) - - return ret -} - -func (p *processorListener) add(notification interface{}) { - p.addCh <- notification -} - -func (p *processorListener) pop() { - defer utilruntime.HandleCrash() - defer close(p.nextCh) // Tell .run() to stop - - var nextCh chan<- interface{} - var notification interface{} - for { - select { - case nextCh <- notification: - // Notification dispatched - var ok bool - notification, ok = p.pendingNotifications.ReadOne() - if !ok { // Nothing to pop - nextCh = nil // Disable this select case - } - case notificationToAdd, ok := <-p.addCh: - if !ok { - return - } - if notification == nil { // No notification to pop (and pendingNotifications is empty) - // Optimize the case - skip adding to pendingNotifications - notification = notificationToAdd - nextCh = p.nextCh - } else { // There is already a notification waiting to be dispatched - p.pendingNotifications.WriteOne(notificationToAdd) - } - } - } -} - -func (p *processorListener) run() { - // this call blocks until the channel is closed. When a panic happens during the notification - // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) - // the next notification will be attempted. This is usually better than the alternative of never - // delivering again. - stopCh := make(chan struct{}) - wait.Until(func() { - // this gives us a few quick retries before a long pause and then a few more quick retries - err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { - for next := range p.nextCh { - switch notification := next.(type) { - case updateNotification: - p.handler.OnUpdate(notification.oldObj, notification.newObj) - case addNotification: - p.handler.OnAdd(notification.newObj) - case deleteNotification: - p.handler.OnDelete(notification.oldObj) - default: - utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next)) - } - } - // the only way to get here is if the p.nextCh is empty and closed - return true, nil - }) - - // the only way to get here is if the p.nextCh is empty and closed - if err == nil { - close(stopCh) - } - }, 1*time.Minute, stopCh) -} - -// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0, -// this always returns false. -func (p *processorListener) shouldResync(now time.Time) bool { - p.resyncLock.Lock() - defer p.resyncLock.Unlock() - - if p.resyncPeriod == 0 { - return false - } - - return now.After(p.nextResync) || now.Equal(p.nextResync) -} - -func (p *processorListener) determineNextResync(now time.Time) { - p.resyncLock.Lock() - defer p.resyncLock.Unlock() - - p.nextResync = now.Add(p.resyncPeriod) -} - -func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) { - p.resyncLock.Lock() - defer p.resyncLock.Unlock() - - p.resyncPeriod = resyncPeriod -} |