...

Source file src/github.com/dgraph-io/ristretto/cache_test.go

Documentation: github.com/dgraph-io/ristretto

     1  package ristretto
     2  
     3  import (
     4  	"fmt"
     5  	"math/rand"
     6  	"strconv"
     7  	"strings"
     8  	"sync"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/dgraph-io/ristretto/z"
    13  	"github.com/stretchr/testify/require"
    14  )
    15  
    16  var wait = time.Millisecond * 10
    17  
    18  func TestCacheKeyToHash(t *testing.T) {
    19  	keyToHashCount := 0
    20  	c, err := NewCache(&Config{
    21  		NumCounters: 100,
    22  		MaxCost:     10,
    23  		BufferItems: 64,
    24  		KeyToHash: func(key interface{}) (uint64, uint64) {
    25  			keyToHashCount++
    26  			return z.KeyToHash(key)
    27  		},
    28  	})
    29  	require.NoError(t, err)
    30  	if c.Set(1, 1, 1) {
    31  		time.Sleep(wait)
    32  		val, ok := c.Get(1)
    33  		require.True(t, ok)
    34  		require.NotNil(t, val)
    35  		c.Del(1)
    36  	}
    37  	require.Equal(t, 3, keyToHashCount)
    38  }
    39  
    40  func TestCacheMaxCost(t *testing.T) {
    41  	charset := "abcdefghijklmnopqrstuvwxyz0123456789"
    42  	key := func() []byte {
    43  		k := make([]byte, 2)
    44  		for i := range k {
    45  			k[i] = charset[rand.Intn(len(charset))]
    46  		}
    47  		return k
    48  	}
    49  	c, err := NewCache(&Config{
    50  		NumCounters: 12960, // 36^2 * 10
    51  		MaxCost:     1e6,   // 1mb
    52  		BufferItems: 64,
    53  		Metrics:     true,
    54  	})
    55  	require.NoError(t, err)
    56  	stop := make(chan struct{}, 8)
    57  	for i := 0; i < 8; i++ {
    58  		go func() {
    59  			for {
    60  				select {
    61  				case <-stop:
    62  					return
    63  				default:
    64  					time.Sleep(time.Millisecond)
    65  
    66  					k := key()
    67  					if _, ok := c.Get(k); !ok {
    68  						val := ""
    69  						if rand.Intn(100) < 10 {
    70  							val = "test"
    71  						} else {
    72  							val = strings.Repeat("a", 1000)
    73  						}
    74  						c.Set(key(), val, int64(2+len(val)))
    75  					}
    76  				}
    77  			}
    78  		}()
    79  	}
    80  	for i := 0; i < 20; i++ {
    81  		time.Sleep(time.Second)
    82  		cacheCost := c.Metrics.CostAdded() - c.Metrics.CostEvicted()
    83  		t.Logf("total cache cost: %d\n", cacheCost)
    84  		require.True(t, float64(cacheCost) <= float64(1e6*1.05))
    85  	}
    86  	for i := 0; i < 8; i++ {
    87  		stop <- struct{}{}
    88  	}
    89  }
    90  
    91  func TestNewCache(t *testing.T) {
    92  	_, err := NewCache(&Config{
    93  		NumCounters: 0,
    94  	})
    95  	require.Error(t, err)
    96  
    97  	_, err = NewCache(&Config{
    98  		NumCounters: 100,
    99  		MaxCost:     0,
   100  	})
   101  	require.Error(t, err)
   102  
   103  	_, err = NewCache(&Config{
   104  		NumCounters: 100,
   105  		MaxCost:     10,
   106  		BufferItems: 0,
   107  	})
   108  	require.Error(t, err)
   109  
   110  	c, err := NewCache(&Config{
   111  		NumCounters: 100,
   112  		MaxCost:     10,
   113  		BufferItems: 64,
   114  		Metrics:     true,
   115  	})
   116  	require.NoError(t, err)
   117  	require.NotNil(t, c)
   118  }
   119  
   120  func TestNilCache(t *testing.T) {
   121  	var c *Cache
   122  	val, ok := c.Get(1)
   123  	require.False(t, ok)
   124  	require.Nil(t, val)
   125  
   126  	require.False(t, c.Set(1, 1, 1))
   127  	c.Del(1)
   128  	c.Clear()
   129  	c.Close()
   130  }
   131  
   132  func TestMultipleClose(t *testing.T) {
   133  	var c *Cache
   134  	c.Close()
   135  
   136  	var err error
   137  	c, err = NewCache(&Config{
   138  		NumCounters: 100,
   139  		MaxCost:     10,
   140  		BufferItems: 64,
   141  		Metrics:     true,
   142  	})
   143  	require.NoError(t, err)
   144  	c.Close()
   145  	c.Close()
   146  }
   147  
   148  func TestCacheProcessItems(t *testing.T) {
   149  	m := &sync.Mutex{}
   150  	evicted := make(map[uint64]struct{})
   151  	c, err := NewCache(&Config{
   152  		NumCounters: 100,
   153  		MaxCost:     10,
   154  		BufferItems: 64,
   155  		Cost: func(value interface{}) int64 {
   156  			return int64(value.(int))
   157  		},
   158  		OnEvict: func(key, conflict uint64, value interface{}, cost int64) {
   159  			m.Lock()
   160  			defer m.Unlock()
   161  			evicted[key] = struct{}{}
   162  		},
   163  	})
   164  	require.NoError(t, err)
   165  
   166  	var key uint64
   167  	var conflict uint64
   168  
   169  	key, conflict = z.KeyToHash(1)
   170  	c.setBuf <- &item{
   171  		flag:     itemNew,
   172  		key:      key,
   173  		conflict: conflict,
   174  		value:    1,
   175  		cost:     0,
   176  	}
   177  	time.Sleep(wait)
   178  	require.True(t, c.policy.Has(1))
   179  	require.Equal(t, int64(1), c.policy.Cost(1))
   180  
   181  	key, conflict = z.KeyToHash(1)
   182  	c.setBuf <- &item{
   183  		flag:     itemUpdate,
   184  		key:      key,
   185  		conflict: conflict,
   186  		value:    2,
   187  		cost:     0,
   188  	}
   189  	time.Sleep(wait)
   190  	require.Equal(t, int64(2), c.policy.Cost(1))
   191  
   192  	key, conflict = z.KeyToHash(1)
   193  	c.setBuf <- &item{
   194  		flag:     itemDelete,
   195  		key:      key,
   196  		conflict: conflict,
   197  	}
   198  	time.Sleep(wait)
   199  	key, conflict = z.KeyToHash(1)
   200  	val, ok := c.store.Get(key, conflict)
   201  	require.False(t, ok)
   202  	require.Nil(t, val)
   203  	require.False(t, c.policy.Has(1))
   204  
   205  	key, conflict = z.KeyToHash(2)
   206  	c.setBuf <- &item{
   207  		flag:     itemNew,
   208  		key:      key,
   209  		conflict: conflict,
   210  		value:    2,
   211  		cost:     3,
   212  	}
   213  	key, conflict = z.KeyToHash(3)
   214  	c.setBuf <- &item{
   215  		flag:     itemNew,
   216  		key:      key,
   217  		conflict: conflict,
   218  		value:    3,
   219  		cost:     3,
   220  	}
   221  	key, conflict = z.KeyToHash(4)
   222  	c.setBuf <- &item{
   223  		flag:     itemNew,
   224  		key:      key,
   225  		conflict: conflict,
   226  		value:    3,
   227  		cost:     3,
   228  	}
   229  	key, conflict = z.KeyToHash(5)
   230  	c.setBuf <- &item{
   231  		flag:     itemNew,
   232  		key:      key,
   233  		conflict: conflict,
   234  		value:    3,
   235  		cost:     5,
   236  	}
   237  	time.Sleep(wait)
   238  	m.Lock()
   239  	require.NotEqual(t, 0, len(evicted))
   240  	m.Unlock()
   241  
   242  	defer func() {
   243  		require.NotNil(t, recover())
   244  	}()
   245  	c.Close()
   246  	c.setBuf <- &item{flag: itemNew}
   247  }
   248  
   249  func TestCacheGet(t *testing.T) {
   250  	c, err := NewCache(&Config{
   251  		NumCounters: 100,
   252  		MaxCost:     10,
   253  		BufferItems: 64,
   254  		Metrics:     true,
   255  	})
   256  	require.NoError(t, err)
   257  
   258  	key, conflict := z.KeyToHash(1)
   259  	i := item{
   260  		key:      key,
   261  		conflict: conflict,
   262  		value:    1,
   263  	}
   264  	c.store.Set(&i)
   265  	val, ok := c.Get(1)
   266  	require.True(t, ok)
   267  	require.NotNil(t, val)
   268  
   269  	val, ok = c.Get(2)
   270  	require.False(t, ok)
   271  	require.Nil(t, val)
   272  
   273  	// 0.5 and not 1.0 because we tried Getting each item twice
   274  	require.Equal(t, 0.5, c.Metrics.Ratio())
   275  
   276  	c = nil
   277  	val, ok = c.Get(0)
   278  	require.False(t, ok)
   279  	require.Nil(t, val)
   280  }
   281  
   282  // retrySet calls SetWithTTL until the item is accepted by the cache.
   283  func retrySet(t *testing.T, c *Cache, key, value int, cost int64, ttl time.Duration) {
   284  	for {
   285  		if set := c.SetWithTTL(key, value, cost, ttl); !set {
   286  			time.Sleep(wait)
   287  			continue
   288  		}
   289  
   290  		time.Sleep(wait)
   291  		val, ok := c.Get(key)
   292  		require.True(t, ok)
   293  		require.NotNil(t, val)
   294  		require.Equal(t, value, val.(int))
   295  		return
   296  	}
   297  }
   298  
   299  func TestCacheSet(t *testing.T) {
   300  	c, err := NewCache(&Config{
   301  		NumCounters: 100,
   302  		MaxCost:     10,
   303  		BufferItems: 64,
   304  		Metrics:     true,
   305  	})
   306  	require.NoError(t, err)
   307  
   308  	retrySet(t, c, 1, 1, 1, 0)
   309  
   310  	c.Set(1, 2, 2)
   311  	val, ok := c.store.Get(z.KeyToHash(1))
   312  	require.True(t, ok)
   313  	require.Equal(t, 2, val.(int))
   314  
   315  	c.stop <- struct{}{}
   316  	for i := 0; i < setBufSize; i++ {
   317  		key, conflict := z.KeyToHash(1)
   318  		c.setBuf <- &item{
   319  			flag:     itemUpdate,
   320  			key:      key,
   321  			conflict: conflict,
   322  			value:    1,
   323  			cost:     1,
   324  		}
   325  	}
   326  	require.False(t, c.Set(2, 2, 1))
   327  	require.Equal(t, uint64(1), c.Metrics.SetsDropped())
   328  	close(c.setBuf)
   329  	close(c.stop)
   330  
   331  	c = nil
   332  	require.False(t, c.Set(1, 1, 1))
   333  }
   334  
   335  func TestRecacheWithTTL(t *testing.T) {
   336  	c, err := NewCache(&Config{
   337  		NumCounters: 100,
   338  		MaxCost:     10,
   339  		BufferItems: 64,
   340  		Metrics:     true,
   341  	})
   342  
   343  	require.NoError(t, err)
   344  
   345  	// Set initial value for key = 1
   346  	insert := c.SetWithTTL(1, 1, 1, 5*time.Second)
   347  	require.True(t, insert)
   348  	time.Sleep(2 * time.Second)
   349  
   350  	// Get value from cache for key = 1
   351  	val, ok := c.Get(1)
   352  	require.True(t, ok)
   353  	require.NotNil(t, val)
   354  	require.Equal(t, 1, val)
   355  
   356  	// Wait for expiration
   357  	time.Sleep(5 * time.Second)
   358  
   359  	// The cached value for key = 1 should be gone
   360  	val, ok = c.Get(1)
   361  	require.False(t, ok)
   362  	require.Nil(t, val)
   363  
   364  	// Set new value for key = 1
   365  	insert = c.SetWithTTL(1, 2, 1, 5*time.Second)
   366  	require.True(t, insert)
   367  	time.Sleep(2 * time.Second)
   368  
   369  	// Get value from cache for key = 1
   370  	val, ok = c.Get(1)
   371  	require.True(t, ok)
   372  	require.NotNil(t, val)
   373  	require.Equal(t, 2, val)
   374  }
   375  
   376  func TestCacheSetWithTTL(t *testing.T) {
   377  	m := &sync.Mutex{}
   378  	evicted := make(map[uint64]struct{})
   379  	c, err := NewCache(&Config{
   380  		NumCounters: 100,
   381  		MaxCost:     10,
   382  		BufferItems: 64,
   383  		Metrics:     true,
   384  		OnEvict: func(key, conflict uint64, value interface{}, cost int64) {
   385  			m.Lock()
   386  			defer m.Unlock()
   387  			evicted[key] = struct{}{}
   388  		},
   389  	})
   390  	require.NoError(t, err)
   391  
   392  	retrySet(t, c, 1, 1, 1, time.Second)
   393  
   394  	// Sleep to make sure the item has expired after execution resumes.
   395  	time.Sleep(2 * time.Second)
   396  	val, ok := c.Get(1)
   397  	require.False(t, ok)
   398  	require.Nil(t, val)
   399  
   400  	// Sleep to ensure that the bucket where the item was stored has been cleared
   401  	// from the expiraton map.
   402  	time.Sleep(5 * time.Second)
   403  	m.Lock()
   404  	require.Equal(t, 1, len(evicted))
   405  	_, ok = evicted[1]
   406  	require.True(t, ok)
   407  	m.Unlock()
   408  
   409  	// Verify that expiration times are overwritten.
   410  	retrySet(t, c, 2, 1, 1, time.Second)
   411  	retrySet(t, c, 2, 2, 1, 100*time.Second)
   412  	time.Sleep(3 * time.Second)
   413  	val, ok = c.Get(2)
   414  	require.True(t, ok)
   415  	require.Equal(t, 2, val.(int))
   416  
   417  	// Verify that entries with no expiration are overwritten.
   418  	retrySet(t, c, 3, 1, 1, 0)
   419  	retrySet(t, c, 3, 2, 1, time.Second)
   420  	time.Sleep(3 * time.Second)
   421  	val, ok = c.Get(3)
   422  	require.False(t, ok)
   423  	require.Nil(t, val)
   424  }
   425  
   426  func TestCacheDel(t *testing.T) {
   427  	c, err := NewCache(&Config{
   428  		NumCounters: 100,
   429  		MaxCost:     10,
   430  		BufferItems: 64,
   431  	})
   432  	require.NoError(t, err)
   433  
   434  	c.Set(1, 1, 1)
   435  	c.Del(1)
   436  	// The deletes and sets are pushed through the setbuf. It might be possible
   437  	// that the delete is not processed before the following get is called. So
   438  	// wait for a millisecond for things to be processed.
   439  	time.Sleep(time.Millisecond)
   440  	val, ok := c.Get(1)
   441  	require.False(t, ok)
   442  	require.Nil(t, val)
   443  
   444  	c = nil
   445  	defer func() {
   446  		require.Nil(t, recover())
   447  	}()
   448  	c.Del(1)
   449  }
   450  
   451  func TestCacheDelWithTTL(t *testing.T) {
   452  	c, err := NewCache(&Config{
   453  		NumCounters: 100,
   454  		MaxCost:     10,
   455  		BufferItems: 64,
   456  	})
   457  	require.NoError(t, err)
   458  	retrySet(t, c, 3, 1, 1, 10*time.Second)
   459  	time.Sleep(1 * time.Second)
   460  	// Delete the item
   461  	c.Del(3)
   462  	// Ensure the key is deleted.
   463  	val, ok := c.Get(3)
   464  	require.False(t, ok)
   465  	require.Nil(t, val)
   466  }
   467  
   468  func TestCacheClear(t *testing.T) {
   469  	c, err := NewCache(&Config{
   470  		NumCounters: 100,
   471  		MaxCost:     10,
   472  		BufferItems: 64,
   473  		Metrics:     true,
   474  	})
   475  	require.NoError(t, err)
   476  
   477  	for i := 0; i < 10; i++ {
   478  		c.Set(i, i, 1)
   479  	}
   480  	time.Sleep(wait)
   481  	require.Equal(t, uint64(10), c.Metrics.KeysAdded())
   482  
   483  	c.Clear()
   484  	require.Equal(t, uint64(0), c.Metrics.KeysAdded())
   485  
   486  	for i := 0; i < 10; i++ {
   487  		val, ok := c.Get(i)
   488  		require.False(t, ok)
   489  		require.Nil(t, val)
   490  	}
   491  }
   492  
   493  func TestCacheMetrics(t *testing.T) {
   494  	c, err := NewCache(&Config{
   495  		NumCounters: 100,
   496  		MaxCost:     10,
   497  		BufferItems: 64,
   498  		Metrics:     true,
   499  	})
   500  	require.NoError(t, err)
   501  
   502  	for i := 0; i < 10; i++ {
   503  		c.Set(i, i, 1)
   504  	}
   505  	time.Sleep(wait)
   506  	m := c.Metrics
   507  	require.Equal(t, uint64(10), m.KeysAdded())
   508  }
   509  
   510  func TestMetrics(t *testing.T) {
   511  	newMetrics()
   512  }
   513  
   514  func TestNilMetrics(t *testing.T) {
   515  	var m *Metrics
   516  	for _, f := range []func() uint64{
   517  		m.Hits,
   518  		m.Misses,
   519  		m.KeysAdded,
   520  		m.KeysEvicted,
   521  		m.CostEvicted,
   522  		m.SetsDropped,
   523  		m.SetsRejected,
   524  		m.GetsDropped,
   525  		m.GetsKept,
   526  	} {
   527  		require.Equal(t, uint64(0), f())
   528  	}
   529  }
   530  
   531  func TestMetricsAddGet(t *testing.T) {
   532  	m := newMetrics()
   533  	m.add(hit, 1, 1)
   534  	m.add(hit, 2, 2)
   535  	m.add(hit, 3, 3)
   536  	require.Equal(t, uint64(6), m.Hits())
   537  
   538  	m = nil
   539  	m.add(hit, 1, 1)
   540  	require.Equal(t, uint64(0), m.Hits())
   541  }
   542  
   543  func TestMetricsRatio(t *testing.T) {
   544  	m := newMetrics()
   545  	require.Equal(t, float64(0), m.Ratio())
   546  
   547  	m.add(hit, 1, 1)
   548  	m.add(hit, 2, 2)
   549  	m.add(miss, 1, 1)
   550  	m.add(miss, 2, 2)
   551  	require.Equal(t, 0.5, m.Ratio())
   552  
   553  	m = nil
   554  	require.Equal(t, float64(0), m.Ratio())
   555  }
   556  
   557  func TestMetricsString(t *testing.T) {
   558  	m := newMetrics()
   559  	m.add(hit, 1, 1)
   560  	m.add(miss, 1, 1)
   561  	m.add(keyAdd, 1, 1)
   562  	m.add(keyUpdate, 1, 1)
   563  	m.add(keyEvict, 1, 1)
   564  	m.add(costAdd, 1, 1)
   565  	m.add(costEvict, 1, 1)
   566  	m.add(dropSets, 1, 1)
   567  	m.add(rejectSets, 1, 1)
   568  	m.add(dropGets, 1, 1)
   569  	m.add(keepGets, 1, 1)
   570  	require.Equal(t, uint64(1), m.Hits())
   571  	require.Equal(t, uint64(1), m.Misses())
   572  	require.Equal(t, 0.5, m.Ratio())
   573  	require.Equal(t, uint64(1), m.KeysAdded())
   574  	require.Equal(t, uint64(1), m.KeysUpdated())
   575  	require.Equal(t, uint64(1), m.KeysEvicted())
   576  	require.Equal(t, uint64(1), m.CostAdded())
   577  	require.Equal(t, uint64(1), m.CostEvicted())
   578  	require.Equal(t, uint64(1), m.SetsDropped())
   579  	require.Equal(t, uint64(1), m.SetsRejected())
   580  	require.Equal(t, uint64(1), m.GetsDropped())
   581  	require.Equal(t, uint64(1), m.GetsKept())
   582  
   583  	require.NotEqual(t, 0, len(m.String()))
   584  
   585  	m = nil
   586  	require.Equal(t, 0, len(m.String()))
   587  
   588  	require.Equal(t, "unidentified", stringFor(doNotUse))
   589  }
   590  
   591  func TestCacheMetricsClear(t *testing.T) {
   592  	c, err := NewCache(&Config{
   593  		NumCounters: 100,
   594  		MaxCost:     10,
   595  		BufferItems: 64,
   596  		Metrics:     true,
   597  	})
   598  	require.NoError(t, err)
   599  
   600  	c.Set(1, 1, 1)
   601  	stop := make(chan struct{})
   602  	go func() {
   603  		for {
   604  			select {
   605  			case <-stop:
   606  				return
   607  			default:
   608  				c.Get(1)
   609  			}
   610  		}
   611  	}()
   612  	time.Sleep(wait)
   613  	c.Clear()
   614  	stop <- struct{}{}
   615  	c.Metrics = nil
   616  	c.Metrics.Clear()
   617  }
   618  
   619  func init() {
   620  	// Set bucketDurationSecs to 1 to avoid waiting too much during the tests.
   621  	bucketDurationSecs = 1
   622  }
   623  
   624  // Regression test for bug https://github.com/dgraph-io/ristretto/issues/167
   625  func TestDropUpdates(t *testing.T) {
   626  	originalSetBugSize := setBufSize
   627  	defer func() { setBufSize = originalSetBugSize }()
   628  
   629  	test := func() {
   630  		// dropppedMap stores the items dropped from the cache.
   631  		droppedMap := make(map[int]struct{})
   632  		lastEvictedSet := int64(-1)
   633  
   634  		var err error
   635  		handler := func(_ interface{}, value interface{}) {
   636  			v := value.(string)
   637  			lastEvictedSet, err = strconv.ParseInt(string(v), 10, 32)
   638  			require.NoError(t, err)
   639  
   640  			_, ok := droppedMap[int(lastEvictedSet)]
   641  			if ok {
   642  				panic(fmt.Sprintf("val = %+v was dropped but it got evicted. Dropped items: %+v\n",
   643  					lastEvictedSet, droppedMap))
   644  			}
   645  		}
   646  
   647  		// This is important. The race condition shows up only when the setBuf
   648  		// is full and that's why we reduce the buf size here. The test will
   649  		// try to fill up the setbuf to it's capacity and then perform an
   650  		// update on a key.
   651  		setBufSize = 10
   652  
   653  		c, err := NewCache(&Config{
   654  			NumCounters: 100,
   655  			MaxCost:     10,
   656  			BufferItems: 64,
   657  			Metrics:     true,
   658  			OnEvict: func(_, _ uint64, value interface{}, _ int64) {
   659  				handler(nil, value)
   660  			},
   661  		})
   662  		require.NoError(t, err)
   663  
   664  		for i := 0; i < 5*setBufSize; i++ {
   665  			v := fmt.Sprintf("%0100d", i)
   666  			// We're updating the same key.
   667  			if !c.Set(0, v, 1) {
   668  				// The race condition doesn't show up without this sleep.
   669  				time.Sleep(time.Microsecond)
   670  				droppedMap[i] = struct{}{}
   671  			}
   672  		}
   673  		// Wait for all the items to be processed.
   674  		time.Sleep(time.Millisecond)
   675  		// This will cause eviction from the cache.
   676  		require.True(t, c.Set(1, nil, 10))
   677  		c.Close()
   678  	}
   679  
   680  	// Run the test 100 times since it's not reliable.
   681  	for i := 0; i < 100; i++ {
   682  		test()
   683  	}
   684  }
   685  

View as plain text