1
2
3
4 package diskv
5
6 import (
7 "bytes"
8 "errors"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "os"
13 "path/filepath"
14 "strings"
15 "sync"
16 "syscall"
17 )
18
// Defaults that New substitutes for zero-valued Options fields.
const (
	// defaultBasePath is the store root used when Options.BasePath is empty.
	defaultBasePath = "diskv"
	// defaultFilePerm is the mode for newly created value files.
	defaultFilePerm = os.FileMode(0666)
	// defaultPathPerm is the mode for newly created directories.
	defaultPathPerm = os.FileMode(0777)
)
24
25
26
27
// PathKey describes where a key lives on disk: the directory components
// beneath BasePath plus the name of the file holding the value.
type PathKey struct {
	// Path lists the directory components, joined in order under BasePath.
	Path []string
	// FileName is the name of the value's file inside those directories.
	FileName string
	// originalKey is the key this PathKey was derived from. Diskv.transform
	// sets it; PathKeys rebuilt while walking the disk leave it empty.
	originalKey string
}
33
34 var (
35 defaultAdvancedTransform = func(s string) *PathKey { return &PathKey{Path: []string{}, FileName: s} }
36 defaultInverseTransform = func(pathKey *PathKey) string { return pathKey.FileName }
37 errCanceled = errors.New("canceled")
38 errEmptyKey = errors.New("empty key")
39 errBadKey = errors.New("bad key")
40 errImportDirectory = errors.New("can't import a directory")
41 )
42
43
44
45
46
47
48
// TransformFunction maps a key to the list of directory components (under
// BasePath) in which the key's file is stored. When used through Options,
// the file name is the whole key.
type TransformFunction func(key string) []string
50
51
52
53
54
55
56
57
58
59
60
61
62
63 type AdvancedTransformFunction func(s string) *PathKey
64
65
66
67 type InverseTransformFunction func(pathKey *PathKey) string
68
69
70
71 type Options struct {
72 BasePath string
73 Transform TransformFunction
74 AdvancedTransform AdvancedTransformFunction
75 InverseTransform InverseTransformFunction
76 CacheSizeMax uint64
77 PathPerm os.FileMode
78 FilePerm os.FileMode
79
80
81
82
83
84 TempDir string
85
86 Index Index
87 IndexLess LessFunction
88
89 Compression Compression
90 }
91
92
93
94 type Diskv struct {
95 Options
96 mu sync.RWMutex
97 cache map[string][]byte
98 cacheSize uint64
99 }
100
101
102
103
104 func New(o Options) *Diskv {
105 if o.BasePath == "" {
106 o.BasePath = defaultBasePath
107 }
108
109 if o.AdvancedTransform == nil {
110 if o.Transform == nil {
111 o.AdvancedTransform = defaultAdvancedTransform
112 } else {
113 o.AdvancedTransform = convertToAdvancedTransform(o.Transform)
114 }
115 if o.InverseTransform == nil {
116 o.InverseTransform = defaultInverseTransform
117 }
118 } else {
119 if o.InverseTransform == nil {
120 panic("You must provide an InverseTransform function in advanced mode")
121 }
122 }
123
124 if o.PathPerm == 0 {
125 o.PathPerm = defaultPathPerm
126 }
127 if o.FilePerm == 0 {
128 o.FilePerm = defaultFilePerm
129 }
130
131 d := &Diskv{
132 Options: o,
133 cache: map[string][]byte{},
134 cacheSize: 0,
135 }
136
137 if d.Index != nil && d.IndexLess != nil {
138 d.Index.Initialize(d.IndexLess, d.Keys(nil))
139 }
140
141 return d
142 }
143
144
145
146 func convertToAdvancedTransform(oldFunc func(s string) []string) AdvancedTransformFunction {
147 return func(s string) *PathKey {
148 return &PathKey{Path: oldFunc(s), FileName: s}
149 }
150 }
151
152
153
154
155 func (d *Diskv) Write(key string, val []byte) error {
156 return d.WriteStream(key, bytes.NewReader(val), false)
157 }
158
159
160 func (d *Diskv) WriteString(key string, val string) error {
161 return d.Write(key, []byte(val))
162 }
163
164 func (d *Diskv) transform(key string) (pathKey *PathKey) {
165 pathKey = d.AdvancedTransform(key)
166 pathKey.originalKey = key
167 return pathKey
168 }
169
170
171
172
173
174
175 func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error {
176 if len(key) <= 0 {
177 return errEmptyKey
178 }
179
180 pathKey := d.transform(key)
181
182
183 for _, pathPart := range pathKey.Path {
184 if strings.ContainsRune(pathPart, os.PathSeparator) {
185 return errBadKey
186 }
187 }
188
189 if strings.ContainsRune(pathKey.FileName, os.PathSeparator) {
190 return errBadKey
191 }
192
193 d.mu.Lock()
194 defer d.mu.Unlock()
195
196 return d.writeStreamWithLock(pathKey, r, sync)
197 }
198
199
200
201 func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) {
202 if d.TempDir != "" {
203 if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil {
204 return nil, fmt.Errorf("temp mkdir: %s", err)
205 }
206 f, err := ioutil.TempFile(d.TempDir, "")
207 if err != nil {
208 return nil, fmt.Errorf("temp file: %s", err)
209 }
210
211 if err := os.Chmod(f.Name(), d.FilePerm); err != nil {
212 f.Close()
213 os.Remove(f.Name())
214 return nil, fmt.Errorf("chmod: %s", err)
215 }
216 return f, nil
217 }
218
219 mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
220 f, err := os.OpenFile(d.completeFilename(pathKey), mode, d.FilePerm)
221 if err != nil {
222 return nil, fmt.Errorf("open file: %s", err)
223 }
224 return f, nil
225 }
226
227
228 func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) error {
229 if err := d.ensurePathWithLock(pathKey); err != nil {
230 return fmt.Errorf("ensure path: %s", err)
231 }
232
233 f, err := d.createKeyFileWithLock(pathKey)
234 if err != nil {
235 return fmt.Errorf("create key file: %s", err)
236 }
237
238 wc := io.WriteCloser(&nopWriteCloser{f})
239 if d.Compression != nil {
240 wc, err = d.Compression.Writer(f)
241 if err != nil {
242 f.Close()
243 os.Remove(f.Name())
244 return fmt.Errorf("compression writer: %s", err)
245 }
246 }
247
248 if _, err := io.Copy(wc, r); err != nil {
249 f.Close()
250 os.Remove(f.Name())
251 return fmt.Errorf("i/o copy: %s", err)
252 }
253
254 if err := wc.Close(); err != nil {
255 f.Close()
256 os.Remove(f.Name())
257 return fmt.Errorf("compression close: %s", err)
258 }
259
260 if sync {
261 if err := f.Sync(); err != nil {
262 f.Close()
263 os.Remove(f.Name())
264 return fmt.Errorf("file sync: %s", err)
265 }
266 }
267
268 if err := f.Close(); err != nil {
269 return fmt.Errorf("file close: %s", err)
270 }
271
272 fullPath := d.completeFilename(pathKey)
273 if f.Name() != fullPath {
274 if err := os.Rename(f.Name(), fullPath); err != nil {
275 os.Remove(f.Name())
276 return fmt.Errorf("rename: %s", err)
277 }
278 }
279
280 if d.Index != nil {
281 d.Index.Insert(pathKey.originalKey)
282 }
283
284 d.bustCacheWithLock(pathKey.originalKey)
285
286 return nil
287 }
288
289
290
291
292 func (d *Diskv) Import(srcFilename, dstKey string, move bool) (err error) {
293 if dstKey == "" {
294 return errEmptyKey
295 }
296
297 if fi, err := os.Stat(srcFilename); err != nil {
298 return err
299 } else if fi.IsDir() {
300 return errImportDirectory
301 }
302
303 dstPathKey := d.transform(dstKey)
304
305 d.mu.Lock()
306 defer d.mu.Unlock()
307
308 if err := d.ensurePathWithLock(dstPathKey); err != nil {
309 return fmt.Errorf("ensure path: %s", err)
310 }
311
312 if move {
313 if err := syscall.Rename(srcFilename, d.completeFilename(dstPathKey)); err == nil {
314 d.bustCacheWithLock(dstPathKey.originalKey)
315 return nil
316 } else if err != syscall.EXDEV {
317
318 return err
319 }
320 }
321
322 f, err := os.Open(srcFilename)
323 if err != nil {
324 return err
325 }
326 defer f.Close()
327 err = d.writeStreamWithLock(dstPathKey, f, false)
328 if err == nil && move {
329 err = os.Remove(srcFilename)
330 }
331 return err
332 }
333
334
335
336
337
338 func (d *Diskv) Read(key string) ([]byte, error) {
339 rc, err := d.ReadStream(key, false)
340 if err != nil {
341 return []byte{}, err
342 }
343 defer rc.Close()
344 return ioutil.ReadAll(rc)
345 }
346
347
348
349 func (d *Diskv) ReadString(key string) string {
350 value, _ := d.Read(key)
351 return string(value)
352 }
353
354
355
356
357
358
359
360
361
362
363
364 func (d *Diskv) ReadStream(key string, direct bool) (io.ReadCloser, error) {
365
366 pathKey := d.transform(key)
367 d.mu.RLock()
368 defer d.mu.RUnlock()
369
370 if val, ok := d.cache[key]; ok {
371 if !direct {
372 buf := bytes.NewReader(val)
373 if d.Compression != nil {
374 return d.Compression.Reader(buf)
375 }
376 return ioutil.NopCloser(buf), nil
377 }
378
379 go func() {
380 d.mu.Lock()
381 defer d.mu.Unlock()
382 d.uncacheWithLock(key, uint64(len(val)))
383 }()
384 }
385
386 return d.readWithRLock(pathKey)
387 }
388
389
390
391
392
393 func (d *Diskv) readWithRLock(pathKey *PathKey) (io.ReadCloser, error) {
394 filename := d.completeFilename(pathKey)
395
396 fi, err := os.Stat(filename)
397 if err != nil {
398 return nil, err
399 }
400 if fi.IsDir() {
401 return nil, os.ErrNotExist
402 }
403
404 f, err := os.Open(filename)
405 if err != nil {
406 return nil, err
407 }
408
409 var r io.Reader
410 if d.CacheSizeMax > 0 {
411 r = newSiphon(f, d, pathKey.originalKey)
412 } else {
413 r = &closingReader{f}
414 }
415
416 var rc = io.ReadCloser(ioutil.NopCloser(r))
417 if d.Compression != nil {
418 rc, err = d.Compression.Reader(r)
419 if err != nil {
420 return nil, err
421 }
422 }
423
424 return rc, nil
425 }
426
427
428
// closingReader wraps an io.ReadCloser and closes it as soon as a Read
// reports io.EOF. If that Close fails, its error is returned instead of EOF.
type closingReader struct {
	rc io.ReadCloser
}

