...

Source file src/github.com/golang/groupcache/groupcache.go

Documentation: github.com/golang/groupcache

     1  /*
     2  Copyright 2012 Google Inc.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8       http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package groupcache provides a data loading mechanism with caching
    18  // and de-duplication that works across a set of peer processes.
    19  //
    20  // Each data Get first consults its local cache, otherwise delegates
    21  // to the requested key's canonical owner, which then checks its cache
    22  // or finally gets the data.  In the common case, many concurrent
    23  // cache misses across a set of peers for the same key result in just
    24  // one cache fill.
    25  package groupcache
    26  
    27  import (
    28  	"context"
    29  	"errors"
    30  	"math/rand"
    31  	"strconv"
    32  	"sync"
    33  	"sync/atomic"
    34  
    35  	pb "github.com/golang/groupcache/groupcachepb"
    36  	"github.com/golang/groupcache/lru"
    37  	"github.com/golang/groupcache/singleflight"
    38  )
    39  
// A Getter loads data for a key.
//
// A Group is created around a Getter and invokes it (see
// Group.getLocally) when a value has to be produced in this process
// rather than served from a cache or fetched from a peer.
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	//
	// A non-nil error is counted as a failed local load and is
	// returned to the caller of Group.Get.
	Get(ctx context.Context, key string, dest Sink) error
}
    50  
// A GetterFunc implements Getter with a function, so that an ordinary
// function with the right signature can be passed wherever a Getter is
// required.
type GetterFunc func(ctx context.Context, key string, dest Sink) error

// Get calls f(ctx, key, dest) and returns its error unchanged.
func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
	return f(ctx, key, dest)
}
    57  
var (
	// mu guards groups: GetGroup reads the map under the read lock and
	// newGroup holds the write lock while registering a group.
	mu     sync.RWMutex
	groups = make(map[string]*Group) // registered groups, keyed by group name

	// initPeerServerOnce makes newGroup run callInitPeerServer only
	// once, on the first group creation. initPeerServer is the optional
	// hook installed by RegisterServerStart; it stays nil if none was
	// registered.
	initPeerServerOnce sync.Once
	initPeerServer     func()
)
    65  
    66  // GetGroup returns the named group previously created with NewGroup, or
    67  // nil if there's no such group.
    68  func GetGroup(name string) *Group {
    69  	mu.RLock()
    70  	g := groups[name]
    71  	mu.RUnlock()
    72  	return g
    73  }
    74  
// NewGroup creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter. NewGroup panics if
// getter is nil or if a group with the same name has already been
// created.
//
// cacheBytes limits the combined size of the group's caches; the group
// performs no caching when it is zero or negative.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}
    87  
    88  // If peers is nil, the peerPicker is called via a sync.Once to initialize it.
    89  func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
    90  	if getter == nil {
    91  		panic("nil Getter")
    92  	}
    93  	mu.Lock()
    94  	defer mu.Unlock()
    95  	initPeerServerOnce.Do(callInitPeerServer)
    96  	if _, dup := groups[name]; dup {
    97  		panic("duplicate registration of group " + name)
    98  	}
    99  	g := &Group{
   100  		name:       name,
   101  		getter:     getter,
   102  		peers:      peers,
   103  		cacheBytes: cacheBytes,
   104  		loadGroup:  &singleflight.Group{},
   105  	}
   106  	if fn := newGroupHook; fn != nil {
   107  		fn(g)
   108  	}
   109  	groups[name] = g
   110  	return g
   111  }
   112  
// newGroupHook, if non-nil, is called right after a new group is created.
// It is installed through RegisterNewGroupHook and invoked by newGroup
// just before the group is stored in the registry.
var newGroupHook func(*Group)
   115  
   116  // RegisterNewGroupHook registers a hook that is run each time
   117  // a group is created.
   118  func RegisterNewGroupHook(fn func(*Group)) {
   119  	if newGroupHook != nil {
   120  		panic("RegisterNewGroupHook called more than once")
   121  	}
   122  	newGroupHook = fn
   123  }
   124  
   125  // RegisterServerStart registers a hook that is run when the first
   126  // group is created.
   127  func RegisterServerStart(fn func()) {
   128  	if initPeerServer != nil {
   129  		panic("RegisterServerStart called more than once")
   130  	}
   131  	initPeerServer = fn
   132  }
   133  
   134  func callInitPeerServer() {
   135  	if initPeerServer != nil {
   136  		initPeerServer()
   137  	}
   138  }
   139  
// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
//
// Groups are created with NewGroup and looked up by name with GetGroup.
type Group struct {
	// name is the key the group is registered under and getter is the
	// local data source. peers picks the peer to ask for a key; when it
	// is nil at creation it is filled in by initPeers, which Get runs
	// at most once through peersOnce.
	name       string
	getter     Getter
	peersOnce  sync.Once
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size

	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache

	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer.  Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache

	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	loadGroup flightGroup

	_ int32 // force Stats to be 8-byte aligned on 32-bit platforms

	// Stats are statistics on the group.
	Stats Stats
}
   175  
// flightGroup is defined as an interface which singleflight.Group
// satisfies.  We define this so that we may test with an alternate
// implementation.
type flightGroup interface {
	// Do calls fn on behalf of key and returns fn's results. Group.load
	// relies on the implementation to de-duplicate calls for the same
	// key that overlap concurrently.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}
   183  
// Stats are per-group statistics. Every counter is an AtomicInt, so
// the fields can be read with Get while the group is in use.
type Stats struct {
	Gets           AtomicInt // any Get request, including from peers
	CacheHits      AtomicInt // either cache was good
	PeerLoads      AtomicInt // either remote load or remote cache hit (not an error)
	PeerErrors     AtomicInt // fetches from a peer that returned an error
	Loads          AtomicInt // (gets - cacheHits)
	LoadsDeduped   AtomicInt // after singleflight
	LocalLoads     AtomicInt // total good local loads
	LocalLoadErrs  AtomicInt // total bad local loads
	ServerRequests AtomicInt // gets that came over the network from peers
}
   196  
// Name returns the name of the group, as given when the group was
// created.
func (g *Group) Name() string {
	return g.name
}
   201  
   202  func (g *Group) initPeers() {
   203  	if g.peers == nil {
   204  		g.peers = getPeers(g.name)
   205  	}
   206  }
   207  
   208  func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
   209  	g.peersOnce.Do(g.initPeers)
   210  	g.Stats.Gets.Add(1)
   211  	if dest == nil {
   212  		return errors.New("groupcache: nil dest Sink")
   213  	}
   214  	value, cacheHit := g.lookupCache(key)
   215  
   216  	if cacheHit {
   217  		g.Stats.CacheHits.Add(1)
   218  		return setSinkView(dest, value)
   219  	}
   220  
   221  	// Optimization to avoid double unmarshalling or copying: keep
   222  	// track of whether the dest was already populated. One caller
   223  	// (if local) will set this; the losers will not. The common
   224  	// case will likely be one caller.
   225  	destPopulated := false
   226  	value, destPopulated, err := g.load(ctx, key, dest)
   227  	if err != nil {
   228  		return err
   229  	}
   230  	if destPopulated {
   231  		return nil
   232  	}
   233  	return setSinkView(dest, value)
   234  }
   235  
// load loads key either by invoking the getter locally or by sending it to another machine.
//
// The work runs inside g.loadGroup.Do so that overlapping loads of the
// same key share one execution of the callback. The callback first
// re-checks the caches, then tries the peer chosen by g.peers (if any),
// and falls back to the local getter when no peer is picked or the
// peer returns an error.
//
// destPopulated reports whether dest was filled in directly by the
// local getter during this call; when it is false and err is nil the
// caller still has to copy value into dest. Stats.Loads is incremented
// on every call, Stats.LoadsDeduped only when the callback actually
// proceeds past the cache re-check.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently.  It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls.  An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially.  If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		// These locals shadow load's named results on purpose: the
		// callback reports its outcome through its own return values.
		var value ByteView
		var err error
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// A peer failure is not fatal: fall through to a local load.
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez?  It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		// Writes load's named result, not a local: the getter has just
		// filled this call's dest.
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		// Every successful path of the callback returns a ByteView.
		value = viewi.(ByteView)
	}
	return
}
   295  
   296  func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
   297  	err := g.getter.Get(ctx, key, dest)
   298  	if err != nil {
   299  		return ByteView{}, err
   300  	}
   301  	return dest.view()
   302  }
   303  
   304  func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
   305  	req := &pb.GetRequest{
   306  		Group: &g.name,
   307  		Key:   &key,
   308  	}
   309  	res := &pb.GetResponse{}
   310  	err := peer.Get(ctx, req, res)
   311  	if err != nil {
   312  		return ByteView{}, err
   313  	}
   314  	value := ByteView{b: res.Value}
   315  	// TODO(bradfitz): use res.MinuteQps or something smart to
   316  	// conditionally populate hotCache.  For now just do it some
   317  	// percentage of the time.
   318  	if rand.Intn(10) == 0 {
   319  		g.populateCache(key, value, &g.hotCache)
   320  	}
   321  	return value, nil
   322  }
   323  
   324  func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
   325  	if g.cacheBytes <= 0 {
   326  		return
   327  	}
   328  	value, ok = g.mainCache.get(key)
   329  	if ok {
   330  		return
   331  	}
   332  	value, ok = g.hotCache.get(key)
   333  	return
   334  }
   335  
   336  func (g *Group) populateCache(key string, value ByteView, cache *cache) {
   337  	if g.cacheBytes <= 0 {
   338  		return
   339  	}
   340  	cache.add(key, value)
   341  
   342  	// Evict items from cache(s) if necessary.
   343  	for {
   344  		mainBytes := g.mainCache.bytes()
   345  		hotBytes := g.hotCache.bytes()
   346  		if mainBytes+hotBytes <= g.cacheBytes {
   347  			return
   348  		}
   349  
   350  		// TODO(bradfitz): this is good-enough-for-now logic.
   351  		// It should be something based on measurements and/or
   352  		// respecting the costs of different resources.
   353  		victim := &g.mainCache
   354  		if hotBytes > mainBytes/8 {
   355  			victim = &g.hotCache
   356  		}
   357  		victim.removeOldest()
   358  	}
   359  }
   360  
   361  // CacheType represents a type of cache.
   362  type CacheType int
   363  
   364  const (
   365  	// The MainCache is the cache for items that this peer is the
   366  	// owner for.
   367  	MainCache CacheType = iota + 1
   368  
   369  	// The HotCache is the cache for items that seem popular
   370  	// enough to replicate to this node, even though it's not the
   371  	// owner.
   372  	HotCache
   373  )
   374  
   375  // CacheStats returns stats about the provided cache within the group.
   376  func (g *Group) CacheStats(which CacheType) CacheStats {
   377  	switch which {
   378  	case MainCache:
   379  		return g.mainCache.stats()
   380  	case HotCache:
   381  		return g.hotCache.stats()
   382  	default:
   383  		return CacheStats{}
   384  	}
   385  }
   386  
// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
//
// The zero value is an empty cache ready for use: lru is allocated by
// the first call to add. Every method acquires mu before touching the
// other fields, except itemsLocked, which expects the caller to hold it.
type cache struct {
	mu         sync.RWMutex
	nbytes     int64 // of all keys and values
	lru        *lru.Cache
	nhit, nget int64 // lookups that found a value, and all lookups
	nevict     int64 // number of evictions
}
   397  
   398  func (c *cache) stats() CacheStats {
   399  	c.mu.RLock()
   400  	defer c.mu.RUnlock()
   401  	return CacheStats{
   402  		Bytes:     c.nbytes,
   403  		Items:     c.itemsLocked(),
   404  		Gets:      c.nget,
   405  		Hits:      c.nhit,
   406  		Evictions: c.nevict,
   407  	}
   408  }
   409  
   410  func (c *cache) add(key string, value ByteView) {
   411  	c.mu.Lock()
   412  	defer c.mu.Unlock()
   413  	if c.lru == nil {
   414  		c.lru = &lru.Cache{
   415  			OnEvicted: func(key lru.Key, value interface{}) {
   416  				val := value.(ByteView)
   417  				c.nbytes -= int64(len(key.(string))) + int64(val.Len())
   418  				c.nevict++
   419  			},
   420  		}
   421  	}
   422  	c.lru.Add(key, value)
   423  	c.nbytes += int64(len(key)) + int64(value.Len())
   424  }
   425  
   426  func (c *cache) get(key string) (value ByteView, ok bool) {
   427  	c.mu.Lock()
   428  	defer c.mu.Unlock()
   429  	c.nget++
   430  	if c.lru == nil {
   431  		return
   432  	}
   433  	vi, ok := c.lru.Get(key)
   434  	if !ok {
   435  		return
   436  	}
   437  	c.nhit++
   438  	return vi.(ByteView), true
   439  }
   440  
   441  func (c *cache) removeOldest() {
   442  	c.mu.Lock()
   443  	defer c.mu.Unlock()
   444  	if c.lru != nil {
   445  		c.lru.RemoveOldest()
   446  	}
   447  }
   448  
   449  func (c *cache) bytes() int64 {
   450  	c.mu.RLock()
   451  	defer c.mu.RUnlock()
   452  	return c.nbytes
   453  }
   454  
   455  func (c *cache) items() int64 {
   456  	c.mu.RLock()
   457  	defer c.mu.RUnlock()
   458  	return c.itemsLocked()
   459  }
   460  
   461  func (c *cache) itemsLocked() int64 {
   462  	if c.lru == nil {
   463  		return 0
   464  	}
   465  	return int64(c.lru.Len())
   466  }
   467  
   468  // An AtomicInt is an int64 to be accessed atomically.
   469  type AtomicInt int64
   470  
   471  // Add atomically adds n to i.
   472  func (i *AtomicInt) Add(n int64) {
   473  	atomic.AddInt64((*int64)(i), n)
   474  }
   475  
   476  // Get atomically gets the value of i.
   477  func (i *AtomicInt) Get() int64 {
   478  	return atomic.LoadInt64((*int64)(i))
   479  }
   480  
   481  func (i *AtomicInt) String() string {
   482  	return strconv.FormatInt(i.Get(), 10)
   483  }
   484  
// CacheStats are returned by stats accessors on Group. They describe
// one of a group's caches at the moment the accessor was called.
type CacheStats struct {
	Bytes     int64 // tracked size of all keys and values
	Items     int64 // number of entries currently held
	Gets      int64 // lookups performed
	Hits      int64 // lookups that found a value
	Evictions int64 // number of evictions
}
   493  

View as plain text