summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/mux.go28
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go36
2 files changed, 48 insertions, 16 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
index 0aaf01adc..e01d51906 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/mux.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
@@ -74,6 +74,22 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
return m
}
+// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
+// except that the incoming queue is the same size as the outgoing queues
+// (specified by queueLength).
+func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
+ m := &Broadcaster{
+ watchers: map[int64]*broadcasterWatcher{},
+ incoming: make(chan Event, queueLength),
+ stopped: make(chan struct{}),
+ watchQueueLength: queueLength,
+ fullChannelBehavior: fullChannelBehavior,
+ }
+ m.distributing.Add(1)
+ go m.loop()
+ return m
+}
+
const internalRunFunctionMarker = "internal-do-function"
// a function type we can shoehorn into the queue.
@@ -198,6 +214,18 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}
+// Action distributes the given event among all watchers, or drops it on the floor
+// if too many incoming actions are queued up. Returns true if the action was sent,
+// false if dropped.
+func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
+ select {
+ case m.incoming <- Event{action, obj}:
+ return true
+ default:
+ return false
+ }
+}
+
// Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action or Watch* after calling Shutdown. This call blocks
// until all events have been distributed through the outbound channels. Note
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
index 99f6770b9..42dcac2b9 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/streamwatcher.go
@@ -55,7 +55,7 @@ type StreamWatcher struct {
source Decoder
reporter Reporter
result chan Event
- stopped bool
+ done chan struct{}
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
@@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
+ // If the watcher is externally stopped there is no receiver anymore
+ // and the send operations on the result channel, especially the
+ // error reporting might block forever.
+ // Therefore a dedicated stop channel is used to resolve this blocking.
+ done: make(chan struct{}),
}
go sw.receive()
return sw
@@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
sw.Lock()
defer sw.Unlock()
- if !sw.stopped {
- sw.stopped = true
+ // closing a closed channel always panics, therefore check before closing
+ select {
+ case <-sw.done:
+ default:
+ close(sw.done)
sw.source.Close()
}
}
-// stopping returns true if Stop() was called previously.
-func (sw *StreamWatcher) stopping() bool {
- sw.Lock()
- defer sw.Unlock()
- return sw.stopped
-}
-
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash()
@@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() {
for {
action, obj, err := sw.source.Decode()
if err != nil {
- // Ignore expected error.
- if sw.stopping() {
- return
- }
switch err {
case io.EOF:
// watch closed normally
@@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
- sw.result <- Event{
+ select {
+ case <-sw.done:
+ case sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
+ }:
}
}
}
return
}
- sw.result <- Event{
+ select {
+ case <-sw.done:
+ return
+ case sw.result <- Event{
Type: action,
Object: obj,
+ }:
}
}
}