summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/doc.go19
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/filter.go109
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/mux.go257
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go119
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/until.go87
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/watch.go269
6 files changed, 860 insertions, 0 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/doc.go b/vendor/k8s.io/apimachinery/pkg/watch/doc.go
new file mode 100644
index 000000000..7e6bf3fb9
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package watch contains a generic watchable interface, and a fake for
+// testing code that uses the watch interface.
+package watch // import "k8s.io/apimachinery/pkg/watch"
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/filter.go b/vendor/k8s.io/apimachinery/pkg/watch/filter.go
new file mode 100644
index 000000000..3ca27f22c
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/filter.go
@@ -0,0 +1,109 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "sync"
+)
+
+// FilterFunc should take an event, possibly modify it in some way, and return
+// the modified event. If the event should be ignored, then return keep=false.
+type FilterFunc func(in Event) (out Event, keep bool)
+
+// Filter passes all events through f before allowing them to pass on.
+// Putting a filter on a watch, as an unavoidable side-effect due to the way
+// go channels work, effectively causes the watch's event channel to have its
+// queue length increased by one.
+//
+// WARNING: filter has a fatal flaw, in that it can't properly update the
+// Type field (Add/Modified/Deleted) to reflect items beginning to pass the
+// filter when they previously didn't.
+//
+func Filter(w Interface, f FilterFunc) Interface {
+ fw := &filteredWatch{
+ incoming: w,
+ result: make(chan Event),
+ f: f,
+ }
+ go fw.loop()
+ return fw
+}
+
+type filteredWatch struct {
+ incoming Interface
+ result chan Event
+ f FilterFunc
+}
+
+// ResultChan returns a channel which will receive filtered events.
+func (fw *filteredWatch) ResultChan() <-chan Event {
+ return fw.result
+}
+
+// Stop stops the upstream watch, which will eventually stop this watch.
+func (fw *filteredWatch) Stop() {
+ fw.incoming.Stop()
+}
+
+// loop waits for new values, filters them, and resends them.
+func (fw *filteredWatch) loop() {
+ defer close(fw.result)
+ for {
+ event, ok := <-fw.incoming.ResultChan()
+ if !ok {
+ break
+ }
+ filtered, keep := fw.f(event)
+ if keep {
+ fw.result <- filtered
+ }
+ }
+}
+
+// Recorder records all events that are sent from the watch until it is closed.
+type Recorder struct {
+ Interface
+
+ lock sync.Mutex
+ events []Event
+}
+
+var _ Interface = &Recorder{}
+
+// NewRecorder wraps an Interface and records any changes sent across it.
+func NewRecorder(w Interface) *Recorder {
+ r := &Recorder{}
+ r.Interface = Filter(w, r.record)
+ return r
+}
+
+// record is a FilterFunc and tracks each received event.
+func (r *Recorder) record(in Event) (Event, bool) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ r.events = append(r.events, in)
+ return in, true
+}
+
+// Events returns a copy of the events sent across this recorder.
+func (r *Recorder) Events() []Event {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ copied := make([]Event, len(r.events))
+ copy(copied, r.events)
+ return copied
+}
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
new file mode 100644
index 000000000..fafccd78e
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
@@ -0,0 +1,257 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "sync"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
+// channel is full.
+type FullChannelBehavior int
+
+const (
+ WaitIfChannelFull FullChannelBehavior = iota
+ DropIfChannelFull
+)
+
+// Buffer the incoming queue a little bit even though it should rarely ever accumulate
+// anything, just in case a few events are received in such a short window that
+// Broadcaster can't move them onto the watchers' queues fast enough.
+const incomingQueueLength = 25
+
+// Broadcaster distributes event notifications among any number of watchers. Every event
+// is delivered to every watcher.
+type Broadcaster struct {
+ // TODO: see if this lock is needed now that new watchers go through
+ // the incoming channel.
+ lock sync.Mutex
+
+ watchers map[int64]*broadcasterWatcher
+ nextWatcher int64
+ distributing sync.WaitGroup
+
+ incoming chan Event
+
+ // How large to make watcher's channel.
+ watchQueueLength int
+ // If one of the watch channels is full, don't wait for it to become empty.
+ // Instead just deliver it to the watchers that do have space in their
+ // channels and move on to the next event.
+ // It's more fair to do this on a per-watcher basis than to do it on the
+ // "incoming" channel, which would allow one slow watcher to prevent all
+ // other watchers from getting new events.
+ fullChannelBehavior FullChannelBehavior
+}
+
+// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
+// It is guaranteed that events will be distributed in the order in which they occur,
+// but the order in which a single event is distributed among all of the watchers is unspecified.
+func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
+ m := &Broadcaster{
+ watchers: map[int64]*broadcasterWatcher{},
+ incoming: make(chan Event, incomingQueueLength),
+ watchQueueLength: queueLength,
+ fullChannelBehavior: fullChannelBehavior,
+ }
+ m.distributing.Add(1)
+ go m.loop()
+ return m
+}
+
+const internalRunFunctionMarker = "internal-do-function"
+
+// a function type we can shoehorn into the queue.
+type functionFakeRuntimeObject func()
+
+func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
+ return schema.EmptyObjectKind
+}
+
+// Execute f, blocking the incoming queue (and waiting for it to drain first).
+// The purpose of this terrible hack is so that watchers added after an event
+// won't ever see that event, and will always see any event after they are
+// added.
+func (b *Broadcaster) blockQueue(f func()) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ b.incoming <- Event{
+ Type: internalRunFunctionMarker,
+ Object: functionFakeRuntimeObject(func() {
+ defer wg.Done()
+ f()
+ }),
+ }
+ wg.Wait()
+}
+
+// Watch adds a new watcher to the list and returns an Interface for it.
+// Note: new watchers will only receive new events. They won't get an entire history
+// of previous events.
+func (m *Broadcaster) Watch() Interface {
+ var w *broadcasterWatcher
+ m.blockQueue(func() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ id := m.nextWatcher
+ m.nextWatcher++
+ w = &broadcasterWatcher{
+ result: make(chan Event, m.watchQueueLength),
+ stopped: make(chan struct{}),
+ id: id,
+ m: m,
+ }
+ m.watchers[id] = w
+ })
+ return w
+}
+
+// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
+// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
+// The returned watch will have a queue length that is at least large enough to accommodate
+// all of the items in queuedEvents.
+func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
+ var w *broadcasterWatcher
+ m.blockQueue(func() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ id := m.nextWatcher
+ m.nextWatcher++
+ length := m.watchQueueLength
+ if n := len(queuedEvents) + 1; n > length {
+ length = n
+ }
+ w = &broadcasterWatcher{
+ result: make(chan Event, length),
+ stopped: make(chan struct{}),
+ id: id,
+ m: m,
+ }
+ m.watchers[id] = w
+ for _, e := range queuedEvents {
+ w.result <- e
+ }
+ })
+ return w
+}
+
+// stopWatching stops the given watcher and removes it from the list.
+func (m *Broadcaster) stopWatching(id int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ w, ok := m.watchers[id]
+ if !ok {
+ // No need to do anything, it's already been removed from the list.
+ return
+ }
+ delete(m.watchers, id)
+ close(w.result)
+}
+
+// closeAll disconnects all watchers (presumably in response to a Shutdown call).
+func (m *Broadcaster) closeAll() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ for _, w := range m.watchers {
+ close(w.result)
+ }
+ // Delete everything from the map, since presence/absence in the map is used
+ // by stopWatching to avoid double-closing the channel.
+ m.watchers = map[int64]*broadcasterWatcher{}
+}
+
+// Action distributes the given event among all watchers.
+func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
+ m.incoming <- Event{action, obj}
+}
+
+// Shutdown disconnects all watchers (but any queued events will still be distributed).
+// You must not call Action or Watch* after calling Shutdown. This call blocks
+// until all events have been distributed through the outbound channels. Note
+// that since they can be buffered, this means that the watchers might not
+// have received the data yet as it can remain sitting in the buffered
+// channel.
+func (m *Broadcaster) Shutdown() {
+ close(m.incoming)
+ m.distributing.Wait()
+}
+
+// loop receives from m.incoming and distributes to all watchers.
+func (m *Broadcaster) loop() {
+ // Deliberately not catching crashes here. Yes, bring down the process if there's a
+ // bug in watch.Broadcaster.
+ for {
+ event, ok := <-m.incoming
+ if !ok {
+ break
+ }
+ if event.Type == internalRunFunctionMarker {
+ event.Object.(functionFakeRuntimeObject)()
+ continue
+ }
+ m.distribute(event)
+ }
+ m.closeAll()
+ m.distributing.Done()
+}
+
+// distribute sends event to all watchers. Blocking.
+func (m *Broadcaster) distribute(event Event) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if m.fullChannelBehavior == DropIfChannelFull {
+ for _, w := range m.watchers {
+ select {
+ case w.result <- event:
+ case <-w.stopped:
+ default: // Don't block if the event can't be queued.
+ }
+ }
+ } else {
+ for _, w := range m.watchers {
+ select {
+ case w.result <- event:
+ case <-w.stopped:
+ }
+ }
+ }
+}
+
+// broadcasterWatcher handles a single watcher of a broadcaster
+type broadcasterWatcher struct {
+ result chan Event
+ stopped chan struct{}
+ stop sync.Once
+ id int64
+ m *Broadcaster
+}
+
+// ResultChan returns a channel to use for waiting on events.
+func (mw *broadcasterWatcher) ResultChan() <-chan Event {
+ return mw.result
+}
+
+// Stop stops watching and removes mw from its list.
+func (mw *broadcasterWatcher) Stop() {
+ mw.stop.Do(func() {
+ close(mw.stopped)
+ mw.m.stopWatching(mw.id)
+ })
+}
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
new file mode 100644
index 000000000..93bb1cdf7
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
@@ -0,0 +1,119 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "io"
+ "sync"
+
+ "github.com/golang/glog"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/net"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
+type Decoder interface {
+ // Decode should return the type of event, the decoded object, or an error.
+ // An error will cause StreamWatcher to call Close(). Decode should block until
+ // it has data or an error occurs.
+ Decode() (action EventType, object runtime.Object, err error)
+
+ // Close should close the underlying io.Reader, signalling to the source of
+ // the stream that it is no longer being watched. Close() must cause any
+ // outstanding call to Decode() to return with an error of some sort.
+ Close()
+}
+
+// StreamWatcher turns any stream for which you can write a Decoder interface
+// into a watch.Interface.
+type StreamWatcher struct {
+ sync.Mutex
+ source Decoder
+ result chan Event
+ stopped bool
+}
+
+// NewStreamWatcher creates a StreamWatcher from the given decoder.
+func NewStreamWatcher(d Decoder) *StreamWatcher {
+ sw := &StreamWatcher{
+ source: d,
+ // It's easy for a consumer to add buffering via an extra
+ // goroutine/channel, but impossible for them to remove it,
+ // so nonbuffered is better.
+ result: make(chan Event),
+ }
+ go sw.receive()
+ return sw
+}
+
+// ResultChan implements Interface.
+func (sw *StreamWatcher) ResultChan() <-chan Event {
+ return sw.result
+}
+
+// Stop implements Interface.
+func (sw *StreamWatcher) Stop() {
+ // Call Close() exactly once by locking and setting a flag.
+ sw.Lock()
+ defer sw.Unlock()
+ if !sw.stopped {
+ sw.stopped = true
+ sw.source.Close()
+ }
+}
+
+// stopping returns true if Stop() was called previously.
+func (sw *StreamWatcher) stopping() bool {
+ sw.Lock()
+ defer sw.Unlock()
+ return sw.stopped
+}
+
+// receive reads result from the decoder in a loop and sends down the result channel.
+func (sw *StreamWatcher) receive() {
+ defer close(sw.result)
+ defer sw.Stop()
+ defer utilruntime.HandleCrash()
+ for {
+ action, obj, err := sw.source.Decode()
+ if err != nil {
+ // Ignore expected error.
+ if sw.stopping() {
+ return
+ }
+ switch err {
+ case io.EOF:
+ // watch closed normally
+ case io.ErrUnexpectedEOF:
+ glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
+ default:
+ msg := "Unable to decode an event from the watch stream: %v"
+ if net.IsProbableEOF(err) {
+ glog.V(5).Infof(msg, err)
+ } else {
+ glog.Errorf(msg, err)
+ }
+ }
+ return
+ }
+ sw.result <- Event{
+ Type: action,
+ Object: obj,
+ }
+ }
+}
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/until.go b/vendor/k8s.io/apimachinery/pkg/watch/until.go
new file mode 100644
index 000000000..c2772ddb5
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/until.go
@@ -0,0 +1,87 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "errors"
+ "time"
+
+ "k8s.io/apimachinery/pkg/util/wait"
+)
+
+// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
+// or an error if the condition cannot be checked and should terminate. In general, it is better to define
+// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
+// from false to true).
+type ConditionFunc func(event Event) (bool, error)
+
+// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
+var ErrWatchClosed = errors.New("watch closed before Until timeout")
+
+// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
+// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
+// If no event has been received, the returned event will be nil.
+// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
+// A zero timeout means to wait forever.
+func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
+ ch := watcher.ResultChan()
+ defer watcher.Stop()
+ var after <-chan time.Time
+ if timeout > 0 {
+ after = time.After(timeout)
+ } else {
+ ch := make(chan time.Time)
+ defer close(ch)
+ after = ch
+ }
+ var lastEvent *Event
+ for _, condition := range conditions {
+ // check the next condition against the previous event and short circuit waiting for the next watch
+ if lastEvent != nil {
+ done, err := condition(*lastEvent)
+ if err != nil {
+ return lastEvent, err
+ }
+ if done {
+ continue
+ }
+ }
+ ConditionSucceeded:
+ for {
+ select {
+ case event, ok := <-ch:
+ if !ok {
+ return lastEvent, ErrWatchClosed
+ }
+ lastEvent = &event
+
+ // TODO: check for watch expired error and retry watch from latest point?
+ done, err := condition(event)
+ if err != nil {
+ return lastEvent, err
+ }
+ if done {
+ break ConditionSucceeded
+ }
+
+ case <-after:
+ return lastEvent, wait.ErrWaitTimeout
+ }
+ }
+ }
+ return lastEvent, nil
+}
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/watch.go b/vendor/k8s.io/apimachinery/pkg/watch/watch.go
new file mode 100644
index 000000000..dd49c41f9
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/watch.go
@@ -0,0 +1,269 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "fmt"
+ "sync"
+
+ "k8s.io/apimachinery/pkg/runtime"
+
+ "github.com/golang/glog"
+)
+
+// Interface can be implemented by anything that knows how to watch and report changes.
+type Interface interface {
+ // Stops watching. Will close the channel returned by ResultChan(). Releases
+ // any resources used by the watch.
+ Stop()
+
+ // Returns a chan which will receive all the events. If an error occurs
+ // or Stop() is called, this channel will be closed, in which case the
+ // watch should be completely cleaned up.
+ ResultChan() <-chan Event
+}
+
+// EventType defines the possible types of events.
+type EventType string
+
+const (
+ Added EventType = "ADDED"
+ Modified EventType = "MODIFIED"
+ Deleted EventType = "DELETED"
+ Error EventType = "ERROR"
+
+ DefaultChanSize int32 = 100
+)
+
+// Event represents a single event to a watched resource.
+type Event struct {
+ Type EventType
+
+ // Object is:
+ // * If Type is Added or Modified: the new state of the object.
+ // * If Type is Deleted: the state of the object immediately before deletion.
+ // * If Type is Error: *api.Status is recommended; other types may make sense
+ // depending on context.
+ Object runtime.Object
+}
+
+type emptyWatch chan Event
+
+// NewEmptyWatch returns a watch interface that returns no results and is closed.
+// May be used in certain error conditions where no information is available but
+// an error is not warranted.
+func NewEmptyWatch() Interface {
+ ch := make(chan Event)
+ close(ch)
+ return emptyWatch(ch)
+}
+
+// Stop implements Interface
+func (w emptyWatch) Stop() {
+}
+
+// ResultChan implements Interface
+func (w emptyWatch) ResultChan() <-chan Event {
+ return chan Event(w)
+}
+
+// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
+type FakeWatcher struct {
+ result chan Event
+ Stopped bool
+ sync.Mutex
+}
+
+func NewFake() *FakeWatcher {
+ return &FakeWatcher{
+ result: make(chan Event),
+ }
+}
+
+func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
+ return &FakeWatcher{
+ result: make(chan Event, size),
+ }
+}
+
+// Stop implements Interface.Stop().
+func (f *FakeWatcher) Stop() {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ glog.V(4).Infof("Stopping fake watcher.")
+ close(f.result)
+ f.Stopped = true
+ }
+}
+
+func (f *FakeWatcher) IsStopped() bool {
+ f.Lock()
+ defer f.Unlock()
+ return f.Stopped
+}
+
+// Reset prepares the watcher to be reused.
+func (f *FakeWatcher) Reset() {
+ f.Lock()
+ defer f.Unlock()
+ f.Stopped = false
+ f.result = make(chan Event)
+}
+
+func (f *FakeWatcher) ResultChan() <-chan Event {
+ return f.result
+}
+
+// Add sends an add event.
+func (f *FakeWatcher) Add(obj runtime.Object) {
+ f.result <- Event{Added, obj}
+}
+
+// Modify sends a modify event.
+func (f *FakeWatcher) Modify(obj runtime.Object) {
+ f.result <- Event{Modified, obj}
+}
+
+// Delete sends a delete event.
+func (f *FakeWatcher) Delete(lastValue runtime.Object) {
+ f.result <- Event{Deleted, lastValue}
+}
+
+// Error sends an Error event.
+func (f *FakeWatcher) Error(errValue runtime.Object) {
+ f.result <- Event{Error, errValue}
+}
+
+// Action sends an event of the requested type, for table-based testing.
+func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
+ f.result <- Event{action, obj}
+}
+
+// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
+type RaceFreeFakeWatcher struct {
+ result chan Event
+ Stopped bool
+ sync.Mutex
+}
+
+func NewRaceFreeFake() *RaceFreeFakeWatcher {
+ return &RaceFreeFakeWatcher{
+ result: make(chan Event, DefaultChanSize),
+ }
+}
+
+// Stop implements Interface.Stop().
+func (f *RaceFreeFakeWatcher) Stop() {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ glog.V(4).Infof("Stopping fake watcher.")
+ close(f.result)
+ f.Stopped = true
+ }
+}
+
+func (f *RaceFreeFakeWatcher) IsStopped() bool {
+ f.Lock()
+ defer f.Unlock()
+ return f.Stopped
+}
+
+// Reset prepares the watcher to be reused.
+func (f *RaceFreeFakeWatcher) Reset() {
+ f.Lock()
+ defer f.Unlock()
+ f.Stopped = false
+ f.result = make(chan Event, DefaultChanSize)
+}
+
+func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
+ f.Lock()
+ defer f.Unlock()
+ return f.result
+}
+
+// Add sends an add event.
+func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ select {
+ case f.result <- Event{Added, obj}:
+ return
+ default:
+ panic(fmt.Errorf("channel full"))
+ }
+ }
+}
+
+// Modify sends a modify event.
+func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ select {
+ case f.result <- Event{Modified, obj}:
+ return
+ default:
+ panic(fmt.Errorf("channel full"))
+ }
+ }
+}
+
+// Delete sends a delete event.
+func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ select {
+ case f.result <- Event{Deleted, lastValue}:
+ return
+ default:
+ panic(fmt.Errorf("channel full"))
+ }
+ }
+}
+
+// Error sends an Error event.
+func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ select {
+ case f.result <- Event{Error, errValue}:
+ return
+ default:
+ panic(fmt.Errorf("channel full"))
+ }
+ }
+}
+
+// Action sends an event of the requested type, for table-based testing.
+func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
+ f.Lock()
+ defer f.Unlock()
+ if !f.Stopped {
+ select {
+ case f.result <- Event{action, obj}:
+ return
+ default:
+ panic(fmt.Errorf("channel full"))
+ }
+ }
+}