summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/watch/mux.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/watch/mux.go')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/watch/mux.go65
1 files changed, 38 insertions, 27 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/watch/mux.go b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
index 0ac8dc4ef..0aaf01adc 100644
--- a/vendor/k8s.io/apimachinery/pkg/watch/mux.go
+++ b/vendor/k8s.io/apimachinery/pkg/watch/mux.go
@@ -40,15 +40,12 @@ const incomingQueueLength = 25
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
- // TODO: see if this lock is needed now that new watchers go through
- // the incoming channel.
- lock sync.Mutex
-
watchers map[int64]*broadcasterWatcher
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
+ stopped chan struct{}
// How large to make watcher's channel.
watchQueueLength int
@@ -68,6 +65,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
+ stopped: make(chan struct{}),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
@@ -96,10 +94,15 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
-func (b *Broadcaster) blockQueue(f func()) {
+func (m *Broadcaster) blockQueue(f func()) {
+ select {
+ case <-m.stopped:
+ return
+ default:
+ }
var wg sync.WaitGroup
wg.Add(1)
- b.incoming <- Event{
+ m.incoming <- Event{
Type: internalRunFunctionMarker,
Object: functionFakeRuntimeObject(func() {
defer wg.Done()
@@ -111,12 +114,11 @@ func (b *Broadcaster) blockQueue(f func()) {
// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
-// of previous events.
+// of previous events. It will block until the watcher is actually added to the
+// broadcaster.
func (m *Broadcaster) Watch() Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
- m.lock.Lock()
- defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
@@ -127,18 +129,22 @@ func (m *Broadcaster) Watch() Interface {
}
m.watchers[id] = w
})
+ if w == nil {
+ // The panic here is to be consistent with the previous interface behavior
+ // we are willing to re-evaluate in the future.
+ panic("broadcaster already stopped")
+ }
return w
}
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
// The returned watch will have a queue length that is at least large enough to accommodate
-// all of the items in queuedEvents.
+// all of the items in queuedEvents. It will block until the watcher is actually added to
+// the broadcaster.
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
- m.lock.Lock()
- defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
length := m.watchQueueLength
@@ -156,26 +162,29 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
w.result <- e
}
})
+ if w == nil {
+ // The panic here is to be consistent with the previous interface behavior
+ // we are willing to re-evaluate in the future.
+ panic("broadcaster already stopped")
+ }
return w
}
// stopWatching stops the given watcher and removes it from the list.
func (m *Broadcaster) stopWatching(id int64) {
- m.lock.Lock()
- defer m.lock.Unlock()
- w, ok := m.watchers[id]
- if !ok {
- // No need to do anything, it's already been removed from the list.
- return
- }
- delete(m.watchers, id)
- close(w.result)
+ m.blockQueue(func() {
+ w, ok := m.watchers[id]
+ if !ok {
+ // No need to do anything, it's already been removed from the list.
+ return
+ }
+ delete(m.watchers, id)
+ close(w.result)
+ })
}
// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
- m.lock.Lock()
- defer m.lock.Unlock()
for _, w := range m.watchers {
close(w.result)
}
@@ -194,9 +203,12 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
// until all events have been distributed through the outbound channels. Note
// that since they can be buffered, this means that the watchers might not
// have received the data yet as it can remain sitting in the buffered
-// channel.
+// channel. It will block until the broadcaster stop request is actually executed
func (m *Broadcaster) Shutdown() {
- close(m.incoming)
+ m.blockQueue(func() {
+ close(m.stopped)
+ close(m.incoming)
+ })
m.distributing.Wait()
}
@@ -217,8 +229,6 @@ func (m *Broadcaster) loop() {
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
- m.lock.Lock()
- defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
@@ -252,6 +262,7 @@ func (mw *broadcasterWatcher) ResultChan() <-chan Event {
}
// Stop stops watching and removes mw from its list.
+// It will block until the watcher stop request is actually executed
func (mw *broadcasterWatcher) Stop() {
mw.stop.Do(func() {
close(mw.stopped)