diff options
Diffstat (limited to 'vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go')
-rw-r--r-- | vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go b/vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go new file mode 100644 index 000000000..b14e41b64 --- /dev/null +++ b/vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go @@ -0,0 +1,145 @@ +// Copyright 2020 The go-libvirt Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import "context" + +// Stream is an unbounded buffered event channel. The implementation +// consists of a pair of unbuffered channels and a goroutine to manage them. +// Client behavior will not cause incoming events to block. +type Stream struct { + // Program specifies the source of the events - libvirt or QEMU. + Program uint32 + + // CallbackID is returned by the event registration call. + CallbackID int32 + + // manage unbounded channel behavior. + queue []Event + in, out chan Event + + // terminates processing + shutdown context.CancelFunc +} + +// Recv returns the next available event from the Stream's queue. +func (s *Stream) Recv() chan Event { + return s.out +} + +// Push appends a new event to the queue. +func (s *Stream) Push(e Event) { + s.in <- e +} + +// Shutdown gracefully terminates Stream processing, releasing all +// internal resources. Events which have not yet been received by the client +// will be dropped. Subsequent calls to Shutdown() are idempotent. +func (s *Stream) Shutdown() { + if s.shutdown != nil { + s.shutdown() + } +} + +// start starts the event processing loop, which will continue to run until +// terminated by the returned context.CancelFunc. Starting a previously started +// Stream is an idempotent operation. +func (s *Stream) start() context.CancelFunc { + ctx, cancel := context.WithCancel(context.Background()) + + go s.process(ctx) + + return cancel +} + +// process manages an Stream's lifecycle until canceled by the provided +// context. Incoming events are appended to a queue which is then relayed to +// the a listening client. New events pushed onto the queue will not block due +// to client behavior. +func (s *Stream) process(ctx context.Context) { + defer func() { + close(s.in) + close(s.out) + }() + + for { + // informs send() to stop trying + nctx, next := context.WithCancel(ctx) + defer next() + + select { + // new event received, append to queue + case e := <-s.in: + s.queue = append(s.queue, e) + + // client recieved an event, pop from queue + case <-s.send(nctx): + if len(s.queue) > 1 { + s.queue = s.queue[1:] + } else { + s.queue = []Event{} + } + + // shutdown requested + case <-ctx.Done(): + return + + } + + next() + } +} + +// send returns a channel which blocks until either the first item on the queue +// (if existing) is sent to the client, or the provided context is canceled. +// The stream's queue is never modified. +func (s *Stream) send(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + + go func() { + defer close(ch) + + // do nothing and block if the queue is empty + if len(s.queue) == 0 { + <-ctx.Done() + return + } + + // otherwise, attempt to send the event + select { + case s.out <- s.queue[0]: + case <-ctx.Done(): + } + }() + + return ch +} + +// NewStream configures a new Event Stream. Incoming events are appended to a +// queue, which is then relayed to the listening client. Client behavior will +// not cause incoming events to block. It is the responsibility of the caller +// to terminate the Stream via Shutdown() when no longer in use. +func NewStream(program uint32, cbID int32) *Stream { + ic := &Stream{ + Program: program, + CallbackID: cbID, + in: make(chan Event), + out: make(chan Event), + } + + ic.shutdown = ic.start() + + return ic +} |