diff options
Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/kubelet/container/cache.go')
-rw-r--r-- | vendor/k8s.io/kubernetes/pkg/kubelet/container/cache.go | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/container/cache.go b/vendor/k8s.io/kubernetes/pkg/kubelet/container/cache.go new file mode 100644 index 000000000..82852a9d9 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/container/cache.go @@ -0,0 +1,199 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/types" +) + +// Cache stores the PodStatus for the pods. It represents *all* the visible +// pods/containers in the container runtime. All cache entries are at least as +// new or newer than the global timestamp (set by UpdateTime()), while +// individual entries may be slightly newer than the global timestamp. If a pod +// has no states known by the runtime, Cache returns an empty PodStatus object +// with ID populated. +// +// Cache provides two methods to retrive the PodStatus: the non-blocking Get() +// and the blocking GetNewerThan() method. The component responsible for +// populating the cache is expected to call Delete() to explicitly free the +// cache entries. +type Cache interface { + Get(types.UID) (*PodStatus, error) + Set(types.UID, *PodStatus, error, time.Time) + // GetNewerThan is a blocking call that only returns the status + // when it is newer than the given time. + GetNewerThan(types.UID, time.Time) (*PodStatus, error) + Delete(types.UID) + UpdateTime(time.Time) +} + +type data struct { + // Status of the pod. + status *PodStatus + // Error got when trying to inspect the pod. + err error + // Time when the data was last modified. + modified time.Time +} + +type subRecord struct { + time time.Time + ch chan *data +} + +// cache implements Cache. +type cache struct { + // Lock which guards all internal data structures. + lock sync.RWMutex + // Map that stores the pod statuses. + pods map[types.UID]*data + // A global timestamp represents how fresh the cached data is. All + // cache content is at the least newer than this timestamp. Note that the + // timestamp is nil after initialization, and will only become non-nil when + // it is ready to serve the cached statuses. + timestamp *time.Time + // Map that stores the subscriber records. + subscribers map[types.UID][]*subRecord +} + +// NewCache creates a pod cache. +func NewCache() Cache { + return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}} +} + +// Get returns the PodStatus for the pod; callers are expected not to +// modify the objects returned. +func (c *cache) Get(id types.UID) (*PodStatus, error) { + c.lock.RLock() + defer c.lock.RUnlock() + d := c.get(id) + return d.status, d.err +} + +func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) { + ch := c.subscribe(id, minTime) + d := <-ch + return d.status, d.err +} + +// Set sets the PodStatus for the pod. +func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + defer c.notify(id, timestamp) + c.pods[id] = &data{status: status, err: err, modified: timestamp} +} + +// Delete removes the entry of the pod. +func (c *cache) Delete(id types.UID) { + c.lock.Lock() + defer c.lock.Unlock() + delete(c.pods, id) +} + +// UpdateTime modifies the global timestamp of the cache and notify +// subscribers if needed. +func (c *cache) UpdateTime(timestamp time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + c.timestamp = ×tamp + // Notify all the subscribers if the condition is met. + for id := range c.subscribers { + c.notify(id, *c.timestamp) + } +} + +func makeDefaultData(id types.UID) *data { + return &data{status: &PodStatus{ID: id}, err: nil} +} + +func (c *cache) get(id types.UID) *data { + d, ok := c.pods[id] + if !ok { + // Cache should store *all* pod/container information known by the + // container runtime. A cache miss indicates that there are no states + // regarding the pod last time we queried the container runtime. + // What this *really* means is that there are no visible pod/containers + // associated with this pod. Simply return an default (mostly empty) + // PodStatus to reflect this. + return makeDefaultData(id) + } + return d +} + +// getIfNewerThan returns the data it is newer than the given time. +// Otherwise, it returns nil. The caller should acquire the lock. +func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data { + d, ok := c.pods[id] + globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime)) + if !ok && globalTimestampIsNewer { + // Status is not cached, but the global timestamp is newer than + // minTime, return the default status. + return makeDefaultData(id) + } + if ok && (d.modified.After(minTime) || globalTimestampIsNewer) { + // Status is cached, return status if either of the following is true. + // * status was modified after minTime + // * the global timestamp of the cache is newer than minTime. + return d + } + // The pod status is not ready. + return nil +} + +// notify sends notifications for pod with the given id, if the requirements +// are met. Note that the caller should acquire the lock. +func (c *cache) notify(id types.UID, timestamp time.Time) { + list, ok := c.subscribers[id] + if !ok { + // No one to notify. + return + } + newList := []*subRecord{} + for i, r := range list { + if timestamp.Before(r.time) { + // Doesn't meet the time requirement; keep the record. + newList = append(newList, list[i]) + continue + } + r.ch <- c.get(id) + close(r.ch) + } + if len(newList) == 0 { + delete(c.subscribers, id) + } else { + c.subscribers[id] = newList + } +} + +func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data { + ch := make(chan *data, 1) + c.lock.Lock() + defer c.lock.Unlock() + d := c.getIfNewerThan(id, timestamp) + if d != nil { + // If the cache entry is ready, send the data and return immediately. + ch <- d + return ch + } + // Add the subscription record. + c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch}) + return ch +} |