...
1
16
17 package spdystream
18
19 import (
20 "container/heap"
21 "sync"
22
23 "github.com/moby/spdystream/spdy"
24 )
25
26 type prioritizedFrame struct {
27 frame spdy.Frame
28 priority uint8
29 insertId uint64
30 }
31
32 type frameQueue []*prioritizedFrame
33
34 func (fq frameQueue) Len() int {
35 return len(fq)
36 }
37
38 func (fq frameQueue) Less(i, j int) bool {
39 if fq[i].priority == fq[j].priority {
40 return fq[i].insertId < fq[j].insertId
41 }
42 return fq[i].priority < fq[j].priority
43 }
44
45 func (fq frameQueue) Swap(i, j int) {
46 fq[i], fq[j] = fq[j], fq[i]
47 }
48
49 func (fq *frameQueue) Push(x interface{}) {
50 *fq = append(*fq, x.(*prioritizedFrame))
51 }
52
53 func (fq *frameQueue) Pop() interface{} {
54 old := *fq
55 n := len(old)
56 *fq = old[0 : n-1]
57 return old[n-1]
58 }
59
60 type PriorityFrameQueue struct {
61 queue *frameQueue
62 c *sync.Cond
63 size int
64 nextInsertId uint64
65 drain bool
66 }
67
68 func NewPriorityFrameQueue(size int) *PriorityFrameQueue {
69 queue := make(frameQueue, 0, size)
70 heap.Init(&queue)
71
72 return &PriorityFrameQueue{
73 queue: &queue,
74 size: size,
75 c: sync.NewCond(&sync.Mutex{}),
76 }
77 }
78
79 func (q *PriorityFrameQueue) Push(frame spdy.Frame, priority uint8) {
80 q.c.L.Lock()
81 defer q.c.L.Unlock()
82 for q.queue.Len() >= q.size {
83 q.c.Wait()
84 }
85 pFrame := &prioritizedFrame{
86 frame: frame,
87 priority: priority,
88 insertId: q.nextInsertId,
89 }
90 q.nextInsertId = q.nextInsertId + 1
91 heap.Push(q.queue, pFrame)
92 q.c.Signal()
93 }
94
95 func (q *PriorityFrameQueue) Pop() spdy.Frame {
96 q.c.L.Lock()
97 defer q.c.L.Unlock()
98 for q.queue.Len() == 0 {
99 if q.drain {
100 return nil
101 }
102 q.c.Wait()
103 }
104 frame := heap.Pop(q.queue).(*prioritizedFrame).frame
105 q.c.Signal()
106 return frame
107 }
108
109 func (q *PriorityFrameQueue) Drain() {
110 q.c.L.Lock()
111 defer q.c.L.Unlock()
112 q.drain = true
113 q.c.Broadcast()
114 }
115
View as plain text