...

Source file src/github.com/peterbourgon/diskv/v3/diskv.go

Documentation: github.com/peterbourgon/diskv/v3

     1  // Diskv (disk-vee) is a simple, persistent, key-value store.
     2  // It stores all data flatly on the filesystem.
     3  
     4  package diskv
     5  
     6  import (
     7  	"bytes"
     8  	"errors"
     9  	"fmt"
    10  	"io"
    11  	"io/ioutil"
    12  	"os"
    13  	"path/filepath"
    14  	"strings"
    15  	"sync"
    16  	"syscall"
    17  )
    18  
// Defaults substituted by New for Options fields left at their zero value.
const (
	defaultBasePath             = "diskv" // replaces an empty Options.BasePath
	defaultFilePerm os.FileMode = 0666 // replaces a zero Options.FilePerm
	defaultPathPerm os.FileMode = 0777 // replaces a zero Options.PathPerm
)
    24  
// PathKey represents a string key that has been transformed to
// a directory and file name where the content will eventually
// be stored.
type PathKey struct {
	// Path lists the directory components, joined in order beneath the
	// store's BasePath.
	Path        []string
	// FileName is the name of the data file inside the final directory.
	FileName    string
	// originalKey is the key this PathKey was derived from; it is filled in
	// by (*Diskv).transform and used for cache and index bookkeeping.
	originalKey string
}
    33  
var (
	// defaultAdvancedTransform places every key directly under the base
	// path, using the key itself as the file name.
	defaultAdvancedTransform = func(s string) *PathKey { return &PathKey{Path: []string{}, FileName: s} }
	// defaultInverseTransform recovers the key from the file name alone.
	defaultInverseTransform  = func(pathKey *PathKey) string { return pathKey.FileName }
	// errCanceled is returned by the key walker when the cancel channel fires.
	errCanceled              = errors.New("canceled")
	// errEmptyKey is returned when a write or import is given an empty key.
	errEmptyKey              = errors.New("empty key")
	// errBadKey is returned when a transformed key contains a path separator,
	// or when an erase targets a directory.
	errBadKey                = errors.New("bad key")
	// errImportDirectory is returned when the import source is a directory.
	errImportDirectory       = errors.New("can't import a directory")
)
    42  
// TransformFunction transforms a key into a slice of strings, with each
// element in the slice representing a directory in the file path where the
// key's entry will eventually be stored. The key itself is used as the file
// name.
//
// For example, if a TransformFunction transforms "abcdef" to
// ["ab", "cde", "f"], the final location of the data file will be
// <basedir>/ab/cde/f/abcdef
type TransformFunction func(s string) []string
    50  
// AdvancedTransformFunction transforms a key into a PathKey.
//
// A PathKey contains a slice of strings, where each element in the slice
// represents a directory in the file path where the key's entry will eventually
// be stored, as well as the filename.
//
// For example, if an AdvancedTransformFunction transforms "abcdef/file.txt"
// to the PathKey {Path: ["ab", "cde", "f"], FileName: "file.txt"}, the final
// location of the data file will be <basedir>/ab/cde/f/file.txt.
//
// You must provide an InverseTransformFunction if you use an
// AdvancedTransformFunction; New panics otherwise.
type AdvancedTransformFunction func(s string) *PathKey
    64  
// InverseTransformFunction takes a PathKey and converts it back to a Diskv key.
// In effect, it's the opposite of an AdvancedTransformFunction. It is applied
// to paths found on disk when the store enumerates its keys.
type InverseTransformFunction func(pathKey *PathKey) string
    68  
