aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
diff options
context:
space:
mode:
authorValentin Rothberg <vrothberg@redhat.com>2022-06-08 11:55:13 +0200
committerValentin Rothberg <vrothberg@redhat.com>2022-06-10 09:42:19 +0200
commit46c8da7d9acd6011f318ce8fa9c38591888654f0 (patch)
tree54e097a523e9d67eb97b65e963a7c46bf2f7973d /vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
parent9f1bd0a0a1494f46a49ca7f22511c5a78006afd8 (diff)
downloadpodman-46c8da7d9acd6011f318ce8fa9c38591888654f0.tar.gz
podman-46c8da7d9acd6011f318ce8fa9c38591888654f0.tar.bz2
podman-46c8da7d9acd6011f318ce8fa9c38591888654f0.zip
vendor buildah@main
Note that the bud-logfile-with-split-logfile-by-platform test is skipped on the remote client (see #14544). Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
Diffstat (limited to 'vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go')
-rw-r--r--vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go111
1 files changed, 111 insertions, 0 deletions
diff --git a/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go b/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
new file mode 100644
index 000000000..e177c9a62
--- /dev/null
+++ b/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
@@ -0,0 +1,111 @@
+package queue
+
+import (
+ "errors"
+ "sync"
+)
+
+var (
+ ErrQueueClosed = errors.New("the queue is closed for reading and writing")
+ ErrQueueEmpty = errors.New("the queue is empty")
+)
+
+// MessageQueue represents a threadsafe message queue to be used to retrieve or
+// write messages to.
+type MessageQueue struct {
+ m *sync.RWMutex
+ c *sync.Cond
+ messages []interface{}
+ closed bool
+}
+
+// NewMessageQueue returns a new MessageQueue.
+func NewMessageQueue() *MessageQueue {
+ m := &sync.RWMutex{}
+ return &MessageQueue{
+ m: m,
+ c: sync.NewCond(m),
+ messages: []interface{}{},
+ }
+}
+
+// Write writes `msg` to the queue.
+func (mq *MessageQueue) Write(msg interface{}) error {
+ mq.m.Lock()
+ defer mq.m.Unlock()
+
+ if mq.closed {
+ return ErrQueueClosed
+ }
+ mq.messages = append(mq.messages, msg)
+ // Signal a waiter that there is now a value available in the queue.
+ mq.c.Signal()
+ return nil
+}
+
+// Read will read a value from the queue if available, otherwise return an error.
+func (mq *MessageQueue) Read() (interface{}, error) {
+ mq.m.Lock()
+ defer mq.m.Unlock()
+ if mq.closed {
+ return nil, ErrQueueClosed
+ }
+ if mq.isEmpty() {
+ return nil, ErrQueueEmpty
+ }
+ val := mq.messages[0]
+ mq.messages[0] = nil
+ mq.messages = mq.messages[1:]
+ return val, nil
+}
+
+// ReadOrWait will read a value from the queue if available, else it will wait for a
+// value to become available. This will block forever if nothing gets written or until
+// the queue gets closed.
+func (mq *MessageQueue) ReadOrWait() (interface{}, error) {
+ mq.m.Lock()
+ if mq.closed {
+ mq.m.Unlock()
+ return nil, ErrQueueClosed
+ }
+ if mq.isEmpty() {
+ for !mq.closed && mq.isEmpty() {
+ mq.c.Wait()
+ }
+ mq.m.Unlock()
+ return mq.Read()
+ }
+ val := mq.messages[0]
+ mq.messages[0] = nil
+ mq.messages = mq.messages[1:]
+ mq.m.Unlock()
+ return val, nil
+}
+
+// IsEmpty returns if the queue is empty
+func (mq *MessageQueue) IsEmpty() bool {
+ mq.m.RLock()
+ defer mq.m.RUnlock()
+ return len(mq.messages) == 0
+}
+
+// Nonexported empty check that doesn't lock so we can call this in Read and Write.
+func (mq *MessageQueue) isEmpty() bool {
+ return len(mq.messages) == 0
+}
+
+// Close closes the queue for future writes or reads. Any attempts to read or write from the
+// queue after close will return ErrQueueClosed. This is safe to call multiple times.
+func (mq *MessageQueue) Close() {
+ mq.m.Lock()
+ defer mq.m.Unlock()
+ // Already closed
+ if mq.closed {
+ return
+ }
+ mq.messages = nil
+ mq.closed = true
+ // If there's anybody currently waiting on a value from ReadOrWait, we need to
+ // broadcast so the read(s) can return ErrQueueClosed.
+ mq.c.Broadcast()
+}