1
16
17
18
19 package cache
20
21 import (
22 "container/heap"
23 "fmt"
24 "sync"
25 )
26
27 const (
28 closedMsg = "heap is closed"
29 )
30
31
32 type LessFunc func(interface{}, interface{}) bool
33
34 type heapItem struct {
35 obj interface{}
36 index int
37 }
38
39 type itemKeyValue struct {
40 key string
41 obj interface{}
42 }
43
44
45
46 type heapData struct {
47
48
49 items map[string]*heapItem
50
51
52
53 queue []string
54
55
56
57 keyFunc KeyFunc
58
59 lessFunc LessFunc
60 }
61
62 var (
63 _ = heap.Interface(&heapData{})
64 )
65
66
67
68 func (h *heapData) Less(i, j int) bool {
69 if i > len(h.queue) || j > len(h.queue) {
70 return false
71 }
72 itemi, ok := h.items[h.queue[i]]
73 if !ok {
74 return false
75 }
76 itemj, ok := h.items[h.queue[j]]
77 if !ok {
78 return false
79 }
80 return h.lessFunc(itemi.obj, itemj.obj)
81 }
82
83
84 func (h *heapData) Len() int { return len(h.queue) }
85
86
87
88 func (h *heapData) Swap(i, j int) {
89 h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
90 item := h.items[h.queue[i]]
91 item.index = i
92 item = h.items[h.queue[j]]
93 item.index = j
94 }
95
96
97 func (h *heapData) Push(kv interface{}) {
98 keyValue := kv.(*itemKeyValue)
99 n := len(h.queue)
100 h.items[keyValue.key] = &heapItem{keyValue.obj, n}
101 h.queue = append(h.queue, keyValue.key)
102 }
103
104
105 func (h *heapData) Pop() interface{} {
106 key := h.queue[len(h.queue)-1]
107 h.queue = h.queue[0 : len(h.queue)-1]
108 item, ok := h.items[key]
109 if !ok {
110
111 return nil
112 }
113 delete(h.items, key)
114 return item.obj
115 }
116
117
118
119 type Heap struct {
120 lock sync.RWMutex
121 cond sync.Cond
122
123
124
125 data *heapData
126
127
128
129 closed bool
130 }
131
132
133
134 func (h *Heap) Close() {
135 h.lock.Lock()
136 defer h.lock.Unlock()
137 h.closed = true
138 h.cond.Broadcast()
139 }
140
141
142
143 func (h *Heap) Add(obj interface{}) error {
144 key, err := h.data.keyFunc(obj)
145 if err != nil {
146 return KeyError{obj, err}
147 }
148 h.lock.Lock()
149 defer h.lock.Unlock()
150 if h.closed {
151 return fmt.Errorf(closedMsg)
152 }
153 if _, exists := h.data.items[key]; exists {
154 h.data.items[key].obj = obj
155 heap.Fix(h.data, h.data.items[key].index)
156 } else {
157 h.addIfNotPresentLocked(key, obj)
158 }
159 h.cond.Broadcast()
160 return nil
161 }
162
163
164
165
166 func (h *Heap) BulkAdd(list []interface{}) error {
167 h.lock.Lock()
168 defer h.lock.Unlock()
169 if h.closed {
170 return fmt.Errorf(closedMsg)
171 }
172 for _, obj := range list {
173 key, err := h.data.keyFunc(obj)
174 if err != nil {
175 return KeyError{obj, err}
176 }
177 if _, exists := h.data.items[key]; exists {
178 h.data.items[key].obj = obj
179 heap.Fix(h.data, h.data.items[key].index)
180 } else {
181 h.addIfNotPresentLocked(key, obj)
182 }
183 }
184 h.cond.Broadcast()
185 return nil
186 }
187
188
189
190
191
192
193
194 func (h *Heap) AddIfNotPresent(obj interface{}) error {
195 id, err := h.data.keyFunc(obj)
196 if err != nil {
197 return KeyError{obj, err}
198 }
199 h.lock.Lock()
200 defer h.lock.Unlock()
201 if h.closed {
202 return fmt.Errorf(closedMsg)
203 }
204 h.addIfNotPresentLocked(id, obj)
205 h.cond.Broadcast()
206 return nil
207 }
208
209
210
211 func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
212 if _, exists := h.data.items[key]; exists {
213 return
214 }
215 heap.Push(h.data, &itemKeyValue{key, obj})
216 }
217
218
219
220 func (h *Heap) Update(obj interface{}) error {
221 return h.Add(obj)
222 }
223
224
225 func (h *Heap) Delete(obj interface{}) error {
226 key, err := h.data.keyFunc(obj)
227 if err != nil {
228 return KeyError{obj, err}
229 }
230 h.lock.Lock()
231 defer h.lock.Unlock()
232 if item, ok := h.data.items[key]; ok {
233 heap.Remove(h.data, item.index)
234 return nil
235 }
236 return fmt.Errorf("object not found")
237 }
238
239
240
241 func (h *Heap) Pop() (interface{}, error) {
242 h.lock.Lock()
243 defer h.lock.Unlock()
244 for len(h.data.queue) == 0 {
245
246
247
248 if h.closed {
249 return nil, fmt.Errorf("heap is closed")
250 }
251 h.cond.Wait()
252 }
253 obj := heap.Pop(h.data)
254 if obj == nil {
255 return nil, fmt.Errorf("object was removed from heap data")
256 }
257
258 return obj, nil
259 }
260
261
262 func (h *Heap) List() []interface{} {
263 h.lock.RLock()
264 defer h.lock.RUnlock()
265 list := make([]interface{}, 0, len(h.data.items))
266 for _, item := range h.data.items {
267 list = append(list, item.obj)
268 }
269 return list
270 }
271
272
273 func (h *Heap) ListKeys() []string {
274 h.lock.RLock()
275 defer h.lock.RUnlock()
276 list := make([]string, 0, len(h.data.items))
277 for key := range h.data.items {
278 list = append(list, key)
279 }
280 return list
281 }
282
283
284 func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
285 key, err := h.data.keyFunc(obj)
286 if err != nil {
287 return nil, false, KeyError{obj, err}
288 }
289 return h.GetByKey(key)
290 }
291
292
293 func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
294 h.lock.RLock()
295 defer h.lock.RUnlock()
296 item, exists := h.data.items[key]
297 if !exists {
298 return nil, false, nil
299 }
300 return item.obj, true, nil
301 }
302
303
304 func (h *Heap) IsClosed() bool {
305 h.lock.RLock()
306 defer h.lock.RUnlock()
307 return h.closed
308 }
309
310
311 func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
312 h := &Heap{
313 data: &heapData{
314 items: map[string]*heapItem{},
315 queue: []string{},
316 keyFunc: keyFn,
317 lessFunc: lessFn,
318 },
319 }
320 h.cond.L = &h.lock
321 return h
322 }
323
View as plain text