1
2
3
4
5
6
7
8 package cache
9
10 import (
11 "sort"
12 "sync"
13 "sync/atomic"
14 "unsafe"
15
16 "github.com/syndtr/goleveldb/leveldb/util"
17 )
18
19
20
21 type Cacher interface {
22
23 Capacity() int
24
25
26 SetCapacity(capacity int)
27
28
29 Promote(n *Node)
30
31
32 Ban(n *Node)
33
34
35 Evict(n *Node)
36 }
37
38
39
40 type Value interface{}
41
42
43 type NamespaceGetter struct {
44 Cache *Cache
45 NS uint64
46 }
47
48
49 func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
50 return g.Cache.Get(g.NS, key, setFunc)
51 }
52
53
54
55
56
57
58 const (
59 mInitialSize = 1 << 4
60 mOverflowThreshold = 1 << 5
61 mOverflowGrowThreshold = 1 << 7
62 )
63
64 const (
65 bucketUninitialized = iota
66 bucketInitialized
67 bucketFrozen
68 )
69
70 type mNodes []*Node
71
72 func (x mNodes) Len() int { return len(x) }
73 func (x mNodes) Less(i, j int) bool {
74 a, b := x[i].ns, x[j].ns
75 if a == b {
76 return x[i].key < x[j].key
77 }
78 return a < b
79 }
80 func (x mNodes) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
81
82 func (x mNodes) sort() { sort.Sort(x) }
83
84 func (x mNodes) search(ns, key uint64) int {
85 return sort.Search(len(x), func(i int) bool {
86 a := x[i].ns
87 if a == ns {
88 return x[i].key >= key
89 }
90 return a > ns
91 })
92 }
93
94 type mBucket struct {
95 mu sync.Mutex
96 nodes mNodes
97 state int8
98 }
99
100 func (b *mBucket) freeze() mNodes {
101 b.mu.Lock()
102 defer b.mu.Unlock()
103 if b.state == bucketInitialized {
104 b.state = bucketFrozen
105 } else if b.state == bucketUninitialized {
106 panic("BUG: freeze uninitialized bucket")
107 }
108 return b.nodes
109 }
110
111 func (b *mBucket) frozen() bool {
112 if b.state == bucketFrozen {
113 return true
114 }
115 if b.state == bucketUninitialized {
116 panic("BUG: accessing uninitialized bucket")
117 }
118 return false
119 }
120
121 func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly bool) (done, created bool, n *Node) {
122 b.mu.Lock()
123
124 if b.frozen() {
125 b.mu.Unlock()
126 return
127 }
128
129
130 i := b.nodes.search(ns, key)
131 if i < len(b.nodes) {
132 n = b.nodes[i]
133 if n.ns == ns && n.key == key {
134 atomic.AddInt32(&n.ref, 1)
135 b.mu.Unlock()
136 return true, false, n
137 }
138 }
139
140
141 if getOnly {
142 b.mu.Unlock()
143 return true, false, nil
144 }
145
146
147 n = &Node{
148 r: r,
149 hash: hash,
150 ns: ns,
151 key: key,
152 ref: 1,
153 }
154
155 if i == len(b.nodes) {
156 b.nodes = append(b.nodes, n)
157 } else {
158 b.nodes = append(b.nodes[:i+1], b.nodes[i:]...)
159 b.nodes[i] = n
160 }
161 bLen := len(b.nodes)
162 b.mu.Unlock()
163
164
165 grow := atomic.AddInt64(&r.statNodes, 1) >= h.growThreshold
166 if bLen > mOverflowThreshold {
167 grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
168 }
169
170
171 if grow && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
172 nhLen := len(h.buckets) << 1
173 nh := &mHead{
174 buckets: make([]mBucket, nhLen),
175 mask: uint32(nhLen) - 1,
176 predecessor: unsafe.Pointer(h),
177 growThreshold: int64(nhLen * mOverflowThreshold),
178 shrinkThreshold: int64(nhLen >> 1),
179 }
180 ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
181 if !ok {
182 panic("BUG: failed swapping head")
183 }
184 atomic.AddInt32(&r.statGrow, 1)
185 go nh.initBuckets()
186 }
187
188 return true, true, n
189 }
190
191 func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done, deleted bool) {
192 b.mu.Lock()
193
194 if b.frozen() {
195 b.mu.Unlock()
196 return
197 }
198
199
200 i := b.nodes.search(ns, key)
201 if i == len(b.nodes) {
202 b.mu.Unlock()
203 return true, false
204 }
205 n := b.nodes[i]
206 var bLen int
207 if n.ns == ns && n.key == key {
208 if atomic.LoadInt32(&n.ref) == 0 {
209 deleted = true
210
211
212 if n.value != nil {
213
214 if r, ok := n.value.(util.Releaser); ok {
215 r.Release()
216 }
217 n.value = nil
218 }
219
220
221 b.nodes = append(b.nodes[:i], b.nodes[i+1:]...)
222 bLen = len(b.nodes)
223 }
224 }
225 b.mu.Unlock()
226
227 if deleted {
228
229 for _, f := range n.delFuncs {
230 f()
231 }
232
233
234 atomic.AddInt64(&r.statSize, int64(n.size)*-1)
235 shrink := atomic.AddInt64(&r.statNodes, -1) < h.shrinkThreshold
236 if bLen >= mOverflowThreshold {
237 atomic.AddInt32(&h.overflow, -1)
238 }
239
240
241 if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
242 nhLen := len(h.buckets) >> 1
243 nh := &mHead{
244 buckets: make([]mBucket, nhLen),
245 mask: uint32(nhLen) - 1,
246 predecessor: unsafe.Pointer(h),
247 growThreshold: int64(nhLen * mOverflowThreshold),
248 shrinkThreshold: int64(nhLen >> 1),
249 }
250 ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
251 if !ok {
252 panic("BUG: failed swapping head")
253 }
254 atomic.AddInt32(&r.statShrink, 1)
255 go nh.initBuckets()
256 }
257 }
258
259 return true, deleted
260 }
261
262 type mHead struct {
263 buckets []mBucket
264 mask uint32
265 predecessor unsafe.Pointer
266 resizeInProgress int32
267
268 overflow int32
269 growThreshold int64
270 shrinkThreshold int64
271 }
272
273 func (h *mHead) initBucket(i uint32) *mBucket {
274 b := &h.buckets[i]
275 b.mu.Lock()
276 if b.state >= bucketInitialized {
277 b.mu.Unlock()
278 return b
279 }
280
281 p := (*mHead)(atomic.LoadPointer(&h.predecessor))
282 if p == nil {
283 panic("BUG: uninitialized bucket doesn't have predecessor")
284 }
285
286 var nodes mNodes
287 if h.mask > p.mask {
288
289 m := p.initBucket(i & p.mask).freeze()
290
291 for _, x := range m {
292 if x.hash&h.mask == i {
293 nodes = append(nodes, x)
294 }
295 }
296 } else {
297
298 m0 := p.initBucket(i).freeze()
299 m1 := p.initBucket(i + uint32(len(h.buckets))).freeze()
300
301 nodes = make(mNodes, 0, len(m0)+len(m1))
302 nodes = append(nodes, m0...)
303 nodes = append(nodes, m1...)
304 nodes.sort()
305 }
306 b.nodes = nodes
307 b.state = bucketInitialized
308 b.mu.Unlock()
309 return b
310 }
311
312 func (h *mHead) initBuckets() {
313 for i := range h.buckets {
314 h.initBucket(uint32(i))
315 }
316 atomic.StorePointer(&h.predecessor, nil)
317 }
318
319 func (h *mHead) enumerateNodesWithCB(f func([]*Node)) {
320 var nodes []*Node
321 for x := range h.buckets {
322 b := h.initBucket(uint32(x))
323
324 b.mu.Lock()
325 nodes = append(nodes, b.nodes...)
326 b.mu.Unlock()
327 f(nodes)
328 }
329 }
330
331 func (h *mHead) enumerateNodesByNS(ns uint64) []*Node {
332 var nodes []*Node
333 for x := range h.buckets {
334 b := h.initBucket(uint32(x))
335
336 b.mu.Lock()
337 i := b.nodes.search(ns, 0)
338 for ; i < len(b.nodes); i++ {
339 n := b.nodes[i]
340 if n.ns != ns {
341 break
342 }
343 nodes = append(nodes, n)
344 }
345 b.mu.Unlock()
346 }
347 return nodes
348 }
349
350 type Stats struct {
351 Buckets int
352 Nodes int64
353 Size int64
354 GrowCount int32
355 ShrinkCount int32
356 HitCount int64
357 MissCount int64
358 SetCount int64
359 DelCount int64
360 }
361
362
363 type Cache struct {
364 mu sync.RWMutex
365 mHead unsafe.Pointer
366 cacher Cacher
367 closed bool
368
369 statNodes int64
370 statSize int64
371 statGrow int32
372 statShrink int32
373 statHit int64
374 statMiss int64
375 statSet int64
376 statDel int64
377 }
378
379
380
381 func NewCache(cacher Cacher) *Cache {
382 h := &mHead{
383 buckets: make([]mBucket, mInitialSize),
384 mask: mInitialSize - 1,
385 growThreshold: int64(mInitialSize * mOverflowThreshold),
386 shrinkThreshold: 0,
387 }
388 for i := range h.buckets {
389 h.buckets[i].state = bucketInitialized
390 }
391 r := &Cache{
392 mHead: unsafe.Pointer(h),
393 cacher: cacher,
394 }
395 return r
396 }
397
398 func (r *Cache) getBucket(hash uint32) (*mHead, *mBucket) {
399 h := (*mHead)(atomic.LoadPointer(&r.mHead))
400 i := hash & h.mask
401 return h, h.initBucket(i)
402 }
403
404 func (r *Cache) enumerateNodesWithCB(f func([]*Node)) {
405 h := (*mHead)(atomic.LoadPointer(&r.mHead))
406 h.enumerateNodesWithCB(f)
407 }
408
409 func (r *Cache) enumerateNodesByNS(ns uint64) []*Node {
410 h := (*mHead)(atomic.LoadPointer(&r.mHead))
411 return h.enumerateNodesByNS(ns)
412 }
413
414 func (r *Cache) delete(n *Node) bool {
415 for {
416 h, b := r.getBucket(n.hash)
417 done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
418 if done {
419 return deleted
420 }
421 }
422 }
423
424
425 func (r *Cache) GetStats() Stats {
426 return Stats{
427 Buckets: len((*mHead)(atomic.LoadPointer(&r.mHead)).buckets),
428 Nodes: atomic.LoadInt64(&r.statNodes),
429 Size: atomic.LoadInt64(&r.statSize),
430 GrowCount: atomic.LoadInt32(&r.statGrow),
431 ShrinkCount: atomic.LoadInt32(&r.statShrink),
432 HitCount: atomic.LoadInt64(&r.statHit),
433 MissCount: atomic.LoadInt64(&r.statMiss),
434 SetCount: atomic.LoadInt64(&r.statSet),
435 DelCount: atomic.LoadInt64(&r.statDel),
436 }
437 }
438
439
440 func (r *Cache) Nodes() int {
441 return int(atomic.LoadInt64(&r.statNodes))
442 }
443
444
445 func (r *Cache) Size() int {
446 return int(atomic.LoadInt64(&r.statSize))
447 }
448
449
450 func (r *Cache) Capacity() int {
451 if r.cacher == nil {
452 return 0
453 }
454 return r.cacher.Capacity()
455 }
456
457
458 func (r *Cache) SetCapacity(capacity int) {
459 if r.cacher != nil {
460 r.cacher.SetCapacity(capacity)
461 }
462 }
463
464
465
466
467
468
469
470 func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
471 r.mu.RLock()
472 defer r.mu.RUnlock()
473 if r.closed {
474 return nil
475 }
476
477 hash := murmur32(ns, key, 0xf00)
478 for {
479 h, b := r.getBucket(hash)
480 done, created, n := b.get(r, h, hash, ns, key, setFunc == nil)
481 if done {
482 if created || n == nil {
483 atomic.AddInt64(&r.statMiss, 1)
484 } else {
485 atomic.AddInt64(&r.statHit, 1)
486 }
487
488 if n != nil {
489 n.mu.Lock()
490 if n.value == nil {
491 if setFunc == nil {
492 n.mu.Unlock()
493 n.unRefInternal(false)
494 return nil
495 }
496
497 n.size, n.value = setFunc()
498 if n.value == nil {
499 n.size = 0
500 n.mu.Unlock()
501 n.unRefInternal(false)
502 return nil
503 }
504 atomic.AddInt64(&r.statSet, 1)
505 atomic.AddInt64(&r.statSize, int64(n.size))
506 }
507 n.mu.Unlock()
508 if r.cacher != nil {
509 r.cacher.Promote(n)
510 }
511 return &Handle{unsafe.Pointer(n)}
512 }
513
514 break
515 }
516 }
517 return nil
518 }
519
520
521
522
523
524
525
526
527
528
529 func (r *Cache) Delete(ns, key uint64, delFunc func()) bool {
530 r.mu.RLock()
531 defer r.mu.RUnlock()
532 if r.closed {
533 return false
534 }
535
536 hash := murmur32(ns, key, 0xf00)
537 for {
538 h, b := r.getBucket(hash)
539 done, _, n := b.get(r, h, hash, ns, key, true)
540 if done {
541 if n != nil {
542 if delFunc != nil {
543 n.mu.Lock()
544 n.delFuncs = append(n.delFuncs, delFunc)
545 n.mu.Unlock()
546 }
547 if r.cacher != nil {
548 r.cacher.Ban(n)
549 }
550 n.unRefInternal(true)
551 return true
552 }
553
554 break
555 }
556 }
557
558 if delFunc != nil {
559 delFunc()
560 }
561
562 return false
563 }
564
565
566
567
568
569 func (r *Cache) Evict(ns, key uint64) bool {
570 r.mu.RLock()
571 defer r.mu.RUnlock()
572 if r.closed {
573 return false
574 }
575
576 hash := murmur32(ns, key, 0xf00)
577 for {
578 h, b := r.getBucket(hash)
579 done, _, n := b.get(r, h, hash, ns, key, true)
580 if done {
581 if n != nil {
582 if r.cacher != nil {
583 r.cacher.Evict(n)
584 }
585 n.unRefInternal(true)
586 return true
587 }
588
589 break
590 }
591 }
592
593 return false
594 }
595
596
597
598 func (r *Cache) EvictNS(ns uint64) {
599 r.mu.RLock()
600 defer r.mu.RUnlock()
601 if r.closed {
602 return
603 }
604
605 if r.cacher != nil {
606 nodes := r.enumerateNodesByNS(ns)
607 for _, n := range nodes {
608 r.cacher.Evict(n)
609 }
610 }
611 }
612
613 func (r *Cache) evictAll() {
614 r.enumerateNodesWithCB(func(nodes []*Node) {
615 for _, n := range nodes {
616 r.cacher.Evict(n)
617 }
618 })
619 }
620
621
622 func (r *Cache) EvictAll() {
623 r.mu.RLock()
624 defer r.mu.RUnlock()
625 if r.closed {
626 return
627 }
628
629 if r.cacher != nil {
630 r.evictAll()
631 }
632 }
633
634
635
636
637
638
639
640 func (r *Cache) Close(force bool) {
641 var head *mHead
642
643 r.mu.Lock()
644 if !r.closed {
645 r.closed = true
646 head = (*mHead)(atomic.LoadPointer(&r.mHead))
647 atomic.StorePointer(&r.mHead, nil)
648 }
649 r.mu.Unlock()
650
651 if head != nil {
652 head.enumerateNodesWithCB(func(nodes []*Node) {
653 for _, n := range nodes {
654
655 if force {
656 atomic.StoreInt32(&n.ref, 0)
657 }
658
659
660 if r.cacher != nil {
661 r.cacher.Evict(n)
662 }
663
664 if force {
665 n.callFinalizer()
666 }
667 }
668 })
669 }
670 }
671
672
673 type Node struct {
674 r *Cache
675
676 hash uint32
677 ns, key uint64
678
679 mu sync.Mutex
680 size int
681 value Value
682
683 ref int32
684 delFuncs []func()
685
686 CacheData unsafe.Pointer
687 }
688
689
690 func (n *Node) NS() uint64 {
691 return n.ns
692 }
693
694
695 func (n *Node) Key() uint64 {
696 return n.key
697 }
698
699
700 func (n *Node) Size() int {
701 return n.size
702 }
703
704
705 func (n *Node) Value() Value {
706 return n.value
707 }
708
709
710 func (n *Node) Ref() int32 {
711 return atomic.LoadInt32(&n.ref)
712 }
713
714
715 func (n *Node) GetHandle() *Handle {
716 if atomic.AddInt32(&n.ref, 1) <= 1 {
717 panic("BUG: Node.GetHandle on zero ref")
718 }
719 return &Handle{unsafe.Pointer(n)}
720 }
721
722 func (n *Node) callFinalizer() {
723
724 if n.value != nil {
725 if r, ok := n.value.(util.Releaser); ok {
726 r.Release()
727 }
728 n.value = nil
729 }
730
731
732 for _, f := range n.delFuncs {
733 f()
734 }
735 n.delFuncs = nil
736 }
737
738 func (n *Node) unRefInternal(updateStat bool) {
739 if atomic.AddInt32(&n.ref, -1) == 0 {
740 n.r.delete(n)
741 if updateStat {
742 atomic.AddInt64(&n.r.statDel, 1)
743 }
744 }
745 }
746
747 func (n *Node) unRefExternal() {
748 if atomic.AddInt32(&n.ref, -1) == 0 {
749 n.r.mu.RLock()
750 if n.r.closed {
751 n.callFinalizer()
752 } else {
753 n.r.delete(n)
754 atomic.AddInt64(&n.r.statDel, 1)
755 }
756 n.r.mu.RUnlock()
757 }
758 }
759
760
761 type Handle struct {
762 n unsafe.Pointer
763 }
764
765
766 func (h *Handle) Value() Value {
767 n := (*Node)(atomic.LoadPointer(&h.n))
768 if n != nil {
769 return n.value
770 }
771 return nil
772 }
773
774
775
776 func (h *Handle) Release() {
777 nPtr := atomic.LoadPointer(&h.n)
778 if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
779 n := (*Node)(nPtr)
780 n.unRefExternal()
781 }
782 }
783
784 func murmur32(ns, key uint64, seed uint32) uint32 {
785 const (
786 m = uint32(0x5bd1e995)
787 r = 24
788 )
789
790 k1 := uint32(ns >> 32)
791 k2 := uint32(ns)
792 k3 := uint32(key >> 32)
793 k4 := uint32(key)
794
795 k1 *= m
796 k1 ^= k1 >> r
797 k1 *= m
798
799 k2 *= m
800 k2 ^= k2 >> r
801 k2 *= m
802
803 k3 *= m
804 k3 ^= k3 >> r
805 k3 *= m
806
807 k4 *= m
808 k4 ^= k4 >> r
809 k4 *= m
810
811 h := seed
812
813 h *= m
814 h ^= k1
815 h *= m
816 h ^= k2
817 h *= m
818 h ^= k3
819 h *= m
820 h ^= k4
821
822 h ^= h >> 13
823 h *= m
824 h ^= h >> 15
825
826 return h
827 }
828
View as plain text