diff options
author | Matthew Heon <matthew.heon@gmail.com> | 2017-11-01 11:24:59 -0400 |
---|---|---|
committer | Matthew Heon <matthew.heon@gmail.com> | 2017-11-01 11:24:59 -0400 |
commit | a031b83a09a8628435317a03f199cdc18b78262f (patch) | |
tree | bc017a96769ce6de33745b8b0b1304ccf38e9df0 /vendor/k8s.io/apimachinery/pkg/watch | |
parent | 2b74391cd5281f6fdf391ff8ad50fd1490f6bf89 (diff) | |
download | podman-a031b83a09a8628435317a03f199cdc18b78262f.tar.gz podman-a031b83a09a8628435317a03f199cdc18b78262f.tar.bz2 podman-a031b83a09a8628435317a03f199cdc18b78262f.zip |
Initial checkin from CRI-O repo
Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch')
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/doc.go | 19 | ||||
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/filter.go | 109 | ||||
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/mux.go | 257 | ||||
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go | 119 | ||||
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/until.go | 87 | ||||
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/watch.go | 269 |
6 files changed, 860 insertions, 0 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/doc.go b/vendor/k8s.io/apimachinery/pkg/watch/doc.go new file mode 100644 index 000000000..7e6bf3fb9 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/watch/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package watch contains a generic watchable interface, and a fake for +// testing code that uses the watch interface. +package watch // import "k8s.io/apimachinery/pkg/watch" diff --git a/vendor/k8s.io/apimachinery/pkg/watch/filter.go b/vendor/k8s.io/apimachinery/pkg/watch/filter.go new file mode 100644 index 000000000..3ca27f22c --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/watch/filter.go @@ -0,0 +1,109 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" +) + +// FilterFunc should take an event, possibly modify it in some way, and return +// the modified event. If the event should be ignored, then return keep=false. +type FilterFunc func(in Event) (out Event, keep bool) + +// Filter passes all events through f before allowing them to pass on. +// Putting a filter on a watch, as an unavoidable side-effect due to the way +// go channels work, effectively causes the watch's event channel to have its +// queue length increased by one. +// +// WARNING: filter has a fatal flaw, in that it can't properly update the +// Type field (Add/Modified/Deleted) to reflect items beginning to pass the +// filter when they previously didn't. +// +func Filter(w Interface, f FilterFunc) Interface { + fw := &filteredWatch{ + incoming: w, + result: make(chan Event), + f: f, + } + go fw.loop() + return fw +} + +type filteredWatch struct { + incoming Interface + result chan Event + f FilterFunc +} + +// ResultChan returns a channel which will receive filtered events. +func (fw *filteredWatch) ResultChan() <-chan Event { + return fw.result +} + +// Stop stops the upstream watch, which will eventually stop this watch. +func (fw *filteredWatch) Stop() { + fw.incoming.Stop() +} + +// loop waits for new values, filters them, and resends them. +func (fw *filteredWatch) loop() { + defer close(fw.result) + for { + event, ok := <-fw.incoming.ResultChan() + if !ok { + break + } + filtered, keep := fw.f(event) + if keep { + fw.result <- filtered + } + } +} + +// Recorder records all events that are sent from the watch until it is closed. +type Recorder struct { + Interface + + lock sync.Mutex + events []Event +} + +var _ Interface = &Recorder{} + +// NewRecorder wraps an Interface and records any changes sent across it. +func NewRecorder(w Interface) *Recorder { + r := &Recorder{} + r.Interface = Filter(w, r.record) + return r +} + +// record is a FilterFunc and tracks each received event. +func (r *Recorder) record(in Event) (Event, bool) { + r.lock.Lock() + defer r.lock.Unlock() + r.events = append(r.events, in) + return in, true +} + +// Events returns a copy of the events sent across this recorder. +func (r *Recorder) Events() []Event { + r.lock.Lock() + defer r.lock.Unlock() + copied := make([]Event, len(r.events)) + copy(copied, r.events) + return copied +} diff --git a/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/vendor/k8s.io/apimachinery/pkg/watch/mux.go new file mode 100644 index 000000000..fafccd78e --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/watch/mux.go @@ -0,0 +1,257 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch +// channel is full. +type FullChannelBehavior int + +const ( + WaitIfChannelFull FullChannelBehavior = iota + DropIfChannelFull +) + +// Buffer the incoming queue a little bit even though it should rarely ever accumulate +// anything, just in case a few events are received in such a short window that +// Broadcaster can't move them onto the watchers' queues fast enough. +const incomingQueueLength = 25 + +// Broadcaster distributes event notifications among any number of watchers. Every event +// is delivered to every watcher. +type Broadcaster struct { + // TODO: see if this lock is needed now that new watchers go through + // the incoming channel. + lock sync.Mutex + + watchers map[int64]*broadcasterWatcher + nextWatcher int64 + distributing sync.WaitGroup + + incoming chan Event + + // How large to make watcher's channel. + watchQueueLength int + // If one of the watch channels is full, don't wait for it to become empty. + // Instead just deliver it to the watchers that do have space in their + // channels and move on to the next event. + // It's more fair to do this on a per-watcher basis than to do it on the + // "incoming" channel, which would allow one slow watcher to prevent all + // other watchers from getting new events. + fullChannelBehavior FullChannelBehavior +} + +// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. +// It is guaranteed that events will be distributed in the order in which they occur, +// but the order in which a single event is distributed among all of the watchers is unspecified. +func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { + m := &Broadcaster{ + watchers: map[int64]*broadcasterWatcher{}, + incoming: make(chan Event, incomingQueueLength), + watchQueueLength: queueLength, + fullChannelBehavior: fullChannelBehavior, + } + m.distributing.Add(1) + go m.loop() + return m +} + +const internalRunFunctionMarker = "internal-do-function" + +// a function type we can shoehorn into the queue. +type functionFakeRuntimeObject func() + +func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind { + return schema.EmptyObjectKind +} + +// Execute f, blocking the incoming queue (and waiting for it to drain first). +// The purpose of this terrible hack is so that watchers added after an event +// won't ever see that event, and will always see any event after they are +// added. +func (b *Broadcaster) blockQueue(f func()) { + var wg sync.WaitGroup + wg.Add(1) + b.incoming <- Event{ + Type: internalRunFunctionMarker, + Object: functionFakeRuntimeObject(func() { + defer wg.Done() + f() + }), + } + wg.Wait() +} + +// Watch adds a new watcher to the list and returns an Interface for it. +// Note: new watchers will only receive new events. They won't get an entire history +// of previous events. +func (m *Broadcaster) Watch() Interface { + var w *broadcasterWatcher + m.blockQueue(func() { + m.lock.Lock() + defer m.lock.Unlock() + id := m.nextWatcher + m.nextWatcher++ + w = &broadcasterWatcher{ + result: make(chan Event, m.watchQueueLength), + stopped: make(chan struct{}), + id: id, + m: m, + } + m.watchers[id] = w + }) + return w +} + +// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends +// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster. +// The returned watch will have a queue length that is at least large enough to accommodate +// all of the items in queuedEvents. +func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface { + var w *broadcasterWatcher + m.blockQueue(func() { + m.lock.Lock() + defer m.lock.Unlock() + id := m.nextWatcher + m.nextWatcher++ + length := m.watchQueueLength + if n := len(queuedEvents) + 1; n > length { + length = n + } + w = &broadcasterWatcher{ + result: make(chan Event, length), + stopped: make(chan struct{}), + id: id, + m: m, + } + m.watchers[id] = w + for _, e := range queuedEvents { + w.result <- e + } + }) + return w +} + +// stopWatching stops the given watcher and removes it from the list. +func (m *Broadcaster) stopWatching(id int64) { + m.lock.Lock() + defer m.lock.Unlock() + w, ok := m.watchers[id] + if !ok { + // No need to do anything, it's already been removed from the list. + return + } + delete(m.watchers, id) + close(w.result) +} + +// closeAll disconnects all watchers (presumably in response to a Shutdown call). +func (m *Broadcaster) closeAll() { + m.lock.Lock() + defer m.lock.Unlock() + for _, w := range m.watchers { + close(w.result) + } + // Delete everything from the map, since presence/absence in the map is used + // by stopWatching to avoid double-closing the channel. + m.watchers = map[int64]*broadcasterWatcher{} +} + +// Action distributes the given event among all watchers. +func (m *Broadcaster) Action(action EventType, obj runtime.Object) { + m.incoming <- Event{action, obj} +} + +// Shutdown disconnects all watchers (but any queued events will still be distributed). +// You must not call Action or Watch* after calling Shutdown. This call blocks +// until all events have been distributed through the outbound channels. Note +// that since they can be buffered, this means that the watchers might not +// have received the data yet as it can remain sitting in the buffered +// channel. +func (m *Broadcaster) Shutdown() { + close(m.incoming) + m.distributing.Wait() +} + +// loop receives from m.incoming and distributes to all watchers. +func (m *Broadcaster) loop() { + // Deliberately not catching crashes here. Yes, bring down the process if there's a + // bug in watch.Broadcaster. + for { + event, ok := <-m.incoming + if !ok { + break + } + if event.Type == internalRunFunctionMarker { + event.Object.(functionFakeRuntimeObject)() + continue + } + m.distribute(event) + } + m.closeAll() + m.distributing.Done() +} + +// distribute sends event to all watchers. Blocking. +func (m *Broadcaster) distribute(event Event) { + m.lock.Lock() + defer m.lock.Unlock() + if m.fullChannelBehavior == DropIfChannelFull { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + default: // Don't block if the event can't be queued. + } + } + } else { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + } + } + } +} + +// broadcasterWatcher handles a single watcher of a broadcaster +type broadcasterWatcher struct { + result chan Event + stopped chan struct{} + stop sync.Once + id int64 + m *Broadcaster +} + +// ResultChan returns a channel to use for waiting on events. +func (mw *broadcasterWatcher) ResultChan() <-chan Event { + return mw.result +} + +// Stop stops watching and removes mw from its list. +func (mw *broadcasterWatcher) Stop() { + mw.stop.Do(func() { + close(mw.stopped) + mw.m.stopWatching(mw.id) + }) +} diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go new file mode 100644 index 000000000..93bb1cdf7 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -0,0 +1,119 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "io" + "sync" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" +) + +// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written. +type Decoder interface { + // Decode should return the type of event, the decoded object, or an error. + // An error will cause StreamWatcher to call Close(). Decode should block until + // it has data or an error occurs. + Decode() (action EventType, object runtime.Object, err error) + + // Close should close the underlying io.Reader, signalling to the source of + // the stream that it is no longer being watched. Close() must cause any + // outstanding call to Decode() to return with an error of some sort. + Close() +} + +// StreamWatcher turns any stream for which you can write a Decoder interface +// into a watch.Interface. +type StreamWatcher struct { + sync.Mutex + source Decoder + result chan Event + stopped bool +} + +// NewStreamWatcher creates a StreamWatcher from the given decoder. +func NewStreamWatcher(d Decoder) *StreamWatcher { + sw := &StreamWatcher{ + source: d, + // It's easy for a consumer to add buffering via an extra + // goroutine/channel, but impossible for them to remove it, + // so nonbuffered is better. + result: make(chan Event), + } + go sw.receive() + return sw +} + +// ResultChan implements Interface. +func (sw *StreamWatcher) ResultChan() <-chan Event { + return sw.result +} + +// Stop implements Interface. +func (sw *StreamWatcher) Stop() { + // Call Close() exactly once by locking and setting a flag. + sw.Lock() + defer sw.Unlock() + if !sw.stopped { + sw.stopped = true + sw.source.Close() + } +} + +// stopping returns true if Stop() was called previously. +func (sw *StreamWatcher) stopping() bool { + sw.Lock() + defer sw.Unlock() + return sw.stopped +} + +// receive reads result from the decoder in a loop and sends down the result channel. +func (sw *StreamWatcher) receive() { + defer close(sw.result) + defer sw.Stop() + defer utilruntime.HandleCrash() + for { + action, obj, err := sw.source.Decode() + if err != nil { + // Ignore expected error. + if sw.stopping() { + return + } + switch err { + case io.EOF: + // watch closed normally + case io.ErrUnexpectedEOF: + glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err) + default: + msg := "Unable to decode an event from the watch stream: %v" + if net.IsProbableEOF(err) { + glog.V(5).Infof(msg, err) + } else { + glog.Errorf(msg, err) + } + } + return + } + sw.result <- Event{ + Type: action, + Object: obj, + } + } +} diff --git a/vendor/k8s.io/apimachinery/pkg/watch/until.go b/vendor/k8s.io/apimachinery/pkg/watch/until.go new file mode 100644 index 000000000..c2772ddb5 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/watch/until.go @@ -0,0 +1,87 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "errors" + "time" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition cannot be checked and should terminate. In general, it is better to define +// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed +// from false to true). +type ConditionFunc func(event Event) (bool, error) + +// ErrWatchClosed is returned when the watch channel is closed before timeout in Until. +var ErrWatchClosed = errors.New("watch closed before Until timeout") + +// Until reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// A zero timeout means to wait forever. +func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var after <-chan time.Time + if timeout > 0 { + after = time.After(timeout) + } else { + ch := make(chan time.Time) + defer close(ch) + after = ch + } + var lastEvent *Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, ErrWatchClosed + } + lastEvent = &event + + // TODO: check for watch expired error and retry watch from latest point? + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-after: + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} diff --git a/vendor/k8s.io/apimachinery/pkg/watch/watch.go b/vendor/k8s.io/apimachinery/pkg/watch/watch.go new file mode 100644 index 000000000..dd49c41f9 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/watch/watch.go @@ -0,0 +1,269 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + + "github.com/golang/glog" +) + +// Interface can be implemented by anything that knows how to watch and report changes. +type Interface interface { + // Stops watching. Will close the channel returned by ResultChan(). Releases + // any resources used by the watch. + Stop() + + // Returns a chan which will receive all the events. If an error occurs + // or Stop() is called, this channel will be closed, in which case the + // watch should be completely cleaned up. + ResultChan() <-chan Event +} + +// EventType defines the possible types of events. +type EventType string + +const ( + Added EventType = "ADDED" + Modified EventType = "MODIFIED" + Deleted EventType = "DELETED" + Error EventType = "ERROR" + + DefaultChanSize int32 = 100 +) + +// Event represents a single event to a watched resource. +type Event struct { + Type EventType + + // Object is: + // * If Type is Added or Modified: the new state of the object. + // * If Type is Deleted: the state of the object immediately before deletion. + // * If Type is Error: *api.Status is recommended; other types may make sense + // depending on context. + Object runtime.Object +} + +type emptyWatch chan Event + +// NewEmptyWatch returns a watch interface that returns no results and is closed. +// May be used in certain error conditions where no information is available but +// an error is not warranted. +func NewEmptyWatch() Interface { + ch := make(chan Event) + close(ch) + return emptyWatch(ch) +} + +// Stop implements Interface +func (w emptyWatch) Stop() { +} + +// ResultChan implements Interface +func (w emptyWatch) ResultChan() <-chan Event { + return chan Event(w) +} + +// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. +type FakeWatcher struct { + result chan Event + Stopped bool + sync.Mutex +} + +func NewFake() *FakeWatcher { + return &FakeWatcher{ + result: make(chan Event), + } +} + +func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher { + return &FakeWatcher{ + result: make(chan Event, size), + } +} + +// Stop implements Interface.Stop(). +func (f *FakeWatcher) Stop() { + f.Lock() + defer f.Unlock() + if !f.Stopped { + glog.V(4).Infof("Stopping fake watcher.") + close(f.result) + f.Stopped = true + } +} + +func (f *FakeWatcher) IsStopped() bool { + f.Lock() + defer f.Unlock() + return f.Stopped +} + +// Reset prepares the watcher to be reused. +func (f *FakeWatcher) Reset() { + f.Lock() + defer f.Unlock() + f.Stopped = false + f.result = make(chan Event) +} + +func (f *FakeWatcher) ResultChan() <-chan Event { + return f.result +} + +// Add sends an add event. +func (f *FakeWatcher) Add(obj runtime.Object) { + f.result <- Event{Added, obj} +} + +// Modify sends a modify event. +func (f *FakeWatcher) Modify(obj runtime.Object) { + f.result <- Event{Modified, obj} +} + +// Delete sends a delete event. +func (f *FakeWatcher) Delete(lastValue runtime.Object) { + f.result <- Event{Deleted, lastValue} +} + +// Error sends an Error event. +func (f *FakeWatcher) Error(errValue runtime.Object) { + f.result <- Event{Error, errValue} +} + +// Action sends an event of the requested type, for table-based testing. +func (f *FakeWatcher) Action(action EventType, obj runtime.Object) { + f.result <- Event{action, obj} +} + +// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. +type RaceFreeFakeWatcher struct { + result chan Event + Stopped bool + sync.Mutex +} + +func NewRaceFreeFake() *RaceFreeFakeWatcher { + return &RaceFreeFakeWatcher{ + result: make(chan Event, DefaultChanSize), + } +} + +// Stop implements Interface.Stop(). +func (f *RaceFreeFakeWatcher) Stop() { + f.Lock() + defer f.Unlock() + if !f.Stopped { + glog.V(4).Infof("Stopping fake watcher.") + close(f.result) + f.Stopped = true + } +} + +func (f *RaceFreeFakeWatcher) IsStopped() bool { + f.Lock() + defer f.Unlock() + return f.Stopped +} + +// Reset prepares the watcher to be reused. +func (f *RaceFreeFakeWatcher) Reset() { + f.Lock() + defer f.Unlock() + f.Stopped = false + f.result = make(chan Event, DefaultChanSize) +} + +func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event { + f.Lock() + defer f.Unlock() + return f.result +} + +// Add sends an add event. +func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Added, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Modify sends a modify event. +func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Modified, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Delete sends a delete event. +func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Deleted, lastValue}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Error sends an Error event. +func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Error, errValue}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Action sends an event of the requested type, for table-based testing. +func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{action, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} |