diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2021-04-12 12:19:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-12 12:19:22 +0200 |
commit | 0ee1da50f5e221383c236ccee258cb7592ccabd6 (patch) | |
tree | 08b3798ab08a621920eff4d1f6a3a5f940d3a944 /vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go | |
parent | 669311d8d8a8881571ccc81ce48b9202b15b9def (diff) | |
parent | 14375f35ee00c16327edcd0f5883cc66810fc7db (diff) | |
download | podman-0ee1da50f5e221383c236ccee258cb7592ccabd6.tar.gz podman-0ee1da50f5e221383c236ccee258cb7592ccabd6.tar.bz2 podman-0ee1da50f5e221383c236ccee258cb7592ccabd6.zip |
Merge pull request #9981 from containers/dependabot/go_modules/k8s.io/api-0.21.0
Bump k8s.io/api from 0.20.5 to 0.21.0
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go')
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 99f6770b9..42dcac2b9 100644 --- a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -55,7 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event - stopped bool + done chan struct{} } // NewStreamWatcher creates a StreamWatcher from the given decoder. @@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), } go sw.receive() return sw @@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() - if !sw.stopped { - sw.stopped = true + // closing a closed channel always panics, therefore check before closing + select { + case <-sw.done: + default: + close(sw.done) sw.source.Close() } } -// stopping returns true if Stop() was called previously. -func (sw *StreamWatcher) stopping() bool { - sw.Lock() - defer sw.Unlock() - return sw.stopped -} - // receive reads result from the decoder in a loop and sends down the result channel. func (sw *StreamWatcher) receive() { defer utilruntime.HandleCrash() @@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() { for { action, obj, err := sw.source.Decode() if err != nil { - // Ignore expected error. - if sw.stopping() { - return - } switch err { case io.EOF: // watch closed normally @@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } } |