...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package cache
18
19 import (
20 "errors"
21 "sync"
22
23 "github.com/golang/groupcache/lru"
24 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
25 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
26 "go.etcd.io/etcd/pkg/v3/adt"
27 )
28
29 var (
30 DefaultMaxEntries = 2048
31 ErrCompacted = rpctypes.ErrGRPCCompacted
32 )
33
34 type Cache interface {
35 Add(req *pb.RangeRequest, resp *pb.RangeResponse)
36 Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
37 Compact(revision int64)
38 Invalidate(key []byte, endkey []byte)
39 Size() int
40 Close()
41 }
42
43
44 func keyFunc(req *pb.RangeRequest) string {
45
46 b, err := req.Marshal()
47 if err != nil {
48 panic(err)
49 }
50 return string(b)
51 }
52
53 func NewCache(maxCacheEntries int) Cache {
54 return &cache{
55 lru: lru.New(maxCacheEntries),
56 cachedRanges: adt.NewIntervalTree(),
57 compactedRev: -1,
58 }
59 }
60
61 func (c *cache) Close() {}
62
63
64 type cache struct {
65 mu sync.RWMutex
66 lru *lru.Cache
67
68
69 cachedRanges adt.IntervalTree
70
71 compactedRev int64
72 }
73
74
75 func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
76 key := keyFunc(req)
77
78 c.mu.Lock()
79 defer c.mu.Unlock()
80
81 if req.Revision > c.compactedRev {
82 c.lru.Add(key, resp)
83 }
84
85
86 if req.Revision != 0 {
87 return
88 }
89
90 var (
91 iv *adt.IntervalValue
92 ivl adt.Interval
93 )
94 if len(req.RangeEnd) != 0 {
95 ivl = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd))
96 } else {
97 ivl = adt.NewStringAffinePoint(string(req.Key))
98 }
99
100 iv = c.cachedRanges.Find(ivl)
101
102 if iv == nil {
103 val := map[string]struct{}{key: {}}
104 c.cachedRanges.Insert(ivl, val)
105 } else {
106 val := iv.Val.(map[string]struct{})
107 val[key] = struct{}{}
108 iv.Val = val
109 }
110 }
111
112
113
114 func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
115 key := keyFunc(req)
116
117 c.mu.Lock()
118 defer c.mu.Unlock()
119
120 if req.Revision > 0 && req.Revision < c.compactedRev {
121 c.lru.Remove(key)
122 return nil, ErrCompacted
123 }
124
125 if resp, ok := c.lru.Get(key); ok {
126 return resp.(*pb.RangeResponse), nil
127 }
128 return nil, errors.New("not exist")
129 }
130
131
132 func (c *cache) Invalidate(key, endkey []byte) {
133 c.mu.Lock()
134 defer c.mu.Unlock()
135
136 var (
137 ivs []*adt.IntervalValue
138 ivl adt.Interval
139 )
140 if len(endkey) == 0 {
141 ivl = adt.NewStringAffinePoint(string(key))
142 } else {
143 ivl = adt.NewStringAffineInterval(string(key), string(endkey))
144 }
145
146 ivs = c.cachedRanges.Stab(ivl)
147 for _, iv := range ivs {
148 keys := iv.Val.(map[string]struct{})
149 for key := range keys {
150 c.lru.Remove(key)
151 }
152 }
153
154 c.cachedRanges.Delete(ivl)
155 }
156
157
158
159 func (c *cache) Compact(revision int64) {
160 c.mu.Lock()
161 defer c.mu.Unlock()
162
163 if revision > c.compactedRev {
164 c.compactedRev = revision
165 }
166 }
167
168 func (c *cache) Size() int {
169 c.mu.RLock()
170 defer c.mu.RUnlock()
171 return c.lru.Len()
172 }
173
View as plain text