summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/filter.go6
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/mux.go6
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go33
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/until.go87
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/watch.go58
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/zz_generated.deepcopy.go6
6 files changed, 82 insertions, 114 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/filter.go b/vendor/k8s.io/apimachinery/pkg/watch/filter.go
index 3ca27f22c..22c9449f5 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/filter.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/filter.go
@@ -62,11 +62,7 @@ func (fw *filteredWatch) Stop() {
// loop waits for new values, filters them, and resends them.
func (fw *filteredWatch) loop() {
defer close(fw.result)
- for {
- event, ok := <-fw.incoming.ResultChan()
- if !ok {
- break
- }
+ for event := range fw.incoming.ResultChan() {
filtered, keep := fw.f(event)
if keep {
fw.result <- filtered
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
index a65088c1c..0ac8dc4ef 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/mux.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
@@ -204,11 +204,7 @@ func (m *Broadcaster) Shutdown() {
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Broadcaster.
- for {
- event, ok := <-m.incoming
- if !ok {
- break
- }
+ for event := range m.incoming {
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
index 93bb1cdf7..8af256eb1 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
@@ -17,10 +17,12 @@ limitations under the License.
package watch
import (
+ "fmt"
"io"
"sync"
- "github.com/golang/glog"
+ "k8s.io/klog"
+
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -39,19 +41,28 @@ type Decoder interface {
Close()
}
+// Reporter hides the details of how an error is turned into a runtime.Object for
+// reporting on a watch stream since this package may not import a higher level report.
+type Reporter interface {
+ // AsObject must convert err into a valid runtime.Object for the watch stream.
+ AsObject(err error) runtime.Object
+}
+
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
sync.Mutex
- source Decoder
- result chan Event
- stopped bool
+ source Decoder
+ reporter Reporter
+ result chan Event
+ stopped bool
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
-func NewStreamWatcher(d Decoder) *StreamWatcher {
+func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
sw := &StreamWatcher{
- source: d,
+ source: d,
+ reporter: r,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
@@ -100,13 +111,15 @@ func (sw *StreamWatcher) receive() {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
- glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
+ klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
- msg := "Unable to decode an event from the watch stream: %v"
if net.IsProbableEOF(err) {
- glog.V(5).Infof(msg, err)
+ klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
- glog.Errorf(msg, err)
+ sw.result <- Event{
+ Type: Error,
+ Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
+ }
}
}
return
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/until.go b/vendor/k8s.io/apimachinery/pkg/watch/until.go
deleted file mode 100644
index c2772ddb5..000000000
--- a/vendor/k8s.io/apimachinery/pkg/watch/until.go
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package watch
-
-import (
- "errors"
- "time"
-
- "k8s.io/apimachinery/pkg/util/wait"
-)
-
-// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
-// or an error if the condition cannot be checked and should terminate. In general, it is better to define
-// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
-// from false to true).
-type ConditionFunc func(event Event) (bool, error)
-
-// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
-var ErrWatchClosed = errors.New("watch closed before Until timeout")
-
-// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
-// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
-// If no event has been received, the returned event will be nil.
-// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
-// A zero timeout means to wait forever.
-func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
- ch := watcher.ResultChan()
- defer watcher.Stop()
- var after <-chan time.Time
- if timeout > 0 {
- after = time.After(timeout)
- } else {
- ch := make(chan time.Time)
- defer close(ch)
- after = ch
- }
- var lastEvent *Event
- for _, condition := range conditions {
- // check the next condition against the previous event and short circuit waiting for the next watch
- if lastEvent != nil {
- done, err := condition(*lastEvent)
- if err != nil {
- return lastEvent, err
- }
- if done {
- continue
- }
- }
- ConditionSucceeded:
- for {
- select {
- case event, ok := <-ch:
- if !ok {
- return lastEvent, ErrWatchClosed
- }
- lastEvent = &event
-
- // TODO: check for watch expired error and retry watch from latest point?
- done, err := condition(event)
- if err != nil {
- return lastEvent, err
- }
- if done {
- break ConditionSucceeded
- }
-
- case <-after:
- return lastEvent, wait.ErrWaitTimeout
- }
- }
- }
- return lastEvent, nil
-}
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/watch.go b/vendor/k8s.io/apimachinery/pkg/watch/watch.go
index 5c1380b23..3945be3ae 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/watch.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/watch.go
@@ -20,7 +20,7 @@ import (
"fmt"
"sync"
- "github.com/golang/glog"
+ "k8s.io/klog"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -44,6 +44,7 @@ const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
+ Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
DefaultChanSize int32 = 100
@@ -57,6 +58,10 @@ type Event struct {
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
+ // * If Type is Bookmark: the object (instance of a type being watched) where
+ // only ResourceVersion field is set. On successful restart of watch from a
+ // bookmark resourceVersion, client is guaranteed to not get repeat event
+ // nor miss any events.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
@@ -106,7 +111,7 @@ func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
- glog.V(4).Infof("Stopping fake watcher.")
+ klog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
}
@@ -173,7 +178,7 @@ func (f *RaceFreeFakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
- glog.V(4).Infof("Stopping fake watcher.")
+ klog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
}
@@ -268,3 +273,50 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
}
}
}
+
+// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
+type ProxyWatcher struct {
+ result chan Event
+ stopCh chan struct{}
+
+ mutex sync.Mutex
+ stopped bool
+}
+
+var _ Interface = &ProxyWatcher{}
+
+// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
+func NewProxyWatcher(ch chan Event) *ProxyWatcher {
+ return &ProxyWatcher{
+ result: ch,
+ stopCh: make(chan struct{}),
+ stopped: false,
+ }
+}
+
+// Stop implements Interface
+func (pw *ProxyWatcher) Stop() {
+ pw.mutex.Lock()
+ defer pw.mutex.Unlock()
+ if !pw.stopped {
+ pw.stopped = true
+ close(pw.stopCh)
+ }
+}
+
+// Stopping returns true if Stop() has been called
+func (pw *ProxyWatcher) Stopping() bool {
+ pw.mutex.Lock()
+ defer pw.mutex.Unlock()
+ return pw.stopped
+}
+
+// ResultChan implements Interface
+func (pw *ProxyWatcher) ResultChan() <-chan Event {
+ return pw.result
+}
+
+// StopChan returns stop channel
+func (pw *ProxyWatcher) StopChan() <-chan struct{} {
+ return pw.stopCh
+}
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/zz_generated.deepcopy.go b/vendor/k8s.io/apimachinery/pkg/watch/zz_generated.deepcopy.go
index b1b19d118..71ef4da33 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/zz_generated.deepcopy.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/zz_generated.deepcopy.go
@@ -1,7 +1,7 @@
// +build !ignore_autogenerated
/*
-Copyright 2018 The Kubernetes Authors.
+Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -23,9 +23,7 @@ package watch
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Event) DeepCopyInto(out *Event) {
*out = *in
- if in.Object == nil {
- out.Object = nil
- } else {
+ if in.Object != nil {
out.Object = in.Object.DeepCopyObject()
}
return