// Options define a set of properties that dictate Diskv behavior.
// All values are optional.
type Options struct {
	// BasePath is the root directory of the store. New substitutes "diskv"
	// when it is empty.
	BasePath          string
	// Transform is the classic key-to-directories mapping. It is consulted
	// only when AdvancedTransform is nil.
	Transform         TransformFunction
	// AdvancedTransform maps a key to both directories and a file name.
	// When it is set, InverseTransform must be set too.
	AdvancedTransform AdvancedTransformFunction
	// InverseTransform maps an on-disk location back to its key. When
	// AdvancedTransform is nil and this is nil, New installs a default that
	// returns the file name.
	InverseTransform  InverseTransformFunction
	CacheSizeMax      uint64 // bytes; values are cached on read only when this is > 0
	// PathPerm is the mode used when creating directories (default 0777).
	PathPerm          os.FileMode
	// FilePerm is the mode used when creating data files (default 0666).
	FilePerm          os.FileMode
	// If TempDir is set, it will enable filesystem atomic writes by
	// writing temporary files to that location before being moved
	// to BasePath.
	// Note that TempDir MUST be on the same device/partition as
	// BasePath.
	TempDir string

	// Index, when non-nil, is told about every written and erased key.
	// New seeds it with the existing keys only if IndexLess is also set.
	Index     Index
	IndexLess LessFunction

	// Compression, when non-nil, wraps the data stream on write and on read.
	Compression Compression
}
    91  