// Read forwards to the wrapped reader, closing it at end of stream.
func (c closingReader) Read(p []byte) (int, error) {
	n, err := c.rc.Read(p)
	if err != io.EOF {
		return n, err
	}
	if cerr := c.rc.Close(); cerr != nil {
		return n, cerr
	}
	return n, io.EOF
}
442
443
444
445 type siphon struct {
446 f *os.File
447 d *Diskv
448 key string
449 buf *bytes.Buffer
450 }
451
452
453
454
455 func newSiphon(f *os.File, d *Diskv, key string) io.Reader {
456 return &siphon{
457 f: f,
458 d: d,
459 key: key,
460 buf: &bytes.Buffer{},
461 }
462 }
463
464
465 func (s *siphon) Read(p []byte) (int, error) {
466 n, err := s.f.Read(p)
467
468 if err == nil {
469 return s.buf.Write(p[0:n])
470 }
471
472 if err == io.EOF {
473 s.d.cacheWithoutLock(s.key, s.buf.Bytes())
474 if closeErr := s.f.Close(); closeErr != nil {
475 return n, closeErr
476 }
477 return n, err
478 }
479
480 return n, err
481 }
482
483
484 func (d *Diskv) Erase(key string) error {
485 pathKey := d.transform(key)
486 d.mu.Lock()
487 defer d.mu.Unlock()
488
489 d.bustCacheWithLock(key)
490
491
492 if d.Index != nil {
493 d.Index.Delete(key)
494 }
495
496
497 filename := d.completeFilename(pathKey)
498 if s, err := os.Stat(filename); err == nil {
499 if s.IsDir() {
500 return errBadKey
501 }
502 if err = os.Remove(filename); err != nil {
503 return err
504 }
505 } else {
506
507 return err
508 }
509
510
511 d.pruneDirsWithLock(key)
512 return nil
513 }
514
515
516
517
518
519 func (d *Diskv) EraseAll() error {
520 d.mu.Lock()
521 defer d.mu.Unlock()
522 d.cache = make(map[string][]byte)
523 d.cacheSize = 0
524 if d.TempDir != "" {
525 os.RemoveAll(d.TempDir)
526 }
527 return os.RemoveAll(d.BasePath)
528 }
529
530
531 func (d *Diskv) Has(key string) bool {
532 pathKey := d.transform(key)
533 d.mu.Lock()
534 defer d.mu.Unlock()
535
536 if _, ok := d.cache[key]; ok {
537 return true
538 }
539
540 filename := d.completeFilename(pathKey)
541 s, err := os.Stat(filename)
542 if err != nil {
543 return false
544 }
545 if s.IsDir() {
546 return false
547 }
548
549 return true
550 }
551
552
553
554
555 func (d *Diskv) Keys(cancel <-chan struct{}) <-chan string {
556 return d.KeysPrefix("", cancel)
557 }
558
559
560
561
562
563 func (d *Diskv) KeysPrefix(prefix string, cancel <-chan struct{}) <-chan string {
564 var prepath string
565 if prefix == "" {
566 prepath = d.BasePath
567 } else {
568 prefixKey := d.transform(prefix)
569 prepath = d.pathFor(prefixKey)
570 }
571 c := make(chan string)
572 go func() {
573 filepath.Walk(prepath, d.walker(c, prefix, cancel))
574 close(c)
575 }()
576 return c
577 }
578
579
580
581 func (d *Diskv) walker(c chan<- string, prefix string, cancel <-chan struct{}) filepath.WalkFunc {
582 return func(path string, info os.FileInfo, err error) error {
583 if err != nil {
584 return err
585 }
586
587 relPath, _ := filepath.Rel(d.BasePath, path)
588 dir, file := filepath.Split(relPath)
589 pathSplit := strings.Split(dir, string(filepath.Separator))
590 pathSplit = pathSplit[:len(pathSplit)-1]
591
592 pathKey := &PathKey{
593 Path: pathSplit,
594 FileName: file,
595 }
596
597 key := d.InverseTransform(pathKey)
598
599 if info.IsDir() || !strings.HasPrefix(key, prefix) {
600 return nil
601 }
602
603 select {
604 case c <- key:
605 case <-cancel:
606 return errCanceled
607 }
608
609 return nil
610 }
611 }
612
613
614
615 func (d *Diskv) pathFor(pathKey *PathKey) string {
616 return filepath.Join(d.BasePath, filepath.Join(pathKey.Path...))
617 }
618
619
620
621 func (d *Diskv) ensurePathWithLock(pathKey *PathKey) error {
622 return os.MkdirAll(d.pathFor(pathKey), d.PathPerm)
623 }
624
625
626 func (d *Diskv) completeFilename(pathKey *PathKey) string {
627 return filepath.Join(d.pathFor(pathKey), pathKey.FileName)
628 }
629
630
631
632 func (d *Diskv) cacheWithLock(key string, val []byte) error {
633
634 d.bustCacheWithLock(key)
635
636 valueSize := uint64(len(val))
637 if err := d.ensureCacheSpaceWithLock(valueSize); err != nil {
638 return fmt.Errorf("%s; not caching", err)
639 }
640
641
642 if (d.cacheSize + valueSize) > d.CacheSizeMax {
643 panic(fmt.Sprintf("failed to make room for value (%d/%d)", valueSize, d.CacheSizeMax))
644 }
645
646 d.cache[key] = val
647 d.cacheSize += valueSize
648 return nil
649 }
650
651
652 func (d *Diskv) cacheWithoutLock(key string, val []byte) error {
653 d.mu.Lock()
654 defer d.mu.Unlock()
655 return d.cacheWithLock(key, val)
656 }
657
658 func (d *Diskv) bustCacheWithLock(key string) {
659 if val, ok := d.cache[key]; ok {
660 d.uncacheWithLock(key, uint64(len(val)))
661 }
662 }
663
664 func (d *Diskv) uncacheWithLock(key string, sz uint64) {
665 d.cacheSize -= sz
666 delete(d.cache, key)
667 }
668
669
670
671 func (d *Diskv) pruneDirsWithLock(key string) error {
672 pathlist := d.transform(key).Path
673 for i := range pathlist {
674 dir := filepath.Join(d.BasePath, filepath.Join(pathlist[:len(pathlist)-i]...))
675
676
677 switch fi, err := os.Stat(dir); true {
678 case err != nil:
679 return err
680 case !fi.IsDir():
681 panic(fmt.Sprintf("corrupt dirstate at %s", dir))
682 }
683
684 nlinks, err := filepath.Glob(filepath.Join(dir, "*"))
685 if err != nil {
686 return err
687 } else if len(nlinks) > 0 {
688 return nil
689 }
690 if err = os.Remove(dir); err != nil {
691 return err
692 }
693 }
694
695 return nil
696 }
697
698
699
700 func (d *Diskv) ensureCacheSpaceWithLock(valueSize uint64) error {
701 if valueSize > d.CacheSizeMax {
702 return fmt.Errorf("value size (%d bytes) too large for cache (%d bytes)", valueSize, d.CacheSizeMax)
703 }
704
705 safe := func() bool { return (d.cacheSize + valueSize) <= d.CacheSizeMax }
706
707 for key, val := range d.cache {
708 if safe() {
709 break
710 }
711
712 d.uncacheWithLock(key, uint64(len(val)))
713 }
714
715 if !safe() {
716 panic(fmt.Sprintf("%d bytes still won't fit in the cache! (max %d bytes)", valueSize, d.CacheSizeMax))
717 }
718
719 return nil
720 }
721
722
723
// nopWriteCloser turns an io.Writer into an io.WriteCloser whose Close does
// nothing. It is used when values are written without compression. Write is
// promoted from the embedded io.Writer.
type nopWriteCloser struct {
	io.Writer
}

// Close implements io.Closer and always succeeds.
func (wc *nopWriteCloser) Close() error { return nil }
730
View as plain text