...
1
17
18 package wrr
19
20 import (
21 "container/heap"
22 "sync"
23 )
24
25
26 type edfWrr struct {
27 lock sync.Mutex
28 items edfPriorityQueue
29 currentOrderOffset uint64
30 currentTime float64
31 }
32
33
34
35
36
37 func NewEDF() WRR {
38 return &edfWrr{}
39 }
40
41
// edfEntry is a single scheduled item held in an edfPriorityQueue.
type edfEntry struct {
	// deadline is the virtual time at which the entry is next due; the queue
	// orders entries by ascending deadline.
	deadline float64
	// weight is the caller-supplied weight. The scheduler places the entry's
	// next deadline 1/weight after the time at which it was served.
	weight int64
	// orderOffset breaks ties between equal deadlines: the entry with the
	// lower offset sorts first.
	orderOffset uint64
	// item is the opaque payload handed back to callers.
	item any
}

// edfPriorityQueue is a min-heap of entries keyed by (deadline, orderOffset).
// It implements heap.Interface and is meant to be driven through the
// container/heap functions rather than by calling Push and Pop directly.
type edfPriorityQueue []*edfEntry

// Len reports the number of queued entries.
func (pq edfPriorityQueue) Len() int { return len(pq) }

// Less reports whether entry i must be served before entry j: the earlier
// deadline wins, and equal deadlines fall back to the lower orderOffset.
func (pq edfPriorityQueue) Less(i, j int) bool {
	a, b := pq[i], pq[j]
	if a.deadline != b.deadline {
		return a.deadline < b.deadline
	}
	return a.orderOffset < b.orderOffset
}

// Swap exchanges the entries at positions i and j.
func (pq edfPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

// Push appends x, which must be an *edfEntry, to the end of the queue.
// It panics if x has any other dynamic type.
func (pq *edfPriorityQueue) Push(x any) {
	*pq = append(*pq, x.(*edfEntry))
}

// Pop removes and returns the last entry of the queue. It panics if the queue
// is empty.
func (pq *edfPriorityQueue) Pop() any {
	old := *pq
	last := len(old) - 1
	entry := old[last]
	// Clear the vacated slot: the backing array outlives the shortened slice
	// and would otherwise keep the entry (and its payload) reachable.
	old[last] = nil
	*pq = old[:last]
	return entry
}
67
68 func (edf *edfWrr) Add(item any, weight int64) {
69 edf.lock.Lock()
70 defer edf.lock.Unlock()
71 entry := edfEntry{
72 deadline: edf.currentTime + 1.0/float64(weight),
73 weight: weight,
74 item: item,
75 orderOffset: edf.currentOrderOffset,
76 }
77 edf.currentOrderOffset++
78 heap.Push(&edf.items, &entry)
79 }
80
81 func (edf *edfWrr) Next() any {
82 edf.lock.Lock()
83 defer edf.lock.Unlock()
84 if len(edf.items) == 0 {
85 return nil
86 }
87 item := edf.items[0]
88 edf.currentTime = item.deadline
89 item.deadline = edf.currentTime + 1.0/float64(item.weight)
90 heap.Fix(&edf.items, 0)
91 return item.item
92 }
93
View as plain text