// Diskv implements the Diskv interface. You shouldn't construct Diskv
// structures directly; instead, use the New constructor.
type Diskv struct {
	Options
	// mu is held by the store's methods while they touch the cache or the
	// files on disk; helpers named ...WithLock expect the caller to hold it.
	mu        sync.RWMutex
	// cache maps a key to the bytes last read from its file. With
	// compression enabled these are the compressed bytes.
	cache     map[string][]byte
	// cacheSize is the total length, in bytes, of all values in cache.
	cacheSize uint64
}
   100  
   101  // New returns an initialized Diskv structure, ready to use.
   102  // If the path identified by baseDir already contains data,
   103  // it will be accessible, but not yet cached.
   104  func New(o Options) *Diskv {
   105  	if o.BasePath == "" {
   106  		o.BasePath = defaultBasePath
   107  	}
   108  
   109  	if o.AdvancedTransform == nil {
   110  		if o.Transform == nil {
   111  			o.AdvancedTransform = defaultAdvancedTransform
   112  		} else {
   113  			o.AdvancedTransform = convertToAdvancedTransform(o.Transform)
   114  		}
   115  		if o.InverseTransform == nil {
   116  			o.InverseTransform = defaultInverseTransform
   117  		}
   118  	} else {
   119  		if o.InverseTransform == nil {
   120  			panic("You must provide an InverseTransform function in advanced mode")
   121  		}
   122  	}
   123  
   124  	if o.PathPerm == 0 {
   125  		o.PathPerm = defaultPathPerm
   126  	}
   127  	if o.FilePerm == 0 {
   128  		o.FilePerm = defaultFilePerm
   129  	}
   130  
   131  	d := &Diskv{
   132  		Options:   o,
   133  		cache:     map[string][]byte{},
   134  		cacheSize: 0,
   135  	}
   136  
   137  	if d.Index != nil && d.IndexLess != nil {
   138  		d.Index.Initialize(d.IndexLess, d.Keys(nil))
   139  	}
   140  
   141  	return d
   142  }
   143  
   144  // convertToAdvancedTransform takes a classic Transform function and
   145  // converts it to the new AdvancedTransform
   146  func convertToAdvancedTransform(oldFunc func(s string) []string) AdvancedTransformFunction {
   147  	return func(s string) *PathKey {
   148  		return &PathKey{Path: oldFunc(s), FileName: s}
   149  	}
   150  }
   151  
   152  // Write synchronously writes the key-value pair to disk, making it immediately
   153  // available for reads. Write relies on the filesystem to perform an eventual
   154  // sync to physical media. If you need stronger guarantees, see WriteStream.
   155  func (d *Diskv) Write(key string, val []byte) error {
   156  	return d.WriteStream(key, bytes.NewReader(val), false)
   157  }
   158  
   159  // WriteString writes a string key-value pair to disk
   160  func (d *Diskv) WriteString(key string, val string) error {
   161  	return d.Write(key, []byte(val))
   162  }
   163  
   164  func (d *Diskv) transform(key string) (pathKey *PathKey) {
   165  	pathKey = d.AdvancedTransform(key)
   166  	pathKey.originalKey = key
   167  	return pathKey
   168  }
   169  
   170  // WriteStream writes the data represented by the io.Reader to the disk, under
   171  // the provided key. If sync is true, WriteStream performs an explicit sync on
   172  // the file as soon as it's written.
   173  //
   174  // bytes.Buffer provides io.Reader semantics for basic data types.
   175  func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error {
   176  	if len(key) <= 0 {
   177  		return errEmptyKey
   178  	}
   179  
   180  	pathKey := d.transform(key)
   181  
   182  	// Ensure keys cannot evaluate to paths that would not exist
   183  	for _, pathPart := range pathKey.Path {
   184  		if strings.ContainsRune(pathPart, os.PathSeparator) {
   185  			return errBadKey
   186  		}
   187  	}
   188  
   189  	if strings.ContainsRune(pathKey.FileName, os.PathSeparator) {
   190  		return errBadKey
   191  	}
   192  
   193  	d.mu.Lock()
   194  	defer d.mu.Unlock()
   195  
   196  	return d.writeStreamWithLock(pathKey, r, sync)
   197  }
   198  
   199  // createKeyFileWithLock either creates the key file directly, or
   200  // creates a temporary file in TempDir if it is set.
   201  func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) {
   202  	if d.TempDir != "" {
   203  		if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil {
   204  			return nil, fmt.Errorf("temp mkdir: %s", err)
   205  		}
   206  		f, err := ioutil.TempFile(d.TempDir, "")
   207  		if err != nil {
   208  			return nil, fmt.Errorf("temp file: %s", err)
   209  		}
   210  
   211  		if err := os.Chmod(f.Name(), d.FilePerm); err != nil {
   212  			f.Close()           // error deliberately ignored
   213  			os.Remove(f.Name()) // error deliberately ignored
   214  			return nil, fmt.Errorf("chmod: %s", err)
   215  		}
   216  		return f, nil
   217  	}
   218  
   219  	mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC // overwrite if exists
   220  	f, err := os.OpenFile(d.completeFilename(pathKey), mode, d.FilePerm)
   221  	if err != nil {
   222  		return nil, fmt.Errorf("open file: %s", err)
   223  	}
   224  	return f, nil
   225  }
   226  
   227  // writeStream does no input validation checking.
   228  func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) error {
   229  	if err := d.ensurePathWithLock(pathKey); err != nil {
   230  		return fmt.Errorf("ensure path: %s", err)
   231  	}
   232  
   233  	f, err := d.createKeyFileWithLock(pathKey)
   234  	if err != nil {
   235  		return fmt.Errorf("create key file: %s", err)
   236  	}
   237  
   238  	wc := io.WriteCloser(&nopWriteCloser{f})
   239  	if d.Compression != nil {
   240  		wc, err = d.Compression.Writer(f)
   241  		if err != nil {
   242  			f.Close()           // error deliberately ignored
   243  			os.Remove(f.Name()) // error deliberately ignored
   244  			return fmt.Errorf("compression writer: %s", err)
   245  		}
   246  	}
   247  
   248  	if _, err := io.Copy(wc, r); err != nil {
   249  		f.Close()           // error deliberately ignored
   250  		os.Remove(f.Name()) // error deliberately ignored
   251  		return fmt.Errorf("i/o copy: %s", err)
   252  	}
   253  
   254  	if err := wc.Close(); err != nil {
   255  		f.Close()           // error deliberately ignored
   256  		os.Remove(f.Name()) // error deliberately ignored
   257  		return fmt.Errorf("compression close: %s", err)
   258  	}
   259  
   260  	if sync {
   261  		if err := f.Sync(); err != nil {
   262  			f.Close()           // error deliberately ignored
   263  			os.Remove(f.Name()) // error deliberately ignored
   264  			return fmt.Errorf("file sync: %s", err)
   265  		}
   266  	}
   267  
   268  	if err := f.Close(); err != nil {
   269  		return fmt.Errorf("file close: %s", err)
   270  	}
   271  
   272  	fullPath := d.completeFilename(pathKey)
   273  	if f.Name() != fullPath {
   274  		if err := os.Rename(f.Name(), fullPath); err != nil {
   275  			os.Remove(f.Name()) // error deliberately ignored
   276  			return fmt.Errorf("rename: %s", err)
   277  		}
   278  	}
   279  
   280  	if d.Index != nil {
   281  		d.Index.Insert(pathKey.originalKey)
   282  	}
   283  
   284  	d.bustCacheWithLock(pathKey.originalKey) // cache only on read
   285  
   286  	return nil
   287  }
   288  
   289  // Import imports the source file into diskv under the destination key. If the
   290  // destination key already exists, it's overwritten. If move is true, the
   291  // source file is removed after a successful import.
   292  func (d *Diskv) Import(srcFilename, dstKey string, move bool) (err error) {
   293  	if dstKey == "" {
   294  		return errEmptyKey
   295  	}
   296  
   297  	if fi, err := os.Stat(srcFilename); err != nil {
   298  		return err
   299  	} else if fi.IsDir() {
   300  		return errImportDirectory
   301  	}
   302  
   303  	dstPathKey := d.transform(dstKey)
   304  
   305  	d.mu.Lock()
   306  	defer d.mu.Unlock()
   307  
   308  	if err := d.ensurePathWithLock(dstPathKey); err != nil {
   309  		return fmt.Errorf("ensure path: %s", err)
   310  	}
   311  
   312  	if move {
   313  		if err := syscall.Rename(srcFilename, d.completeFilename(dstPathKey)); err == nil {
   314  			d.bustCacheWithLock(dstPathKey.originalKey)
   315  			return nil
   316  		} else if err != syscall.EXDEV {
   317  			// If it failed due to being on a different device, fall back to copying
   318  			return err
   319  		}
   320  	}
   321  
   322  	f, err := os.Open(srcFilename)
   323  	if err != nil {
   324  		return err
   325  	}
   326  	defer f.Close()
   327  	err = d.writeStreamWithLock(dstPathKey, f, false)
   328  	if err == nil && move {
   329  		err = os.Remove(srcFilename)
   330  	}
   331  	return err
   332  }
   333  
   334  // Read reads the key and returns the value.
   335  // If the key is available in the cache, Read won't touch the disk.
   336  // If the key is not in the cache, Read will have the side-effect of
   337  // lazily caching the value.
   338  func (d *Diskv) Read(key string) ([]byte, error) {
   339  	rc, err := d.ReadStream(key, false)
   340  	if err != nil {
   341  		return []byte{}, err
   342  	}
   343  	defer rc.Close()
   344  	return ioutil.ReadAll(rc)
   345  }
   346  
   347  // ReadString reads the key and returns a string value
   348  // In case of error, an empty string is returned
   349  func (d *Diskv) ReadString(key string) string {
   350  	value, _ := d.Read(key)
   351  	return string(value)
   352  }
   353  
   354  // ReadStream reads the key and returns the value (data) as an io.ReadCloser.
   355  // If the value is cached from a previous read, and direct is false,
   356  // ReadStream will use the cached value. Otherwise, it will return a handle to
   357  // the file on disk, and cache the data on read.
   358  //
   359  // If direct is true, ReadStream will lazily delete any cached value for the
   360  // key, and return a direct handle to the file on disk.
   361  //
   362  // If compression is enabled, ReadStream taps into the io.Reader stream prior
   363  // to decompression, and caches the compressed data.
   364  func (d *Diskv) ReadStream(key string, direct bool) (io.ReadCloser, error) {
   365  
   366  	pathKey := d.transform(key)
   367  	d.mu.RLock()
   368  	defer d.mu.RUnlock()
   369  
   370  	if val, ok := d.cache[key]; ok {
   371  		if !direct {
   372  			buf := bytes.NewReader(val)
   373  			if d.Compression != nil {
   374  				return d.Compression.Reader(buf)
   375  			}
   376  			return ioutil.NopCloser(buf), nil
   377  		}
   378  
   379  		go func() {
   380  			d.mu.Lock()
   381  			defer d.mu.Unlock()
   382  			d.uncacheWithLock(key, uint64(len(val)))
   383  		}()
   384  	}
   385  
   386  	return d.readWithRLock(pathKey)
   387  }
   388  
   389  // read ignores the cache, and returns an io.ReadCloser representing the
   390  // decompressed data for the given key, streamed from the disk. Clients should
   391  // acquire a read lock on the Diskv and check the cache themselves before
   392  // calling read.
   393  func (d *Diskv) readWithRLock(pathKey *PathKey) (io.ReadCloser, error) {
   394  	filename := d.completeFilename(pathKey)
   395  
   396  	fi, err := os.Stat(filename)
   397  	if err != nil {
   398  		return nil, err
   399  	}
   400  	if fi.IsDir() {
   401  		return nil, os.ErrNotExist
   402  	}
   403  
   404  	f, err := os.Open(filename)
   405  	if err != nil {
   406  		return nil, err
   407  	}
   408  
   409  	var r io.Reader
   410  	if d.CacheSizeMax > 0 {
   411  		r = newSiphon(f, d, pathKey.originalKey)
   412  	} else {
   413  		r = &closingReader{f}
   414  	}
   415  
   416  	var rc = io.ReadCloser(ioutil.NopCloser(r))
   417  	if d.Compression != nil {
   418  		rc, err = d.Compression.Reader(r)
   419  		if err != nil {
   420  			return nil, err
   421  		}
   422  	}
   423  
   424  	return rc, nil
   425  }
   426  
   427  // closingReader provides a Reader that automatically closes the
   428  // embedded ReadCloser when it reaches EOF
   429  type closingReader struct {
   430  	rc io.ReadCloser
   431  }
   432  
   433  func (cr closingReader) Read(p []byte) (int, error) {
   434  	n, err := cr.rc.Read(p)
   435  	if err == io.EOF {
   436  		if closeErr := cr.rc.Close(); closeErr != nil {
   437  			return n, closeErr // close must succeed for Read to succeed
   438  		}
   439  	}
   440  	return n, err
   441  }
   442  
