aboutsummaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go36
1 files changed, 20 insertions, 16 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
index 99f6770b9..42dcac2b9 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
@@ -55,7 +55,7 @@ type StreamWatcher struct {
source Decoder
reporter Reporter
result chan Event
- stopped bool
+ done chan struct{}
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
@@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
+ // If the watcher is externally stopped there is no receiver anymore
+ // and the send operations on the result channel, especially the
+ // error reporting might block forever.
+ // Therefore a dedicated stop channel is used to resolve this blocking.
+ done: make(chan struct{}),
}
go sw.receive()
return sw
@@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
sw.Lock()
defer sw.Unlock()
- if !sw.stopped {
- sw.stopped = true
+ // closing a closed channel always panics, therefore check before closing
+ select {
+ case <-sw.done:
+ default:
+ close(sw.done)
sw.source.Close()
}
}
-// stopping returns true if Stop() was called previously.
-func (sw *StreamWatcher) stopping() bool {
- sw.Lock()
- defer sw.Unlock()
- return sw.stopped
-}
-
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash()
@@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() {
for {
action, obj, err := sw.source.Decode()
if err != nil {
- // Ignore expected error.
- if sw.stopping() {
- return
- }
switch err {
case io.EOF:
// watch closed normally
@@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
- sw.result <- Event{
+ select {
+ case <-sw.done:
+ case sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
+ }:
}
}
}
return
}
- sw.result <- Event{
+ select {
+ case <-sw.done:
+ return
+ case sw.result <- Event{
Type: action,
Object: obj,
+ }:
}
}
}