1
16
17
18
19
20
21
22
23
24
25 package groupcache
26
27 import (
28 "context"
29 "errors"
30 "math/rand"
31 "strconv"
32 "sync"
33 "sync/atomic"
34
35 pb "github.com/golang/groupcache/groupcachepb"
36 "github.com/golang/groupcache/lru"
37 "github.com/golang/groupcache/singleflight"
38 )
39
40
41 type Getter interface {
42
43
44
45
46
47
48 Get(ctx context.Context, key string, dest Sink) error
49 }
50
51
52 type GetterFunc func(ctx context.Context, key string, dest Sink) error
53
54 func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
55 return f(ctx, key, dest)
56 }
57
58 var (
59 mu sync.RWMutex
60 groups = make(map[string]*Group)
61
62 initPeerServerOnce sync.Once
63 initPeerServer func()
64 )
65
66
67
68 func GetGroup(name string) *Group {
69 mu.RLock()
70 g := groups[name]
71 mu.RUnlock()
72 return g
73 }
74
75
76
77
78
79
80
81
82
83
84 func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
85 return newGroup(name, cacheBytes, getter, nil)
86 }
87
88
89 func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
90 if getter == nil {
91 panic("nil Getter")
92 }
93 mu.Lock()
94 defer mu.Unlock()
95 initPeerServerOnce.Do(callInitPeerServer)
96 if _, dup := groups[name]; dup {
97 panic("duplicate registration of group " + name)
98 }
99 g := &Group{
100 name: name,
101 getter: getter,
102 peers: peers,
103 cacheBytes: cacheBytes,
104 loadGroup: &singleflight.Group{},
105 }
106 if fn := newGroupHook; fn != nil {
107 fn(g)
108 }
109 groups[name] = g
110 return g
111 }
112
113
114 var newGroupHook func(*Group)
115
116
117
118 func RegisterNewGroupHook(fn func(*Group)) {
119 if newGroupHook != nil {
120 panic("RegisterNewGroupHook called more than once")
121 }
122 newGroupHook = fn
123 }
124
125
126
127 func RegisterServerStart(fn func()) {
128 if initPeerServer != nil {
129 panic("RegisterServerStart called more than once")
130 }
131 initPeerServer = fn
132 }
133
134 func callInitPeerServer() {
135 if initPeerServer != nil {
136 initPeerServer()
137 }
138 }
139
140
141
142 type Group struct {
143 name string
144 getter Getter
145 peersOnce sync.Once
146 peers PeerPicker
147 cacheBytes int64
148
149
150
151
152
153 mainCache cache
154
155
156
157
158
159
160
161
162
163 hotCache cache
164
165
166
167
168 loadGroup flightGroup
169
170 _ int32
171
172
173 Stats Stats
174 }
175
176
177
178
// flightGroup is the subset of singleflight.Group that Group relies on.
// Declaring it as an interface lets another implementation be
// substituted for the loadGroup field.
type flightGroup interface {
	// Do runs fn for key and returns fn's result and error.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}
183
184
185 type Stats struct {
186 Gets AtomicInt
187 CacheHits AtomicInt
188 PeerLoads AtomicInt
189 PeerErrors AtomicInt
190 Loads AtomicInt
191 LoadsDeduped AtomicInt
192 LocalLoads AtomicInt
193 LocalLoadErrs AtomicInt
194 ServerRequests AtomicInt
195 }
196
197
198 func (g *Group) Name() string {
199 return g.name
200 }
201
202 func (g *Group) initPeers() {
203 if g.peers == nil {
204 g.peers = getPeers(g.name)
205 }
206 }
207
208 func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
209 g.peersOnce.Do(g.initPeers)
210 g.Stats.Gets.Add(1)
211 if dest == nil {
212 return errors.New("groupcache: nil dest Sink")
213 }
214 value, cacheHit := g.lookupCache(key)
215
216 if cacheHit {
217 g.Stats.CacheHits.Add(1)
218 return setSinkView(dest, value)
219 }
220
221
222
223
224
225 destPopulated := false
226 value, destPopulated, err := g.load(ctx, key, dest)
227 if err != nil {
228 return err
229 }
230 if destPopulated {
231 return nil
232 }
233 return setSinkView(dest, value)
234 }
235
236
237 func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
238 g.Stats.Loads.Add(1)
239 viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261 if value, cacheHit := g.lookupCache(key); cacheHit {
262 g.Stats.CacheHits.Add(1)
263 return value, nil
264 }
265 g.Stats.LoadsDeduped.Add(1)
266 var value ByteView
267 var err error
268 if peer, ok := g.peers.PickPeer(key); ok {
269 value, err = g.getFromPeer(ctx, peer, key)
270 if err == nil {
271 g.Stats.PeerLoads.Add(1)
272 return value, nil
273 }
274 g.Stats.PeerErrors.Add(1)
275
276
277
278
279 }
280 value, err = g.getLocally(ctx, key, dest)
281 if err != nil {
282 g.Stats.LocalLoadErrs.Add(1)
283 return nil, err
284 }
285 g.Stats.LocalLoads.Add(1)
286 destPopulated = true
287 g.populateCache(key, value, &g.mainCache)
288 return value, nil
289 })
290 if err == nil {
291 value = viewi.(ByteView)
292 }
293 return
294 }
295
296 func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
297 err := g.getter.Get(ctx, key, dest)
298 if err != nil {
299 return ByteView{}, err
300 }
301 return dest.view()
302 }
303
304 func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
305 req := &pb.GetRequest{
306 Group: &g.name,
307 Key: &key,
308 }
309 res := &pb.GetResponse{}
310 err := peer.Get(ctx, req, res)
311 if err != nil {
312 return ByteView{}, err
313 }
314 value := ByteView{b: res.Value}
315
316
317
318 if rand.Intn(10) == 0 {
319 g.populateCache(key, value, &g.hotCache)
320 }
321 return value, nil
322 }
323
324 func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
325 if g.cacheBytes <= 0 {
326 return
327 }
328 value, ok = g.mainCache.get(key)
329 if ok {
330 return
331 }
332 value, ok = g.hotCache.get(key)
333 return
334 }
335
336 func (g *Group) populateCache(key string, value ByteView, cache *cache) {
337 if g.cacheBytes <= 0 {
338 return
339 }
340 cache.add(key, value)
341
342
343 for {
344 mainBytes := g.mainCache.bytes()
345 hotBytes := g.hotCache.bytes()
346 if mainBytes+hotBytes <= g.cacheBytes {
347 return
348 }
349
350
351
352
353 victim := &g.mainCache
354 if hotBytes > mainBytes/8 {
355 victim = &g.hotCache
356 }
357 victim.removeOldest()
358 }
359 }
360
361
// CacheType selects one of a Group's caches, for example when asking
// for statistics with CacheStats.
type CacheType int

