diff options
author | Valentin Rothberg <vrothberg@redhat.com> | 2022-06-08 11:55:13 +0200 |
---|---|---|
committer | Valentin Rothberg <vrothberg@redhat.com> | 2022-06-10 09:42:19 +0200 |
commit | 46c8da7d9acd6011f318ce8fa9c38591888654f0 (patch) | |
tree | 54e097a523e9d67eb97b65e963a7c46bf2f7973d /vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go | |
parent | 9f1bd0a0a1494f46a49ca7f22511c5a78006afd8 (diff) | |
download | podman-46c8da7d9acd6011f318ce8fa9c38591888654f0.tar.gz podman-46c8da7d9acd6011f318ce8fa9c38591888654f0.tar.bz2 podman-46c8da7d9acd6011f318ce8fa9c38591888654f0.zip |
vendor buildah@main
Note that the bud-logfile-with-split-logfile-by-platform test is skipped
on the remote client (see #14544).
Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
Diffstat (limited to 'vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go')
-rw-r--r-- | vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go b/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go new file mode 100644 index 000000000..e177c9a62 --- /dev/null +++ b/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go @@ -0,0 +1,111 @@ +package queue + +import ( + "errors" + "sync" +) + +var ( + ErrQueueClosed = errors.New("the queue is closed for reading and writing") + ErrQueueEmpty = errors.New("the queue is empty") +) + +// MessageQueue represents a threadsafe message queue to be used to retrieve or +// write messages to. +type MessageQueue struct { + m *sync.RWMutex + c *sync.Cond + messages []interface{} + closed bool +} + +// NewMessageQueue returns a new MessageQueue. +func NewMessageQueue() *MessageQueue { + m := &sync.RWMutex{} + return &MessageQueue{ + m: m, + c: sync.NewCond(m), + messages: []interface{}{}, + } +} + +// Write writes `msg` to the queue. +func (mq *MessageQueue) Write(msg interface{}) error { + mq.m.Lock() + defer mq.m.Unlock() + + if mq.closed { + return ErrQueueClosed + } + mq.messages = append(mq.messages, msg) + // Signal a waiter that there is now a value available in the queue. + mq.c.Signal() + return nil +} + +// Read will read a value from the queue if available, otherwise return an error. +func (mq *MessageQueue) Read() (interface{}, error) { + mq.m.Lock() + defer mq.m.Unlock() + if mq.closed { + return nil, ErrQueueClosed + } + if mq.isEmpty() { + return nil, ErrQueueEmpty + } + val := mq.messages[0] + mq.messages[0] = nil + mq.messages = mq.messages[1:] + return val, nil +} + +// ReadOrWait will read a value from the queue if available, else it will wait for a +// value to become available. This will block forever if nothing gets written or until +// the queue gets closed. +func (mq *MessageQueue) ReadOrWait() (interface{}, error) { + mq.m.Lock() + if mq.closed { + mq.m.Unlock() + return nil, ErrQueueClosed + } + if mq.isEmpty() { + for !mq.closed && mq.isEmpty() { + mq.c.Wait() + } + mq.m.Unlock() + return mq.Read() + } + val := mq.messages[0] + mq.messages[0] = nil + mq.messages = mq.messages[1:] + mq.m.Unlock() + return val, nil +} + +// IsEmpty returns if the queue is empty +func (mq *MessageQueue) IsEmpty() bool { + mq.m.RLock() + defer mq.m.RUnlock() + return len(mq.messages) == 0 +} + +// Nonexported empty check that doesn't lock so we can call this in Read and Write. +func (mq *MessageQueue) isEmpty() bool { + return len(mq.messages) == 0 +} + +// Close closes the queue for future writes or reads. Any attempts to read or write from the +// queue after close will return ErrQueueClosed. This is safe to call multiple times. +func (mq *MessageQueue) Close() { + mq.m.Lock() + defer mq.m.Unlock() + // Already closed + if mq.closed { + return + } + mq.messages = nil + mq.closed = true + // If there's anybody currently waiting on a value from ReadOrWait, we need to + // broadcast so the read(s) can return ErrQueueClosed. + mq.c.Broadcast() +} |