summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
diff options
context:
space:
mode:
authordependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2021-04-09 08:04:51 +0000
committerGitHub <noreply@github.com>2021-04-09 08:04:51 +0000
commit14375f35ee00c16327edcd0f5883cc66810fc7db (patch)
treed904edbc6162b8eddc563476614d03dd1eee75ed /vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
parent4efac1f76012c35122bca7c8feebc33141fc47d3 (diff)
downloadpodman-14375f35ee00c16327edcd0f5883cc66810fc7db.tar.gz
podman-14375f35ee00c16327edcd0f5883cc66810fc7db.tar.bz2
podman-14375f35ee00c16327edcd0f5883cc66810fc7db.zip
Bump k8s.io/api from 0.20.5 to 0.21.0
Bumps [k8s.io/api](https://github.com/kubernetes/api) from 0.20.5 to 0.21.0. - [Release notes](https://github.com/kubernetes/api/releases) - [Commits](https://github.com/kubernetes/api/compare/v0.20.5...v0.21.0) Signed-off-by: dependabot[bot] <support@github.com>
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go36
1 files changed, 20 insertions, 16 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
index 99f6770b9..42dcac2b9 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
@@ -55,7 +55,7 @@ type StreamWatcher struct {
source Decoder
reporter Reporter
result chan Event
- stopped bool
+ done chan struct{}
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
@@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
+ // If the watcher is externally stopped there is no receiver anymore
+ // and the send operations on the result channel, especially the
+ // error reporting might block forever.
+ // Therefore a dedicated stop channel is used to resolve this blocking.
+ done: make(chan struct{}),
}
go sw.receive()
return sw
@@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
sw.Lock()
defer sw.Unlock()
- if !sw.stopped {
- sw.stopped = true
+ // closing a closed channel always panics, therefore check before closing
+ select {
+ case <-sw.done:
+ default:
+ close(sw.done)
sw.source.Close()
}
}
-// stopping returns true if Stop() was called previously.
-func (sw *StreamWatcher) stopping() bool {
- sw.Lock()
- defer sw.Unlock()
- return sw.stopped
-}
-
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash()
@@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() {
for {
action, obj, err := sw.source.Decode()
if err != nil {
- // Ignore expected error.
- if sw.stopping() {
- return
- }
switch err {
case io.EOF:
// watch closed normally
@@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
- sw.result <- Event{
+ select {
+ case <-sw.done:
+ case sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
+ }:
}
}
}
return
}
- sw.result <- Event{
+ select {
+ case <-sw.done:
+ return
+ case sw.result <- Event{
Type: action,
Object: obj,
+ }:
}
}
}