aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/docker/spdystream/priority.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/docker/spdystream/priority.go')
-rw-r--r--vendor/github.com/docker/spdystream/priority.go98
1 files changed, 98 insertions, 0 deletions
diff --git a/vendor/github.com/docker/spdystream/priority.go b/vendor/github.com/docker/spdystream/priority.go
new file mode 100644
index 000000000..fc8582b5c
--- /dev/null
+++ b/vendor/github.com/docker/spdystream/priority.go
@@ -0,0 +1,98 @@
+package spdystream
+
+import (
+ "container/heap"
+ "sync"
+
+ "github.com/docker/spdystream/spdy"
+)
+
+type prioritizedFrame struct {
+ frame spdy.Frame
+ priority uint8
+ insertId uint64
+}
+
+type frameQueue []*prioritizedFrame
+
+func (fq frameQueue) Len() int {
+ return len(fq)
+}
+
+func (fq frameQueue) Less(i, j int) bool {
+ if fq[i].priority == fq[j].priority {
+ return fq[i].insertId < fq[j].insertId
+ }
+ return fq[i].priority < fq[j].priority
+}
+
+func (fq frameQueue) Swap(i, j int) {
+ fq[i], fq[j] = fq[j], fq[i]
+}
+
+func (fq *frameQueue) Push(x interface{}) {
+ *fq = append(*fq, x.(*prioritizedFrame))
+}
+
+func (fq *frameQueue) Pop() interface{} {
+ old := *fq
+ n := len(old)
+ *fq = old[0 : n-1]
+ return old[n-1]
+}
+
+type PriorityFrameQueue struct {
+ queue *frameQueue
+ c *sync.Cond
+ size int
+ nextInsertId uint64
+ drain bool
+}
+
+func NewPriorityFrameQueue(size int) *PriorityFrameQueue {
+ queue := make(frameQueue, 0, size)
+ heap.Init(&queue)
+
+ return &PriorityFrameQueue{
+ queue: &queue,
+ size: size,
+ c: sync.NewCond(&sync.Mutex{}),
+ }
+}
+
+func (q *PriorityFrameQueue) Push(frame spdy.Frame, priority uint8) {
+ q.c.L.Lock()
+ defer q.c.L.Unlock()
+ for q.queue.Len() >= q.size {
+ q.c.Wait()
+ }
+ pFrame := &prioritizedFrame{
+ frame: frame,
+ priority: priority,
+ insertId: q.nextInsertId,
+ }
+ q.nextInsertId = q.nextInsertId + 1
+ heap.Push(q.queue, pFrame)
+ q.c.Signal()
+}
+
+func (q *PriorityFrameQueue) Pop() spdy.Frame {
+ q.c.L.Lock()
+ defer q.c.L.Unlock()
+ for q.queue.Len() == 0 {
+ if q.drain {
+ return nil
+ }
+ q.c.Wait()
+ }
+ frame := heap.Pop(q.queue).(*prioritizedFrame).frame
+ q.c.Signal()
+ return frame
+}
+
+func (q *PriorityFrameQueue) Drain() {
+ q.c.L.Lock()
+ defer q.c.L.Unlock()
+ q.drain = true
+ q.c.Broadcast()
+}