summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch/watch.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/watch.go')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/watch.go58
1 files changed, 55 insertions, 3 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/watch.go b/vendor/k8s.io/apimachinery/pkg/watch/watch.go
index 5c1380b23..3945be3ae 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/watch.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/watch.go
@@ -20,7 +20,7 @@ import (
"fmt"
"sync"
- "github.com/golang/glog"
+ "k8s.io/klog"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -44,6 +44,7 @@ const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
+ Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
DefaultChanSize int32 = 100
@@ -57,6 +58,10 @@ type Event struct {
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
+ // * If Type is Bookmark: the object (instance of a type being watched) where
+ // only ResourceVersion field is set. On successful restart of watch from a
+ // bookmark resourceVersion, client is guaranteed to not get repeat event
+ // nor miss any events.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
@@ -106,7 +111,7 @@ func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
- glog.V(4).Infof("Stopping fake watcher.")
+ klog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
}
@@ -173,7 +178,7 @@ func (f *RaceFreeFakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
- glog.V(4).Infof("Stopping fake watcher.")
+ klog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
}
@@ -268,3 +273,50 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
}
}
}
+
+// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
+type ProxyWatcher struct {
+ result chan Event
+ stopCh chan struct{}
+
+ mutex sync.Mutex
+ stopped bool
+}
+
+var _ Interface = &ProxyWatcher{}
+
+// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
+func NewProxyWatcher(ch chan Event) *ProxyWatcher {
+ return &ProxyWatcher{
+ result: ch,
+ stopCh: make(chan struct{}),
+ stopped: false,
+ }
+}
+
+// Stop implements Interface
+func (pw *ProxyWatcher) Stop() {
+ pw.mutex.Lock()
+ defer pw.mutex.Unlock()
+ if !pw.stopped {
+ pw.stopped = true
+ close(pw.stopCh)
+ }
+}
+
+// Stopping returns true if Stop() has been called
+func (pw *ProxyWatcher) Stopping() bool {
+ pw.mutex.Lock()
+ defer pw.mutex.Unlock()
+ return pw.stopped
+}
+
+// ResultChan implements Interface
+func (pw *ProxyWatcher) ResultChan() <-chan Event {
+ return pw.result
+}
+
+// StopChan returns stop channel
+func (pw *ProxyWatcher) StopChan() <-chan struct{} {
+ return pw.stopCh
+}