summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/tools/cache/mutation_cache.go')
-rw-r--r--vendor/k8s.io/client-go/tools/cache/mutation_cache.go261
1 files changed, 261 insertions, 0 deletions
diff --git a/vendor/k8s.io/client-go/tools/cache/mutation_cache.go b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
new file mode 100644
index 000000000..0fa06bf77
--- /dev/null
+++ b/vendor/k8s.io/client-go/tools/cache/mutation_cache.go
@@ -0,0 +1,261 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilcache "k8s.io/apimachinery/pkg/util/cache"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/sets"
+)
+
+// MutationCache is able to take the result of update operations and stores them in an LRU
+// that can be used to provide a more current view of a requested object. It requires interpreting
+// resourceVersions for comparisons.
+// Implementations must be thread-safe.
+// TODO find a way to layer this into an informer/lister
+type MutationCache interface {
+ GetByKey(key string) (interface{}, bool, error)
+ ByIndex(indexName, indexKey string) ([]interface{}, error)
+ Mutation(interface{})
+}
+
+type ResourceVersionComparator interface {
+ CompareResourceVersion(lhs, rhs runtime.Object) int
+}
+
+// NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
+// deal with objects that have a resource version that:
+//
+// - is an integer
+// - increases when updated
+// - is comparable across the same resource in a namespace
+//
+// Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
+// remains in the mutation cache before it is removed.
+//
+// If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
+// in the underlying store. This is only safe if your use of the cache can handle mutation entries
+// remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
+func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
+ return &mutationCache{
+ backingCache: backingCache,
+ indexer: indexer,
+ mutationCache: utilcache.NewLRUExpireCache(100),
+ comparator: etcdObjectVersioner{},
+ ttl: ttl,
+ includeAdds: includeAdds,
+ }
+}
+
+// mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
+// since you can't distinguish between, "didn't observe create" and "was deleted after create",
+// if the key is missing from the backing cache, we always return it as missing
+type mutationCache struct {
+ lock sync.Mutex
+ backingCache Store
+ indexer Indexer
+ mutationCache *utilcache.LRUExpireCache
+ includeAdds bool
+ ttl time.Duration
+
+ comparator ResourceVersionComparator
+}
+
+// GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
+// be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
+// You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
+func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ obj, exists, err := c.backingCache.GetByKey(key)
+ if err != nil {
+ return nil, false, err
+ }
+ if !exists {
+ if !c.includeAdds {
+ // we can't distinguish between, "didn't observe create" and "was deleted after create", so
+ // if the key is missing, we always return it as missing
+ return nil, false, nil
+ }
+ obj, exists = c.mutationCache.Get(key)
+ if !exists {
+ return nil, false, nil
+ }
+ }
+ objRuntime, ok := obj.(runtime.Object)
+ if !ok {
+ return obj, true, nil
+ }
+ return c.newerObject(key, objRuntime), true, nil
+}
+
+// ByIndex returns the newer objects that match the provided index and indexer key.
+// Will return an error if no indexer was provided.
+func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if c.indexer == nil {
+ return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
+ }
+ keys, err := c.indexer.IndexKeys(name, indexKey)
+ if err != nil {
+ return nil, err
+ }
+ var items []interface{}
+ keySet := sets.NewString()
+ for _, key := range keys {
+ keySet.Insert(key)
+ obj, exists, err := c.indexer.GetByKey(key)
+ if err != nil {
+ return nil, err
+ }
+ if !exists {
+ continue
+ }
+ if objRuntime, ok := obj.(runtime.Object); ok {
+ items = append(items, c.newerObject(key, objRuntime))
+ } else {
+ items = append(items, obj)
+ }
+ }
+
+ if c.includeAdds {
+ fn := c.indexer.GetIndexers()[name]
+ // Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
+ for _, key := range c.mutationCache.Keys() {
+ updated, ok := c.mutationCache.Get(key)
+ if !ok {
+ continue
+ }
+ if keySet.Has(key.(string)) {
+ continue
+ }
+ elements, err := fn(updated)
+ if err != nil {
+ glog.V(4).Info("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
+ continue
+ }
+ for _, inIndex := range elements {
+ if inIndex != indexKey {
+ continue
+ }
+ items = append(items, updated)
+ break
+ }
+ }
+ }
+
+ return items, nil
+}
+
+// newerObject checks the mutation cache for a newer object and returns one if found. If the
+// mutated object is older than the backing object, it is removed from the Must be
+// called while the lock is held.
+func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
+ mutatedObj, exists := c.mutationCache.Get(key)
+ if !exists {
+ return backing
+ }
+ mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
+ if !ok {
+ return backing
+ }
+ if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
+ c.mutationCache.Remove(key)
+ return backing
+ }
+ return mutatedObjRuntime
+}
+
+// Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
+// copy. If you call Mutation twice with the same object on different threads, one will win, but its not defined
+// which one. This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
+// preserved, but you may not get the version of the object you want. The object you get is only guaranteed to
+// "one that was valid at some point in time", not "the one that I want".
+func (c *mutationCache) Mutation(obj interface{}) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err != nil {
+ // this is a "nice to have", so failures shouldn't do anything weird
+ utilruntime.HandleError(err)
+ return
+ }
+
+ if objRuntime, ok := obj.(runtime.Object); ok {
+ if mutatedObj, exists := c.mutationCache.Get(key); exists {
+ if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
+ if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
+ return
+ }
+ }
+ }
+ }
+ c.mutationCache.Add(key, obj, c.ttl)
+}
+
+// etcdObjectVersioner implements versioning and extracting etcd node information
+// for objects that have an embedded ObjectMeta or ListMeta field.
+type etcdObjectVersioner struct{}
+
+// ObjectResourceVersion implements Versioner
+func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
+ accessor, err := meta.Accessor(obj)
+ if err != nil {
+ return 0, err
+ }
+ version := accessor.GetResourceVersion()
+ if len(version) == 0 {
+ return 0, nil
+ }
+ return strconv.ParseUint(version, 10, 64)
+}
+
+// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
+// but etcd resource versions are special, they're actually ints, so we can easily compare them.
+func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
+ lhsVersion, err := a.ObjectResourceVersion(lhs)
+ if err != nil {
+ // coder error
+ panic(err)
+ }
+ rhsVersion, err := a.ObjectResourceVersion(rhs)
+ if err != nil {
+ // coder error
+ panic(err)
+ }
+
+ if lhsVersion == rhsVersion {
+ return 0
+ }
+ if lhsVersion < rhsVersion {
+ return -1
+ }
+
+ return 1
+}