const (
	// The zero value deliberately names no cache.
	_ CacheType = iota

	// MainCache selects the cache of values loaded through the group's
	// own Getter.
	MainCache

	// HotCache selects the cache of values fetched from peers.
	HotCache
)
374
375
376 func (g *Group) CacheStats(which CacheType) CacheStats {
377 switch which {
378 case MainCache:
379 return g.mainCache.stats()
380 case HotCache:
381 return g.hotCache.stats()
382 default:
383 return CacheStats{}
384 }
385 }
386
387
388
389
390 type cache struct {
391 mu sync.RWMutex
392 nbytes int64
393 lru *lru.Cache
394 nhit, nget int64
395 nevict int64
396 }
397
398 func (c *cache) stats() CacheStats {
399 c.mu.RLock()
400 defer c.mu.RUnlock()
401 return CacheStats{
402 Bytes: c.nbytes,
403 Items: c.itemsLocked(),
404 Gets: c.nget,
405 Hits: c.nhit,
406 Evictions: c.nevict,
407 }
408 }
409
410 func (c *cache) add(key string, value ByteView) {
411 c.mu.Lock()
412 defer c.mu.Unlock()
413 if c.lru == nil {
414 c.lru = &lru.Cache{
415 OnEvicted: func(key lru.Key, value interface{}) {
416 val := value.(ByteView)
417 c.nbytes -= int64(len(key.(string))) + int64(val.Len())
418 c.nevict++
419 },
420 }
421 }
422 c.lru.Add(key, value)
423 c.nbytes += int64(len(key)) + int64(value.Len())
424 }
425
426 func (c *cache) get(key string) (value ByteView, ok bool) {
427 c.mu.Lock()
428 defer c.mu.Unlock()
429 c.nget++
430 if c.lru == nil {
431 return
432 }
433 vi, ok := c.lru.Get(key)
434 if !ok {
435 return
436 }
437 c.nhit++
438 return vi.(ByteView), true
439 }
440
441 func (c *cache) removeOldest() {
442 c.mu.Lock()
443 defer c.mu.Unlock()
444 if c.lru != nil {
445 c.lru.RemoveOldest()
446 }
447 }
448
449 func (c *cache) bytes() int64 {
450 c.mu.RLock()
451 defer c.mu.RUnlock()
452 return c.nbytes
453 }
454
455 func (c *cache) items() int64 {
456 c.mu.RLock()
457 defer c.mu.RUnlock()
458 return c.itemsLocked()
459 }
460
461 func (c *cache) itemsLocked() int64 {
462 if c.lru == nil {
463 return 0
464 }
465 return int64(c.lru.Len())
466 }
467
468
// An AtomicInt is an int64 that is read and updated atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
	addr := (*int64)(i)
	atomic.AddInt64(addr, n)
}

// Get atomically loads and returns the current value of i.
func (i *AtomicInt) Get() int64 {
	addr := (*int64)(i)
	return atomic.LoadInt64(addr)
}

// String returns the current value of i formatted in base 10.
func (i *AtomicInt) String() string {
	current := atomic.LoadInt64((*int64)(i))
	return strconv.FormatInt(current, 10)
}
484
485
// CacheStats is the snapshot of a single cache's statistics returned by
// the CacheStats method on Group.
type CacheStats struct {
	Bytes     int64 // key and value bytes currently accounted to the cache
	Items     int64 // number of entries currently stored
	Gets      int64 // total lookups
	Hits      int64 // lookups that found an entry
	Evictions int64 // entries evicted so far
}
493
View as plain text