...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package lease
16
17 import (
18 "container/heap"
19 "time"
20 )
21
22
23
24
25 type LeaseWithTime struct {
26 id LeaseID
27 time time.Time
28 index int
29 }
30
31 type LeaseQueue []*LeaseWithTime
32
33 func (pq LeaseQueue) Len() int { return len(pq) }
34
35 func (pq LeaseQueue) Less(i, j int) bool {
36 return pq[i].time.Before(pq[j].time)
37 }
38
39 func (pq LeaseQueue) Swap(i, j int) {
40 pq[i], pq[j] = pq[j], pq[i]
41 pq[i].index = i
42 pq[j].index = j
43 }
44
45 func (pq *LeaseQueue) Push(x interface{}) {
46 n := len(*pq)
47 item := x.(*LeaseWithTime)
48 item.index = n
49 *pq = append(*pq, item)
50 }
51
52 func (pq *LeaseQueue) Pop() interface{} {
53 old := *pq
54 n := len(old)
55 item := old[n-1]
56 item.index = -1
57 *pq = old[0 : n-1]
58 return item
59 }
60
61
62
63 type LeaseExpiredNotifier struct {
64 m map[LeaseID]*LeaseWithTime
65 queue LeaseQueue
66 }
67
68 func newLeaseExpiredNotifier() *LeaseExpiredNotifier {
69 return &LeaseExpiredNotifier{
70 m: make(map[LeaseID]*LeaseWithTime),
71 queue: make(LeaseQueue, 0),
72 }
73 }
74
75 func (mq *LeaseExpiredNotifier) Init() {
76 heap.Init(&mq.queue)
77 mq.m = make(map[LeaseID]*LeaseWithTime)
78 for _, item := range mq.queue {
79 mq.m[item.id] = item
80 }
81 }
82
83 func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
84 if old, ok := mq.m[item.id]; ok {
85 old.time = item.time
86 heap.Fix(&mq.queue, old.index)
87 } else {
88 heap.Push(&mq.queue, item)
89 mq.m[item.id] = item
90 }
91 }
92
93 func (mq *LeaseExpiredNotifier) Unregister() *LeaseWithTime {
94 item := heap.Pop(&mq.queue).(*LeaseWithTime)
95 delete(mq.m, item.id)
96 return item
97 }
98
99 func (mq *LeaseExpiredNotifier) Poll() *LeaseWithTime {
100 if mq.Len() == 0 {
101 return nil
102 }
103 return mq.queue[0]
104 }
105
106 func (mq *LeaseExpiredNotifier) Len() int {
107 return len(mq.m)
108 }
109
View as plain text