diff options
author | dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> | 2021-04-09 08:04:51 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-09 08:04:51 +0000 |
commit | 14375f35ee00c16327edcd0f5883cc66810fc7db (patch) | |
tree | d904edbc6162b8eddc563476614d03dd1eee75ed /vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go | |
parent | 4efac1f76012c35122bca7c8feebc33141fc47d3 (diff) | |
download | podman-14375f35ee00c16327edcd0f5883cc66810fc7db.tar.gz podman-14375f35ee00c16327edcd0f5883cc66810fc7db.tar.bz2 podman-14375f35ee00c16327edcd0f5883cc66810fc7db.zip |
Bump k8s.io/api from 0.20.5 to 0.21.0
Bumps [k8s.io/api](https://github.com/kubernetes/api) from 0.20.5 to 0.21.0.
- [Release notes](https://github.com/kubernetes/api/releases)
- [Commits](https://github.com/kubernetes/api/compare/v0.20.5...v0.21.0)
Signed-off-by: dependabot[bot] <support@github.com>
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go')
-rw-r--r-- | vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 99f6770b9..42dcac2b9 100644 --- a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -55,7 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event - stopped bool + done chan struct{} } // NewStreamWatcher creates a StreamWatcher from the given decoder. @@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), } go sw.receive() return sw @@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() - if !sw.stopped { - sw.stopped = true + // closing a closed channel always panics, therefore check before closing + select { + case <-sw.done: + default: + close(sw.done) sw.source.Close() } } -// stopping returns true if Stop() was called previously. -func (sw *StreamWatcher) stopping() bool { - sw.Lock() - defer sw.Unlock() - return sw.stopped -} - // receive reads result from the decoder in a loop and sends down the result channel. func (sw *StreamWatcher) receive() { defer utilruntime.HandleCrash() @@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() { for { action, obj, err := sw.source.Decode() if err != nil { - // Ignore expected error. - if sw.stopping() { - return - } switch err { case io.EOF: // watch closed normally @@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } } |