aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go')
-rw-r--r--vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go61
1 files changed, 21 insertions, 40 deletions
diff --git a/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go b/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
index e177c9a62..4eb9bb9f1 100644
--- a/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
+++ b/vendor/github.com/Microsoft/hcsshim/internal/queue/mq.go
@@ -5,10 +5,7 @@ import (
"sync"
)
-var (
- ErrQueueClosed = errors.New("the queue is closed for reading and writing")
- ErrQueueEmpty = errors.New("the queue is empty")
-)
+var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
// MessageQueue represents a threadsafe message queue to be used to retrieve or
// write messages to.
@@ -29,8 +26,8 @@ func NewMessageQueue() *MessageQueue {
}
}
-// Write writes `msg` to the queue.
-func (mq *MessageQueue) Write(msg interface{}) error {
+// Enqueue writes `msg` to the queue.
+func (mq *MessageQueue) Enqueue(msg interface{}) error {
mq.m.Lock()
defer mq.m.Unlock()
@@ -43,55 +40,37 @@ func (mq *MessageQueue) Write(msg interface{}) error {
return nil
}
-// Read will read a value from the queue if available, otherwise return an error.
-func (mq *MessageQueue) Read() (interface{}, error) {
+// Dequeue will read a value from the queue and remove it. If the queue
+// is empty, this will block until the queue is closed or a value gets enqueued.
+func (mq *MessageQueue) Dequeue() (interface{}, error) {
mq.m.Lock()
defer mq.m.Unlock()
- if mq.closed {
- return nil, ErrQueueClosed
- }
- if mq.isEmpty() {
- return nil, ErrQueueEmpty
+
+ for !mq.closed && mq.size() == 0 {
+ mq.c.Wait()
}
- val := mq.messages[0]
- mq.messages[0] = nil
- mq.messages = mq.messages[1:]
- return val, nil
-}
-// ReadOrWait will read a value from the queue if available, else it will wait for a
-// value to become available. This will block forever if nothing gets written or until
-// the queue gets closed.
-func (mq *MessageQueue) ReadOrWait() (interface{}, error) {
- mq.m.Lock()
+ // We got woken up, check if it's because the queue got closed.
if mq.closed {
- mq.m.Unlock()
return nil, ErrQueueClosed
}
- if mq.isEmpty() {
- for !mq.closed && mq.isEmpty() {
- mq.c.Wait()
- }
- mq.m.Unlock()
- return mq.Read()
- }
+
val := mq.messages[0]
mq.messages[0] = nil
mq.messages = mq.messages[1:]
- mq.m.Unlock()
return val, nil
}
-// IsEmpty returns if the queue is empty
-func (mq *MessageQueue) IsEmpty() bool {
+// Size returns the size of the queue.
+func (mq *MessageQueue) Size() int {
mq.m.RLock()
defer mq.m.RUnlock()
- return len(mq.messages) == 0
+ return mq.size()
}
-// Nonexported empty check that doesn't lock so we can call this in Read and Write.
-func (mq *MessageQueue) isEmpty() bool {
- return len(mq.messages) == 0
+// Nonexported size check to check if the queue is empty inside already locked functions.
+func (mq *MessageQueue) size() int {
+ return len(mq.messages)
}
// Close closes the queue for future writes or reads. Any attempts to read or write from the
@@ -99,13 +78,15 @@ func (mq *MessageQueue) isEmpty() bool {
func (mq *MessageQueue) Close() {
mq.m.Lock()
defer mq.m.Unlock()
- // Already closed
+
+ // Already closed, noop
if mq.closed {
return
}
+
mq.messages = nil
mq.closed = true
- // If there's anybody currently waiting on a value from ReadOrWait, we need to
+ // If there's anybody currently waiting on a value from Dequeue, we need to
// broadcast so the read(s) can return ErrQueueClosed.
mq.c.Broadcast()
}