1
2
3
4 package diskv
5
6 import (
7 "bytes"
8 "errors"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "os"
13 "path/filepath"
14 "strings"
15 "sync"
16 "syscall"
17 )
18
const (
	// defaultBasePath is the base directory New uses when Options.BasePath
	// is empty.
	defaultBasePath = "diskv"
	// defaultFilePerm is the file mode New uses when Options.FilePerm is zero.
	defaultFilePerm os.FileMode = 0666
	// defaultPathPerm is the directory mode New uses when Options.PathPerm
	// is zero.
	defaultPathPerm os.FileMode = 0777
)
24
var (
	// defaultTransform is the Transform New installs when none is supplied.
	// It returns no path components, so pathFor resolves every key directly
	// to BasePath.
	defaultTransform = func(s string) []string { return []string{} }
	// errCanceled is returned by the walker callback when the cancel channel
	// fires, which stops the directory walk.
	errCanceled = errors.New("canceled")
	// errEmptyKey is returned by WriteStream and Import for an empty key.
	errEmptyKey = errors.New("empty key")
	// errBadKey is returned by Erase when the key resolves to a directory.
	errBadKey = errors.New("bad key")
	// errImportDirectory is returned by Import when the source path is a
	// directory.
	errImportDirectory = errors.New("can't import a directory")
)
32
33
34
35
36
37
38
// TransformFunction maps a key to a slice of path components. pathFor joins
// those components beneath BasePath to form the directory that holds the
// key's file; the file itself is named after the key (see completeFilename).
type TransformFunction func(s string) []string
40
41
42
// Options configures a Diskv store. New fills in defaults for BasePath,
// Transform, PathPerm and FilePerm when they are left at their zero values.
type Options struct {
	// BasePath is the root directory of the store.
	BasePath string
	// Transform maps a key to the directory components beneath BasePath.
	Transform TransformFunction
	// CacheSizeMax is the cache budget in bytes. When it is zero, reads from
	// disk are not siphoned into the cache (see readWithRLock).
	CacheSizeMax uint64
	// PathPerm is the mode passed to os.MkdirAll for store directories.
	PathPerm os.FileMode
	// FilePerm is the mode applied to value files.
	FilePerm os.FileMode

	// TempDir, when non-empty, makes writes go to a temporary file in this
	// directory first; the file is renamed to its final location once it has
	// been written and closed (see createKeyFileWithLock and
	// writeStreamWithLock).
	TempDir string

	// Index and IndexLess are optional. New initializes the index only when
	// both are non-nil; writes and erases update Index whenever it is non-nil.
	Index     Index
	IndexLess LessFunction

	// Compression, when non-nil, wraps the file writer on writes and the
	// reader on reads.
	Compression Compression
}
61
62
63
// Diskv is a key-value store that keeps each value in its own file beneath
// Options.BasePath and holds recently read values in an in-memory cache.
type Diskv struct {
	Options
	// mu guards cache and cacheSize; the public methods also hold it while
	// they touch the filesystem.
	mu sync.RWMutex
	// cache maps a key to the bytes read from its file.
	cache map[string][]byte
	// cacheSize is the running total of len(value) over all cache entries.
	cacheSize uint64
}
70
71
72
73
74 func New(o Options) *Diskv {
75 if o.BasePath == "" {
76 o.BasePath = defaultBasePath
77 }
78 if o.Transform == nil {
79 o.Transform = defaultTransform
80 }
81 if o.PathPerm == 0 {
82 o.PathPerm = defaultPathPerm
83 }
84 if o.FilePerm == 0 {
85 o.FilePerm = defaultFilePerm
86 }
87
88 d := &Diskv{
89 Options: o,
90 cache: map[string][]byte{},
91 cacheSize: 0,
92 }
93
94 if d.Index != nil && d.IndexLess != nil {
95 d.Index.Initialize(d.IndexLess, d.Keys(nil))
96 }
97
98 return d
99 }
100
101
102
103
104 func (d *Diskv) Write(key string, val []byte) error {
105 return d.WriteStream(key, bytes.NewBuffer(val), false)
106 }
107
108
109
110
111
112
113 func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error {
114 if len(key) <= 0 {
115 return errEmptyKey
116 }
117
118 d.mu.Lock()
119 defer d.mu.Unlock()
120
121 return d.writeStreamWithLock(key, r, sync)
122 }
123
124
125
126 func (d *Diskv) createKeyFileWithLock(key string) (*os.File, error) {
127 if d.TempDir != "" {
128 if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil {
129 return nil, fmt.Errorf("temp mkdir: %s", err)
130 }
131 f, err := ioutil.TempFile(d.TempDir, "")
132 if err != nil {
133 return nil, fmt.Errorf("temp file: %s", err)
134 }
135
136 if err := f.Chmod(d.FilePerm); err != nil {
137 f.Close()
138 os.Remove(f.Name())
139 return nil, fmt.Errorf("chmod: %s", err)
140 }
141 return f, nil
142 }
143
144 mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
145 f, err := os.OpenFile(d.completeFilename(key), mode, d.FilePerm)
146 if err != nil {
147 return nil, fmt.Errorf("open file: %s", err)
148 }
149 return f, nil
150 }
151
152
153 func (d *Diskv) writeStreamWithLock(key string, r io.Reader, sync bool) error {
154 if err := d.ensurePathWithLock(key); err != nil {
155 return fmt.Errorf("ensure path: %s", err)
156 }
157
158 f, err := d.createKeyFileWithLock(key)
159 if err != nil {
160 return fmt.Errorf("create key file: %s", err)
161 }
162
163 wc := io.WriteCloser(&nopWriteCloser{f})
164 if d.Compression != nil {
165 wc, err = d.Compression.Writer(f)
166 if err != nil {
167 f.Close()
168 os.Remove(f.Name())
169 return fmt.Errorf("compression writer: %s", err)
170 }
171 }
172
173 if _, err := io.Copy(wc, r); err != nil {
174 f.Close()
175 os.Remove(f.Name())
176 return fmt.Errorf("i/o copy: %s", err)
177 }
178
179 if err := wc.Close(); err != nil {
180 f.Close()
181 os.Remove(f.Name())
182 return fmt.Errorf("compression close: %s", err)
183 }
184
185 if sync {
186 if err := f.Sync(); err != nil {
187 f.Close()
188 os.Remove(f.Name())
189 return fmt.Errorf("file sync: %s", err)
190 }
191 }
192
193 if err := f.Close(); err != nil {
194 return fmt.Errorf("file close: %s", err)
195 }
196
197 if f.Name() != d.completeFilename(key) {
198 if err := os.Rename(f.Name(), d.completeFilename(key)); err != nil {
199 os.Remove(f.Name())
200 return fmt.Errorf("rename: %s", err)
201 }
202 }
203
204 if d.Index != nil {
205 d.Index.Insert(key)
206 }
207
208 d.bustCacheWithLock(key)
209
210 return nil
211 }
212
213
214
215
216 func (d *Diskv) Import(srcFilename, dstKey string, move bool) (err error) {
217 if dstKey == "" {
218 return errEmptyKey
219 }
220
221 if fi, err := os.Stat(srcFilename); err != nil {
222 return err
223 } else if fi.IsDir() {
224 return errImportDirectory
225 }
226
227 d.mu.Lock()
228 defer d.mu.Unlock()
229
230 if err := d.ensurePathWithLock(dstKey); err != nil {
231 return fmt.Errorf("ensure path: %s", err)
232 }
233
234 if move {
235 if err := syscall.Rename(srcFilename, d.completeFilename(dstKey)); err == nil {
236 d.bustCacheWithLock(dstKey)
237 return nil
238 } else if err != syscall.EXDEV {
239
240 return err
241 }
242 }
243
244 f, err := os.Open(srcFilename)
245 if err != nil {
246 return err
247 }
248 defer f.Close()
249 err = d.writeStreamWithLock(dstKey, f, false)
250 if err == nil && move {
251 err = os.Remove(srcFilename)
252 }
253 return err
254 }
255
256
257
258
259
260 func (d *Diskv) Read(key string) ([]byte, error) {
261 rc, err := d.ReadStream(key, false)
262 if err != nil {
263 return []byte{}, err
264 }
265 defer rc.Close()
266 return ioutil.ReadAll(rc)
267 }
268
269
270
271
272
273
274
275
276
277
278
279 func (d *Diskv) ReadStream(key string, direct bool) (io.ReadCloser, error) {
280 d.mu.RLock()
281 defer d.mu.RUnlock()
282
283 if val, ok := d.cache[key]; ok {
284 if !direct {
285 buf := bytes.NewBuffer(val)
286 if d.Compression != nil {
287 return d.Compression.Reader(buf)
288 }
289 return ioutil.NopCloser(buf), nil
290 }
291
292 go func() {
293 d.mu.Lock()
294 defer d.mu.Unlock()
295 d.uncacheWithLock(key, uint64(len(val)))
296 }()
297 }
298
299 return d.readWithRLock(key)
300 }
301
302
303
304
305
306 func (d *Diskv) readWithRLock(key string) (io.ReadCloser, error) {
307 filename := d.completeFilename(key)
308
309 fi, err := os.Stat(filename)
310 if err != nil {
311 return nil, err
312 }
313 if fi.IsDir() {
314 return nil, os.ErrNotExist
315 }
316
317 f, err := os.Open(filename)
318 if err != nil {
319 return nil, err
320 }
321
322 var r io.Reader
323 if d.CacheSizeMax > 0 {
324 r = newSiphon(f, d, key)
325 } else {
326 r = &closingReader{f}
327 }
328
329 var rc = io.ReadCloser(ioutil.NopCloser(r))
330 if d.Compression != nil {
331 rc, err = d.Compression.Reader(r)
332 if err != nil {
333 return nil, err
334 }
335 }
336
337 return rc, nil
338 }
339
340
341
// closingReader wraps an io.ReadCloser and closes it as soon as a Read
// reports io.EOF, so callers do not have to close it after reading to the end.
type closingReader struct {
	rc io.ReadCloser
}

