summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch/mux.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/mux.go')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/mux.go257
1 files changed, 257 insertions, 0 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
new file mode 100644
index 000000000..fafccd78e
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
@@ -0,0 +1,257 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+ "sync"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
+// channel is full.
+type FullChannelBehavior int
+
+const (
+ WaitIfChannelFull FullChannelBehavior = iota
+ DropIfChannelFull
+)
+
+// Buffer the incoming queue a little bit even though it should rarely ever accumulate
+// anything, just in case a few events are received in such a short window that
+// Broadcaster can't move them onto the watchers' queues fast enough.
+const incomingQueueLength = 25
+
+// Broadcaster distributes event notifications among any number of watchers. Every event
+// is delivered to every watcher.
+type Broadcaster struct {
+ // TODO: see if this lock is needed now that new watchers go through
+ // the incoming channel.
+ lock sync.Mutex
+
+ watchers map[int64]*broadcasterWatcher
+ nextWatcher int64
+ distributing sync.WaitGroup
+
+ incoming chan Event
+
+ // How large to make watcher's channel.
+ watchQueueLength int
+ // If one of the watch channels is full, don't wait for it to become empty.
+ // Instead just deliver it to the watchers that do have space in their
+ // channels and move on to the next event.
+ // It's more fair to do this on a per-watcher basis than to do it on the
+ // "incoming" channel, which would allow one slow watcher to prevent all
+ // other watchers from getting new events.
+ fullChannelBehavior FullChannelBehavior
+}
+
+// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
+// It is guaranteed that events will be distributed in the order in which they occur,
+// but the order in which a single event is distributed among all of the watchers is unspecified.
+func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
+ m := &Broadcaster{
+ watchers: map[int64]*broadcasterWatcher{},
+ incoming: make(chan Event, incomingQueueLength),
+ watchQueueLength: queueLength,
+ fullChannelBehavior: fullChannelBehavior,
+ }
+ m.distributing.Add(1)
+ go m.loop()
+ return m
+}
+
+const internalRunFunctionMarker = "internal-do-function"
+
+// a function type we can shoehorn into the queue.
+type functionFakeRuntimeObject func()
+
+func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
+ return schema.EmptyObjectKind
+}
+
+// Execute f, blocking the incoming queue (and waiting for it to drain first).
+// The purpose of this terrible hack is so that watchers added after an event
+// won't ever see that event, and will always see any event after they are
+// added.
+func (b *Broadcaster) blockQueue(f func()) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ b.incoming <- Event{
+ Type: internalRunFunctionMarker,
+ Object: functionFakeRuntimeObject(func() {
+ defer wg.Done()
+ f()
+ }),
+ }
+ wg.Wait()
+}
+
+// Watch adds a new watcher to the list and returns an Interface for it.
+// Note: new watchers will only receive new events. They won't get an entire history
+// of previous events.
+func (m *Broadcaster) Watch() Interface {
+ var w *broadcasterWatcher
+ m.blockQueue(func() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ id := m.nextWatcher
+ m.nextWatcher++
+ w = &broadcasterWatcher{
+ result: make(chan Event, m.watchQueueLength),
+ stopped: make(chan struct{}),
+ id: id,
+ m: m,
+ }
+ m.watchers[id] = w
+ })
+ return w
+}
+
+// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
+// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
+// The returned watch will have a queue length that is at least large enough to accommodate
+// all of the items in queuedEvents.
+func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
+ var w *broadcasterWatcher
+ m.blockQueue(func() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ id := m.nextWatcher
+ m.nextWatcher++
+ length := m.watchQueueLength
+ if n := len(queuedEvents) + 1; n > length {
+ length = n
+ }
+ w = &broadcasterWatcher{
+ result: make(chan Event, length),
+ stopped: make(chan struct{}),
+ id: id,
+ m: m,
+ }
+ m.watchers[id] = w
+ for _, e := range queuedEvents {
+ w.result <- e
+ }
+ })
+ return w
+}
+
+// stopWatching stops the given watcher and removes it from the list.
+func (m *Broadcaster) stopWatching(id int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ w, ok := m.watchers[id]
+ if !ok {
+ // No need to do anything, it's already been removed from the list.
+ return
+ }
+ delete(m.watchers, id)
+ close(w.result)
+}
+
+// closeAll disconnects all watchers (presumably in response to a Shutdown call).
+func (m *Broadcaster) closeAll() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ for _, w := range m.watchers {
+ close(w.result)
+ }
+ // Delete everything from the map, since presence/absence in the map is used
+ // by stopWatching to avoid double-closing the channel.
+ m.watchers = map[int64]*broadcasterWatcher{}
+}
+
+// Action distributes the given event among all watchers.
+func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
+ m.incoming <- Event{action, obj}
+}
+
+// Shutdown disconnects all watchers (but any queued events will still be distributed).
+// You must not call Action or Watch* after calling Shutdown. This call blocks
+// until all events have been distributed through the outbound channels. Note
+// that since they can be buffered, this means that the watchers might not
+// have received the data yet as it can remain sitting in the buffered
+// channel.
+func (m *Broadcaster) Shutdown() {
+ close(m.incoming)
+ m.distributing.Wait()
+}
+
+// loop receives from m.incoming and distributes to all watchers.
+func (m *Broadcaster) loop() {
+ // Deliberately not catching crashes here. Yes, bring down the process if there's a
+ // bug in watch.Broadcaster.
+ for {
+ event, ok := <-m.incoming
+ if !ok {
+ break
+ }
+ if event.Type == internalRunFunctionMarker {
+ event.Object.(functionFakeRuntimeObject)()
+ continue
+ }
+ m.distribute(event)
+ }
+ m.closeAll()
+ m.distributing.Done()
+}
+
+// distribute sends event to all watchers. Blocking.
+func (m *Broadcaster) distribute(event Event) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if m.fullChannelBehavior == DropIfChannelFull {
+ for _, w := range m.watchers {
+ select {
+ case w.result <- event:
+ case <-w.stopped:
+ default: // Don't block if the event can't be queued.
+ }
+ }
+ } else {
+ for _, w := range m.watchers {
+ select {
+ case w.result <- event:
+ case <-w.stopped:
+ }
+ }
+ }
+}
+
+// broadcasterWatcher handles a single watcher of a broadcaster
+type broadcasterWatcher struct {
+ result chan Event
+ stopped chan struct{}
+ stop sync.Once
+ id int64
+ m *Broadcaster
+}
+
+// ResultChan returns a channel to use for waiting on events.
+func (mw *broadcasterWatcher) ResultChan() <-chan Event {
+ return mw.result
+}
+
+// Stop stops watching and removes mw from its list.
+func (mw *broadcasterWatcher) Stop() {
+ mw.stop.Do(func() {
+ close(mw.stopped)
+ mw.m.stopWatching(mw.id)
+ })
+}