summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/tools/cache/fifo.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/tools/cache/fifo.go')
-rw-r--r--vendor/k8s.io/client-go/tools/cache/fifo.go358
1 files changed, 0 insertions, 358 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/fifo.go b/vendor/k8s.io/client-go/tools/cache/fifo.go
deleted file mode 100644
index e05c01ee2..000000000
--- a/vendor/k8s.io/client-go/tools/cache/fifo.go
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package cache
-
-import (
- "errors"
- "sync"
-
- "k8s.io/apimachinery/pkg/util/sets"
-)
-
-// PopProcessFunc is passed to Pop() method of Queue interface.
-// It is supposed to process the element popped from the queue.
-type PopProcessFunc func(interface{}) error
-
-// ErrRequeue may be returned by a PopProcessFunc to safely requeue
-// the current item. The value of Err will be returned from Pop.
-type ErrRequeue struct {
- // Err is returned by the Pop function
- Err error
-}
-
-var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
-
-func (e ErrRequeue) Error() string {
- if e.Err == nil {
- return "the popped item should be requeued without returning an error"
- }
- return e.Err.Error()
-}
-
-// Queue is exactly like a Store, but has a Pop() method too.
-type Queue interface {
- Store
-
- // Pop blocks until it has something to process.
- // It returns the object that was process and the result of processing.
- // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
- // should be requeued before releasing the lock on the queue.
- Pop(PopProcessFunc) (interface{}, error)
-
- // AddIfNotPresent adds a value previously
- // returned by Pop back into the queue as long
- // as nothing else (presumably more recent)
- // has since been added.
- AddIfNotPresent(interface{}) error
-
- // HasSynced returns true if the first batch of items has been popped
- HasSynced() bool
-
- // Close queue
- Close()
-}
-
-// Helper function for popping from Queue.
-// WARNING: Do NOT use this function in non-test code to avoid races
-// unless you really really really really know what you are doing.
-func Pop(queue Queue) interface{} {
- var result interface{}
- queue.Pop(func(obj interface{}) error {
- result = obj
- return nil
- })
- return result
-}
-
-// FIFO receives adds and updates from a Reflector, and puts them in a queue for
-// FIFO order processing. If multiple adds/updates of a single item happen while
-// an item is in the queue before it has been processed, it will only be
-// processed once, and when it is processed, the most recent version will be
-// processed. This can't be done with a channel.
-//
-// FIFO solves this use case:
-// * You want to process every object (exactly) once.
-// * You want to process the most recent version of the object when you process it.
-// * You do not want to process deleted objects, they should be removed from the queue.
-// * You do not want to periodically reprocess objects.
-// Compare with DeltaFIFO for other use cases.
-type FIFO struct {
- lock sync.RWMutex
- cond sync.Cond
- // We depend on the property that items in the set are in the queue and vice versa.
- items map[string]interface{}
- queue []string
-
- // populated is true if the first batch of items inserted by Replace() has been populated
- // or Delete/Add/Update was called first.
- populated bool
- // initialPopulationCount is the number of items inserted by the first call of Replace()
- initialPopulationCount int
-
- // keyFunc is used to make the key used for queued item insertion and retrieval, and
- // should be deterministic.
- keyFunc KeyFunc
-
- // Indication the queue is closed.
- // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
- // Currently, not used to gate any of CRED operations.
- closed bool
- closedLock sync.Mutex
-}
-
-var (
- _ = Queue(&FIFO{}) // FIFO is a Queue
-)
-
-// Close the queue.
-func (f *FIFO) Close() {
- f.closedLock.Lock()
- defer f.closedLock.Unlock()
- f.closed = true
- f.cond.Broadcast()
-}
-
-// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
-// or an Update called first but the first batch of items inserted by Replace() has been popped
-func (f *FIFO) HasSynced() bool {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.populated && f.initialPopulationCount == 0
-}
-
-// Add inserts an item, and puts it in the queue. The item is only enqueued
-// if it doesn't already exist in the set.
-func (f *FIFO) Add(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- if _, exists := f.items[id]; !exists {
- f.queue = append(f.queue, id)
- }
- f.items[id] = obj
- f.cond.Broadcast()
- return nil
-}
-
-// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
-// present in the set, it is neither enqueued nor added to the set.
-//
-// This is useful in a single producer/consumer scenario so that the consumer can
-// safely retry items without contending with the producer and potentially enqueueing
-// stale items.
-func (f *FIFO) AddIfNotPresent(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.addIfNotPresent(id, obj)
- return nil
-}
-
-// addIfNotPresent assumes the fifo lock is already held and adds the provided
-// item to the queue under id if it does not already exist.
-func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
- f.populated = true
- if _, exists := f.items[id]; exists {
- return
- }
-
- f.queue = append(f.queue, id)
- f.items[id] = obj
- f.cond.Broadcast()
-}
-
-// Update is the same as Add in this implementation.
-func (f *FIFO) Update(obj interface{}) error {
- return f.Add(obj)
-}
-
-// Delete removes an item. It doesn't add it to the queue, because
-// this implementation assumes the consumer only cares about the objects,
-// not the order in which they were created/added.
-func (f *FIFO) Delete(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- delete(f.items, id)
- return err
-}
-
-// List returns a list of all the items.
-func (f *FIFO) List() []interface{} {
- f.lock.RLock()
- defer f.lock.RUnlock()
- list := make([]interface{}, 0, len(f.items))
- for _, item := range f.items {
- list = append(list, item)
- }
- return list
-}
-
-// ListKeys returns a list of all the keys of the objects currently
-// in the FIFO.
-func (f *FIFO) ListKeys() []string {
- f.lock.RLock()
- defer f.lock.RUnlock()
- list := make([]string, 0, len(f.items))
- for key := range f.items {
- list = append(list, key)
- }
- return list
-}
-
-// Get returns the requested item, or sets exists=false.
-func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
- key, err := f.keyFunc(obj)
- if err != nil {
- return nil, false, KeyError{obj, err}
- }
- return f.GetByKey(key)
-}
-
-// GetByKey returns the requested item, or sets exists=false.
-func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
- f.lock.RLock()
- defer f.lock.RUnlock()
- item, exists = f.items[key]
- return item, exists, nil
-}
-
-// Checks if the queue is closed
-func (f *FIFO) IsClosed() bool {
- f.closedLock.Lock()
- defer f.closedLock.Unlock()
- if f.closed {
- return true
- }
- return false
-}
-
-// Pop waits until an item is ready and processes it. If multiple items are
-// ready, they are returned in the order in which they were added/updated.
-// The item is removed from the queue (and the store) before it is processed,
-// so if you don't successfully process it, it should be added back with
-// AddIfNotPresent(). process function is called under lock, so it is safe
-// update data structures in it that need to be in sync with the queue.
-func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- for {
- for len(f.queue) == 0 {
- // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
- // When Close() is called, the f.closed is set and the condition is broadcasted.
- // Which causes this loop to continue and return from the Pop().
- if f.IsClosed() {
- return nil, FIFOClosedError
- }
-
- f.cond.Wait()
- }
- id := f.queue[0]
- f.queue = f.queue[1:]
- if f.initialPopulationCount > 0 {
- f.initialPopulationCount--
- }
- item, ok := f.items[id]
- if !ok {
- // Item may have been deleted subsequently.
- continue
- }
- delete(f.items, id)
- err := process(item)
- if e, ok := err.(ErrRequeue); ok {
- f.addIfNotPresent(id, item)
- err = e.Err
- }
- return item, err
- }
-}
-
-// Replace will delete the contents of 'f', using instead the given map.
-// 'f' takes ownership of the map, you should not reference the map again
-// after calling this function. f's queue is reset, too; upon return, it
-// will contain the items in the map, in no particular order.
-func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
- items := map[string]interface{}{}
- for _, item := range list {
- key, err := f.keyFunc(item)
- if err != nil {
- return KeyError{item, err}
- }
- items[key] = item
- }
-
- f.lock.Lock()
- defer f.lock.Unlock()
-
- if !f.populated {
- f.populated = true
- f.initialPopulationCount = len(items)
- }
-
- f.items = items
- f.queue = f.queue[:0]
- for id := range items {
- f.queue = append(f.queue, id)
- }
- if len(f.queue) > 0 {
- f.cond.Broadcast()
- }
- return nil
-}
-
-// Resync will touch all objects to put them into the processing queue
-func (f *FIFO) Resync() error {
- f.lock.Lock()
- defer f.lock.Unlock()
-
- inQueue := sets.NewString()
- for _, id := range f.queue {
- inQueue.Insert(id)
- }
- for id := range f.items {
- if !inQueue.Has(id) {
- f.queue = append(f.queue, id)
- }
- }
- if len(f.queue) > 0 {
- f.cond.Broadcast()
- }
- return nil
-}
-
-// NewFIFO returns a Store which can be used to queue up items to
-// process.
-func NewFIFO(keyFunc KeyFunc) *FIFO {
- f := &FIFO{
- items: map[string]interface{}{},
- queue: []string{},
- keyFunc: keyFunc,
- }
- f.cond.L = &f.lock
- return f
-}