// Read forwards to the wrapped reader. When the wrapped reader reports
// io.EOF it is closed; if that close fails, the close error is returned in
// place of io.EOF.
func (cr closingReader) Read(p []byte) (int, error) {
	n, err := cr.rc.Read(p)
	if err != io.EOF {
		return n, err
	}

	if closeErr := cr.rc.Close(); closeErr != nil {
		return n, closeErr
	}
	return n, io.EOF
}
355
356
357
358 type siphon struct {
359 f *os.File
360 d *Diskv
361 key string
362 buf *bytes.Buffer
363 }
364
365
366
367
368 func newSiphon(f *os.File, d *Diskv, key string) io.Reader {
369 return &siphon{
370 f: f,
371 d: d,
372 key: key,
373 buf: &bytes.Buffer{},
374 }
375 }
376
377
378 func (s *siphon) Read(p []byte) (int, error) {
379 n, err := s.f.Read(p)
380
381 if err == nil {
382 return s.buf.Write(p[0:n])
383 }
384
385 if err == io.EOF {
386 s.d.cacheWithoutLock(s.key, s.buf.Bytes())
387 if closeErr := s.f.Close(); closeErr != nil {
388 return n, closeErr
389 }
390 return n, err
391 }
392
393 return n, err
394 }
395
396
397 func (d *Diskv) Erase(key string) error {
398 d.mu.Lock()
399 defer d.mu.Unlock()
400
401 d.bustCacheWithLock(key)
402
403
404 if d.Index != nil {
405 d.Index.Delete(key)
406 }
407
408
409 filename := d.completeFilename(key)
410 if s, err := os.Stat(filename); err == nil {
411 if s.IsDir() {
412 return errBadKey
413 }
414 if err = os.Remove(filename); err != nil {
415 return err
416 }
417 } else {
418
419 return err
420 }
421
422
423 d.pruneDirsWithLock(key)
424 return nil
425 }
426
427
428
429
430
431 func (d *Diskv) EraseAll() error {
432 d.mu.Lock()
433 defer d.mu.Unlock()
434 d.cache = make(map[string][]byte)
435 d.cacheSize = 0
436 if d.TempDir != "" {
437 os.RemoveAll(d.TempDir)
438 }
439 return os.RemoveAll(d.BasePath)
440 }
441
442
443 func (d *Diskv) Has(key string) bool {
444 d.mu.Lock()
445 defer d.mu.Unlock()
446
447 if _, ok := d.cache[key]; ok {
448 return true
449 }
450
451 filename := d.completeFilename(key)
452 s, err := os.Stat(filename)
453 if err != nil {
454 return false
455 }
456 if s.IsDir() {
457 return false
458 }
459
460 return true
461 }
462
463
464
465
466 func (d *Diskv) Keys(cancel <-chan struct{}) <-chan string {
467 return d.KeysPrefix("", cancel)
468 }
469
470
471
472
473
474 func (d *Diskv) KeysPrefix(prefix string, cancel <-chan struct{}) <-chan string {
475 var prepath string
476 if prefix == "" {
477 prepath = d.BasePath
478 } else {
479 prepath = d.pathFor(prefix)
480 }
481 c := make(chan string)
482 go func() {
483 filepath.Walk(prepath, walker(c, prefix, cancel))
484 close(c)
485 }()
486 return c
487 }
488
489
490
491 func walker(c chan<- string, prefix string, cancel <-chan struct{}) filepath.WalkFunc {
492 return func(path string, info os.FileInfo, err error) error {
493 if err != nil {
494 return err
495 }
496
497 if info.IsDir() || !strings.HasPrefix(info.Name(), prefix) {
498 return nil
499 }
500
501 select {
502 case c <- info.Name():
503 case <-cancel:
504 return errCanceled
505 }
506
507 return nil
508 }
509 }
510
511
512
513 func (d *Diskv) pathFor(key string) string {
514 return filepath.Join(d.BasePath, filepath.Join(d.Transform(key)...))
515 }
516
517
518
519 func (d *Diskv) ensurePathWithLock(key string) error {
520 return os.MkdirAll(d.pathFor(key), d.PathPerm)
521 }
522
523
524 func (d *Diskv) completeFilename(key string) string {
525 return filepath.Join(d.pathFor(key), key)
526 }
527
528
529
530 func (d *Diskv) cacheWithLock(key string, val []byte) error {
531 valueSize := uint64(len(val))
532 if err := d.ensureCacheSpaceWithLock(valueSize); err != nil {
533 return fmt.Errorf("%s; not caching", err)
534 }
535
536
537 if (d.cacheSize + valueSize) > d.CacheSizeMax {
538 panic(fmt.Sprintf("failed to make room for value (%d/%d)", valueSize, d.CacheSizeMax))
539 }
540
541 d.cache[key] = val
542 d.cacheSize += valueSize
543 return nil
544 }
545
546
547 func (d *Diskv) cacheWithoutLock(key string, val []byte) error {
548 d.mu.Lock()
549 defer d.mu.Unlock()
550 return d.cacheWithLock(key, val)
551 }
552
553 func (d *Diskv) bustCacheWithLock(key string) {
554 if val, ok := d.cache[key]; ok {
555 d.uncacheWithLock(key, uint64(len(val)))
556 }
557 }
558
559 func (d *Diskv) uncacheWithLock(key string, sz uint64) {
560 d.cacheSize -= sz
561 delete(d.cache, key)
562 }
563
564
565
566 func (d *Diskv) pruneDirsWithLock(key string) error {
567 pathlist := d.Transform(key)
568 for i := range pathlist {
569 dir := filepath.Join(d.BasePath, filepath.Join(pathlist[:len(pathlist)-i]...))
570
571
572 switch fi, err := os.Stat(dir); true {
573 case err != nil:
574 return err
575 case !fi.IsDir():
576 panic(fmt.Sprintf("corrupt dirstate at %s", dir))
577 }
578
579 nlinks, err := filepath.Glob(filepath.Join(dir, "*"))
580 if err != nil {
581 return err
582 } else if len(nlinks) > 0 {
583 return nil
584 }
585 if err = os.Remove(dir); err != nil {
586 return err
587 }
588 }
589
590 return nil
591 }
592
593
594
595 func (d *Diskv) ensureCacheSpaceWithLock(valueSize uint64) error {
596 if valueSize > d.CacheSizeMax {
597 return fmt.Errorf("value size (%d bytes) too large for cache (%d bytes)", valueSize, d.CacheSizeMax)
598 }
599
600 safe := func() bool { return (d.cacheSize + valueSize) <= d.CacheSizeMax }
601
602 for key, val := range d.cache {
603 if safe() {
604 break
605 }
606
607 d.uncacheWithLock(key, uint64(len(val)))
608 }
609
610 if !safe() {
611 panic(fmt.Sprintf("%d bytes still won't fit in the cache! (max %d bytes)", valueSize, d.CacheSizeMax))
612 }
613
614 return nil
615 }
616
617
618
// nopWriteCloser adapts an io.Writer to io.WriteCloser with a Close that does
// nothing, so callers can close it without closing the underlying writer.
type nopWriteCloser struct {
	io.Writer
}

// Write forwards p to the embedded writer.
func (wc *nopWriteCloser) Write(p []byte) (int, error) {
	written, err := wc.Writer.Write(p)
	return written, err
}

// Close is a no-op and always returns nil.
func (wc *nopWriteCloser) Close() error {
	return nil
}
625
View as plain text