// siphon is like a TeeReader: it copies all data read through it to an
// internal buffer, and moves that buffer to the cache at EOF.
type siphon struct {
	f   *os.File      // file being read; closed when a read reports EOF
	d   *Diskv        // store whose cache receives the buffered data
	key string        // cache key under which the data is stored
	buf *bytes.Buffer // accumulates every byte read so far
}
   451  
   452  // newSiphon constructs a siphoning reader that represents the passed file.
   453  // When a successful series of reads ends in an EOF, the siphon will write
   454  // the buffered data to Diskv's cache under the given key.
   455  func newSiphon(f *os.File, d *Diskv, key string) io.Reader {
   456  	return &siphon{
   457  		f:   f,
   458  		d:   d,
   459  		key: key,
   460  		buf: &bytes.Buffer{},
   461  	}
   462  }
   463  
   464  // Read implements the io.Reader interface for siphon.
   465  func (s *siphon) Read(p []byte) (int, error) {
   466  	n, err := s.f.Read(p)
   467  
   468  	if err == nil {
   469  		return s.buf.Write(p[0:n]) // Write must succeed for Read to succeed
   470  	}
   471  
   472  	if err == io.EOF {
   473  		s.d.cacheWithoutLock(s.key, s.buf.Bytes()) // cache may fail
   474  		if closeErr := s.f.Close(); closeErr != nil {
   475  			return n, closeErr // close must succeed for Read to succeed
   476  		}
   477  		return n, err
   478  	}
   479  
   480  	return n, err
   481  }
   482  
   483  // Erase synchronously erases the given key from the disk and the cache.
   484  func (d *Diskv) Erase(key string) error {
   485  	pathKey := d.transform(key)
   486  	d.mu.Lock()
   487  	defer d.mu.Unlock()
   488  
   489  	d.bustCacheWithLock(key)
   490  
   491  	// erase from index
   492  	if d.Index != nil {
   493  		d.Index.Delete(key)
   494  	}
   495  
   496  	// erase from disk
   497  	filename := d.completeFilename(pathKey)
   498  	if s, err := os.Stat(filename); err == nil {
   499  		if s.IsDir() {
   500  			return errBadKey
   501  		}
   502  		if err = os.Remove(filename); err != nil {
   503  			return err
   504  		}
   505  	} else {
   506  		// Return err as-is so caller can do os.IsNotExist(err).
   507  		return err
   508  	}
   509  
   510  	// clean up and return
   511  	d.pruneDirsWithLock(key)
   512  	return nil
   513  }
   514  
   515  // EraseAll will delete all of the data from the store, both in the cache and on
   516  // the disk. Note that EraseAll doesn't distinguish diskv-related data from non-
   517  // diskv-related data. Care should be taken to always specify a diskv base
   518  // directory that is exclusively for diskv data.
   519  func (d *Diskv) EraseAll() error {
   520  	d.mu.Lock()
   521  	defer d.mu.Unlock()
   522  	d.cache = make(map[string][]byte)
   523  	d.cacheSize = 0
   524  	if d.TempDir != "" {
   525  		os.RemoveAll(d.TempDir) // errors ignored
   526  	}
   527  	return os.RemoveAll(d.BasePath)
   528  }
   529  
   530  // Has returns true if the given key exists.
   531  func (d *Diskv) Has(key string) bool {
   532  	pathKey := d.transform(key)
   533  	d.mu.Lock()
   534  	defer d.mu.Unlock()
   535  
   536  	if _, ok := d.cache[key]; ok {
   537  		return true
   538  	}
   539  
   540  	filename := d.completeFilename(pathKey)
   541  	s, err := os.Stat(filename)
   542  	if err != nil {
   543  		return false
   544  	}
   545  	if s.IsDir() {
   546  		return false
   547  	}
   548  
   549  	return true
   550  }
   551  
   552  // Keys returns a channel that will yield every key accessible by the store,
   553  // in undefined order. If a cancel channel is provided, closing it will
   554  // terminate and close the keys channel.
   555  func (d *Diskv) Keys(cancel <-chan struct{}) <-chan string {
   556  	return d.KeysPrefix("", cancel)
   557  }
   558  
   559  // KeysPrefix returns a channel that will yield every key accessible by the
   560  // store with the given prefix, in undefined order. If a cancel channel is
   561  // provided, closing it will terminate and close the keys channel. If the
   562  // provided prefix is the empty string, all keys will be yielded.
   563  func (d *Diskv) KeysPrefix(prefix string, cancel <-chan struct{}) <-chan string {
   564  	var prepath string
   565  	if prefix == "" {
   566  		prepath = d.BasePath
   567  	} else {
   568  		prefixKey := d.transform(prefix)
   569  		prepath = d.pathFor(prefixKey)
   570  	}
   571  	c := make(chan string)
   572  	go func() {
   573  		filepath.Walk(prepath, d.walker(c, prefix, cancel))
   574  		close(c)
   575  	}()
   576  	return c
   577  }
   578  
   579  // walker returns a function which satisfies the filepath.WalkFunc interface.
   580  // It sends every non-directory file entry down the channel c.
   581  func (d *Diskv) walker(c chan<- string, prefix string, cancel <-chan struct{}) filepath.WalkFunc {
   582  	return func(path string, info os.FileInfo, err error) error {
   583  		if err != nil {
   584  			return err
   585  		}
   586  
   587  		relPath, _ := filepath.Rel(d.BasePath, path)
   588  		dir, file := filepath.Split(relPath)
   589  		pathSplit := strings.Split(dir, string(filepath.Separator))
   590  		pathSplit = pathSplit[:len(pathSplit)-1]
   591  
   592  		pathKey := &PathKey{
   593  			Path:     pathSplit,
   594  			FileName: file,
   595  		}
   596  
   597  		key := d.InverseTransform(pathKey)
   598  
   599  		if info.IsDir() || !strings.HasPrefix(key, prefix) {
   600  			return nil // "pass"
   601  		}
   602  
   603  		select {
   604  		case c <- key:
   605  		case <-cancel:
   606  			return errCanceled
   607  		}
   608  
   609  		return nil
   610  	}
   611  }
   612  
   613  // pathFor returns the absolute path for location on the filesystem where the
   614  // data for the given key will be stored.
   615  func (d *Diskv) pathFor(pathKey *PathKey) string {
   616  	return filepath.Join(d.BasePath, filepath.Join(pathKey.Path...))
   617  }
   618  
   619  // ensurePathWithLock is a helper function that generates all necessary
   620  // directories on the filesystem for the given key.
   621  func (d *Diskv) ensurePathWithLock(pathKey *PathKey) error {
   622  	return os.MkdirAll(d.pathFor(pathKey), d.PathPerm)
   623  }
   624  
   625  // completeFilename returns the absolute path to the file for the given key.
   626  func (d *Diskv) completeFilename(pathKey *PathKey) string {
   627  	return filepath.Join(d.pathFor(pathKey), pathKey.FileName)
   628  }
   629  
   630  // cacheWithLock attempts to cache the given key-value pair in the store's
   631  // cache. It can fail if the value is larger than the cache's maximum size.
   632  func (d *Diskv) cacheWithLock(key string, val []byte) error {
   633  	// If the key already exists, delete it.
   634  	d.bustCacheWithLock(key)
   635  
   636  	valueSize := uint64(len(val))
   637  	if err := d.ensureCacheSpaceWithLock(valueSize); err != nil {
   638  		return fmt.Errorf("%s; not caching", err)
   639  	}
   640  
   641  	// be very strict about memory guarantees
   642  	if (d.cacheSize + valueSize) > d.CacheSizeMax {
   643  		panic(fmt.Sprintf("failed to make room for value (%d/%d)", valueSize, d.CacheSizeMax))
   644  	}
   645  
   646  	d.cache[key] = val
   647  	d.cacheSize += valueSize
   648  	return nil
   649  }
   650  
   651  // cacheWithoutLock acquires the store's (write) mutex and calls cacheWithLock.
   652  func (d *Diskv) cacheWithoutLock(key string, val []byte) error {
   653  	d.mu.Lock()
   654  	defer d.mu.Unlock()
   655  	return d.cacheWithLock(key, val)
   656  }
   657  
   658  func (d *Diskv) bustCacheWithLock(key string) {
   659  	if val, ok := d.cache[key]; ok {
   660  		d.uncacheWithLock(key, uint64(len(val)))
   661  	}
   662  }
   663  
   664  func (d *Diskv) uncacheWithLock(key string, sz uint64) {
   665  	d.cacheSize -= sz
   666  	delete(d.cache, key)
   667  }
   668  
   669  // pruneDirsWithLock deletes empty directories in the path walk leading to the
   670  // key k. Typically this function is called after an Erase is made.
   671  func (d *Diskv) pruneDirsWithLock(key string) error {
   672  	pathlist := d.transform(key).Path
   673  	for i := range pathlist {
   674  		dir := filepath.Join(d.BasePath, filepath.Join(pathlist[:len(pathlist)-i]...))
   675  
   676  		// thanks to Steven Blenkinsop for this snippet
   677  		switch fi, err := os.Stat(dir); true {
   678  		case err != nil:
   679  			return err
   680  		case !fi.IsDir():
   681  			panic(fmt.Sprintf("corrupt dirstate at %s", dir))
   682  		}
   683  
   684  		nlinks, err := filepath.Glob(filepath.Join(dir, "*"))
   685  		if err != nil {
   686  			return err
   687  		} else if len(nlinks) > 0 {
   688  			return nil // has subdirs -- do not prune
   689  		}
   690  		if err = os.Remove(dir); err != nil {
   691  			return err
   692  		}
   693  	}
   694  
   695  	return nil
   696  }
   697  
   698  // ensureCacheSpaceWithLock deletes entries from the cache in arbitrary order
   699  // until the cache has at least valueSize bytes available.
   700  func (d *Diskv) ensureCacheSpaceWithLock(valueSize uint64) error {
   701  	if valueSize > d.CacheSizeMax {
   702  		return fmt.Errorf("value size (%d bytes) too large for cache (%d bytes)", valueSize, d.CacheSizeMax)
   703  	}
   704  
   705  	safe := func() bool { return (d.cacheSize + valueSize) <= d.CacheSizeMax }
   706  
   707  	for key, val := range d.cache {
   708  		if safe() {
   709  			break
   710  		}
   711  
   712  		d.uncacheWithLock(key, uint64(len(val)))
   713  	}
   714  
   715  	if !safe() {
   716  		panic(fmt.Sprintf("%d bytes still won't fit in the cache! (max %d bytes)", valueSize, d.CacheSizeMax))
   717  	}
   718  
   719  	return nil
   720  }
   721  
   722  // nopWriteCloser wraps an io.Writer and provides a no-op Close method to
   723  // satisfy the io.WriteCloser interface.
   724  type nopWriteCloser struct {
   725  	io.Writer
   726  }
   727  
   728  func (wc *nopWriteCloser) Write(p []byte) (int, error) { return wc.Writer.Write(p) }
   729  func (wc *nopWriteCloser) Close() error                { return nil }
   730  

View as plain text