1
2
3
4
5
6
7 package storage
8
9 import (
10 "errors"
11 "fmt"
12 "io"
13 "io/ioutil"
14 "os"
15 "path/filepath"
16 "runtime"
17 "sort"
18 "strconv"
19 "strings"
20 "sync"
21 "time"
22 )
23
// Errors specific to the file-backed storage.
var (
	// errFileOpen describes a file that is still open. It is not referenced
	// in the part of the file visible here.
	errFileOpen = errors.New("leveldb/storage: file still open")
	// errReadOnly is returned by the mutating operations (SetMeta, Create,
	// Remove, Rename) when the storage was opened read-only.
	errReadOnly = errors.New("leveldb/storage: storage is read-only")
)
28
// fileLock is the lock held on the storage directory's LOCK file. OpenFile
// obtains it from newFileLock and Close gives it up.
type fileLock interface {
	// release gives up the lock and reports any error encountered.
	release() error
}
32
// fileStorageLock is the Locker handed out by fileStorage.Lock.
type fileStorageLock struct {
	// fs is the storage that issued the lock. It is nil for locks issued by
	// a read-only storage, in which case Unlock does nothing.
	fs *fileStorage
}
36
37 func (lock *fileStorageLock) Unlock() {
38 if lock.fs != nil {
39 lock.fs.mu.Lock()
40 defer lock.fs.mu.Unlock()
41 if lock.fs.slock == lock {
42 lock.fs.slock = nil
43 }
44 }
45 }
46
// int64Slice provides the Len/Less/Swap methods needed to sort a []int64 in
// increasing order with the sort package.
type int64Slice []int64

// Len reports the number of elements.
func (s int64Slice) Len() int {
	return len(s)
}

// Less reports whether the element at index i is smaller than the one at j.
func (s int64Slice) Less(i, j int) bool {
	return s[i] < s[j]
}

// Swap exchanges the elements at indexes i and j.
func (s int64Slice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
52
// writeFileSynced creates or truncates filename with permission perm, writes
// data to it, syncs it to stable storage and closes it.
//
// Sync and Close are always attempted once the file has been opened. The
// error returned is the first one encountered, in the order write (a short
// write is reported as io.ErrShortWrite), sync, close.
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
	file, openErr := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if openErr != nil {
		return openErr
	}

	written, writeErr := file.Write(data)
	if writeErr == nil && written < len(data) {
		writeErr = io.ErrShortWrite
	}
	syncErr := file.Sync()
	closeErr := file.Close()

	switch {
	case writeErr != nil:
		return writeErr
	case syncErr != nil:
		return syncErr
	default:
		return closeErr
	}
}
70
// logSizeThreshold is the LOG file size in bytes (1 MiB) above which doLog
// renames LOG to LOG.old and starts a new LOG before writing the next entry.
const logSizeThreshold = 1024 * 1024
72
73
// fileStorage is a Storage implementation backed by a directory on the file
// system; it is created by OpenFile.
type fileStorage struct {
	// path is the directory that holds every file of the storage.
	path string
	// readOnly makes the mutating operations fail with errReadOnly and
	// disables logging.
	readOnly bool

	// mu is taken by the exported methods while they use the fields below.
	mu sync.Mutex
	// flock is the lock on the LOCK file, released by Close.
	flock fileLock
	// slock is the lock currently handed out by Lock, or nil.
	slock *fileStorageLock
	// logw is the LOG file. It is nil for a read-only storage and right
	// after doLog has closed it for rotation.
	logw *os.File
	// logSize is the size of the current LOG file as tracked by doLog.
	logSize int64
	// buf is a scratch buffer reused by doLog to format one entry.
	buf []byte

	// open counts the files opened through Open/Create that have not been
	// closed yet; a negative value marks the storage as closed.
	open int
	// day marks the calendar day whose date header printDay last wrote to
	// LOG; doLog resets it to 0 after reopening LOG to force a new header.
	day int
}
88
89
90
91
92
93
94 func OpenFile(path string, readOnly bool) (Storage, error) {
95 if fi, err := os.Stat(path); err == nil {
96 if !fi.IsDir() {
97 return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path)
98 }
99 } else if os.IsNotExist(err) && !readOnly {
100 if err := os.MkdirAll(path, 0755); err != nil {
101 return nil, err
102 }
103 } else {
104 return nil, err
105 }
106
107 flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly)
108 if err != nil {
109 return nil, err
110 }
111
112 defer func() {
113 if err != nil {
114 if ferr := flock.release(); ferr != nil {
115 err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, ferr)
116 }
117 }
118 }()
119
120 var (
121 logw *os.File
122 logSize int64
123 )
124 if !readOnly {
125 logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
126 if err != nil {
127 return nil, err
128 }
129 logSize, err = logw.Seek(0, os.SEEK_END)
130 if err != nil {
131 logw.Close()
132 return nil, err
133 }
134 }
135
136 fs := &fileStorage{
137 path: path,
138 readOnly: readOnly,
139 flock: flock,
140 logw: logw,
141 logSize: logSize,
142 }
143 runtime.SetFinalizer(fs, (*fileStorage).Close)
144 return fs, nil
145 }
146
147 func (fs *fileStorage) Lock() (Locker, error) {
148 fs.mu.Lock()
149 defer fs.mu.Unlock()
150 if fs.open < 0 {
151 return nil, ErrClosed
152 }
153 if fs.readOnly {
154 return &fileStorageLock{}, nil
155 }
156 if fs.slock != nil {
157 return nil, ErrLocked
158 }
159 fs.slock = &fileStorageLock{fs: fs}
160 return fs.slock, nil
161 }
162
// itoa appends the decimal representation of i to buf and returns the
// extended slice. The number is left-padded with zeros to at least wid
// digits; a wid of 1 or less requests no padding.
func itoa(buf []byte, i int, wid int) []byte {
	val := uint(i)
	if val == 0 && wid <= 1 {
		return append(buf, '0')
	}

	// Fill a fixed scratch array from its end towards its start, emitting
	// the least significant digit first.
	var digits [32]byte
	pos := len(digits)
	for val > 0 || wid > 0 {
		pos--
		digits[pos] = '0' + byte(val%10)
		val /= 10
		wid--
	}
	return append(buf, digits[pos:]...)
}
179
180 func (fs *fileStorage) printDay(t time.Time) error {
181 if fs.day == t.Day() {
182 return nil
183 }
184 fs.day = t.Day()
185 _, err := fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n"))
186 return err
187 }
188
189 func (fs *fileStorage) doLog(t time.Time, str string) {
190 if fs.logSize > logSizeThreshold {
191
192 fs.logw.Close()
193 fs.logw = nil
194 fs.logSize = 0
195 if err := rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old")); err != nil {
196 return
197 }
198 }
199 if fs.logw == nil {
200 var err error
201 fs.logw, err = os.OpenFile(filepath.Join(fs.path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
202 if err != nil {
203 return
204 }
205
206 fs.day = 0
207 }
208 if err := fs.printDay(t); err != nil {
209 return
210 }
211 hour, min, sec := t.Clock()
212 msec := t.Nanosecond() / 1e3
213
214 fs.buf = itoa(fs.buf[:0], hour, 2)
215 fs.buf = append(fs.buf, ':')
216 fs.buf = itoa(fs.buf, min, 2)
217 fs.buf = append(fs.buf, ':')
218 fs.buf = itoa(fs.buf, sec, 2)
219 fs.buf = append(fs.buf, '.')
220 fs.buf = itoa(fs.buf, msec, 6)
221 fs.buf = append(fs.buf, ' ')
222
223 fs.buf = append(fs.buf, []byte(str)...)
224 fs.buf = append(fs.buf, '\n')
225 n, _ := fs.logw.Write(fs.buf)
226 fs.logSize += int64(n)
227 }
228
229 func (fs *fileStorage) Log(str string) {
230 if !fs.readOnly {
231 t := time.Now()
232 fs.mu.Lock()
233 defer fs.mu.Unlock()
234 if fs.open < 0 {
235 return
236 }
237 fs.doLog(t, str)
238 }
239 }
240
241 func (fs *fileStorage) log(str string) {
242 if !fs.readOnly {
243 fs.doLog(time.Now(), str)
244 }
245 }
246
247 func (fs *fileStorage) setMeta(fd FileDesc) error {
248 content := fsGenName(fd) + "\n"
249
250 currentPath := filepath.Join(fs.path, "CURRENT")
251 if _, err := os.Stat(currentPath); err == nil {
252 b, err := ioutil.ReadFile(currentPath)
253 if err != nil {
254 fs.log(fmt.Sprintf("backup CURRENT: %v", err))
255 return err
256 }
257 if string(b) == content {
258
259 return nil
260 }
261 if err := writeFileSynced(currentPath+".bak", b, 0644); err != nil {
262 fs.log(fmt.Sprintf("backup CURRENT: %v", err))
263 return err
264 }
265 } else if !os.IsNotExist(err) {
266 return err
267 }
268 path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
269 if err := writeFileSynced(path, []byte(content), 0644); err != nil {
270 fs.log(fmt.Sprintf("create CURRENT.%d: %v", fd.Num, err))
271 return err
272 }
273
274 if err := rename(path, currentPath); err != nil {
275 fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
276 return err
277 }
278
279 if err := syncDir(fs.path); err != nil {
280 fs.log(fmt.Sprintf("syncDir: %v", err))
281 return err
282 }
283 return nil
284 }
285
286 func (fs *fileStorage) SetMeta(fd FileDesc) error {
287 if !FileDescOk(fd) {
288 return ErrInvalidFile
289 }
290 if fs.readOnly {
291 return errReadOnly
292 }
293
294 fs.mu.Lock()
295 defer fs.mu.Unlock()
296 if fs.open < 0 {
297 return ErrClosed
298 }
299 return fs.setMeta(fd)
300 }
301
302 func (fs *fileStorage) GetMeta() (FileDesc, error) {
303 fs.mu.Lock()
304 defer fs.mu.Unlock()
305 if fs.open < 0 {
306 return FileDesc{}, ErrClosed
307 }
308 dir, err := os.Open(fs.path)
309 if err != nil {
310 return FileDesc{}, err
311 }
312 names, err := dir.Readdirnames(0)
313
314 if ce := dir.Close(); ce != nil {
315 fs.log(fmt.Sprintf("close dir: %v", ce))
316 }
317 if err != nil {
318 return FileDesc{}, err
319 }
320
321
322
323
324
325
326 type currentFile struct {
327 name string
328 fd FileDesc
329 }
330 tryCurrent := func(name string) (*currentFile, error) {
331 b, err := ioutil.ReadFile(filepath.Join(fs.path, name))
332 if err != nil {
333 if os.IsNotExist(err) {
334 err = os.ErrNotExist
335 }
336 return nil, err
337 }
338 var fd FileDesc
339 if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd) {
340 fs.log(fmt.Sprintf("%s: corrupted content: %q", name, b))
341 err := &ErrCorrupted{
342 Err: errors.New("leveldb/storage: corrupted or incomplete CURRENT file"),
343 }
344 return nil, err
345 }
346 if _, err := os.Stat(filepath.Join(fs.path, fsGenName(fd))); err != nil {
347 if os.IsNotExist(err) {
348 fs.log(fmt.Sprintf("%s: missing target file: %s", name, fd))
349 err = os.ErrNotExist
350 }
351 return nil, err
352 }
353 return ¤tFile{name: name, fd: fd}, nil
354 }
355 tryCurrents := func(names []string) (*currentFile, error) {
356 var (
357 cur *currentFile
358
359 lastCerr error
360 )
361 for _, name := range names {
362 var err error
363 cur, err = tryCurrent(name)
364 if err == nil {
365 break
366 } else if err == os.ErrNotExist {
367
368 } else if isCorrupted(err) {
369 lastCerr = err
370
371 } else {
372
373 return nil, err
374 }
375 }
376 if cur == nil {
377 err := os.ErrNotExist
378 if lastCerr != nil {
379 err = lastCerr
380 }
381 return nil, err
382 }
383 return cur, nil
384 }
385
386
387 var nums []int64
388 for _, name := range names {
389 if strings.HasPrefix(name, "CURRENT.") && name != "CURRENT.bak" {
390 i, err := strconv.ParseInt(name[8:], 10, 64)
391 if err == nil {
392 nums = append(nums, i)
393 }
394 }
395 }
396 var (
397 pendCur *currentFile
398 pendErr = os.ErrNotExist
399 pendNames []string
400 )
401 if len(nums) > 0 {
402 sort.Sort(sort.Reverse(int64Slice(nums)))
403 pendNames = make([]string, len(nums))
404 for i, num := range nums {
405 pendNames[i] = fmt.Sprintf("CURRENT.%d", num)
406 }
407 pendCur, pendErr = tryCurrents(pendNames)
408 if pendErr != nil && pendErr != os.ErrNotExist && !isCorrupted(pendErr) {
409 return FileDesc{}, pendErr
410 }
411 }
412
413
414 curCur, curErr := tryCurrents([]string{"CURRENT", "CURRENT.bak"})
415 if curErr != nil && curErr != os.ErrNotExist && !isCorrupted(curErr) {
416 return FileDesc{}, curErr
417 }
418
419
420 if pendCur != nil && (curCur == nil || pendCur.fd.Num > curCur.fd.Num) {
421 curCur = pendCur
422 }
423
424 if curCur != nil {
425
426 if !fs.readOnly && (curCur.name != "CURRENT" || len(pendNames) != 0) {
427
428
429 if err := fs.setMeta(curCur.fd); err == nil {
430
431 for _, name := range pendNames {
432 if err := os.Remove(filepath.Join(fs.path, name)); err != nil {
433 fs.log(fmt.Sprintf("remove %s: %v", name, err))
434 }
435 }
436 }
437 }
438 return curCur.fd, nil
439 }
440
441
442 if isCorrupted(pendErr) {
443 return FileDesc{}, pendErr
444 }
445 return FileDesc{}, curErr
446 }
447
448 func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
449 fs.mu.Lock()
450 defer fs.mu.Unlock()
451 if fs.open < 0 {
452 return nil, ErrClosed
453 }
454 dir, err := os.Open(fs.path)
455 if err != nil {
456 return
457 }
458 names, err := dir.Readdirnames(0)
459
460 if cerr := dir.Close(); cerr != nil {
461 fs.log(fmt.Sprintf("close dir: %v", cerr))
462 }
463 if err == nil {
464 for _, name := range names {
465 if fd, ok := fsParseName(name); ok && fd.Type&ft != 0 {
466 fds = append(fds, fd)
467 }
468 }
469 }
470 return
471 }
472
473 func (fs *fileStorage) Open(fd FileDesc) (Reader, error) {
474 if !FileDescOk(fd) {
475 return nil, ErrInvalidFile
476 }
477
478 fs.mu.Lock()
479 defer fs.mu.Unlock()
480 if fs.open < 0 {
481 return nil, ErrClosed
482 }
483 of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_RDONLY, 0)
484 if err != nil {
485 if fsHasOldName(fd) && os.IsNotExist(err) {
486 of, err = os.OpenFile(filepath.Join(fs.path, fsGenOldName(fd)), os.O_RDONLY, 0)
487 if err == nil {
488 goto ok
489 }
490 }
491 return nil, err
492 }
493 ok:
494 fs.open++
495 return &fileWrap{File: of, fs: fs, fd: fd}, nil
496 }
497
498 func (fs *fileStorage) Create(fd FileDesc) (Writer, error) {
499 if !FileDescOk(fd) {
500 return nil, ErrInvalidFile
501 }
502 if fs.readOnly {
503 return nil, errReadOnly
504 }
505
506 fs.mu.Lock()
507 defer fs.mu.Unlock()
508 if fs.open < 0 {
509 return nil, ErrClosed
510 }
511 of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
512 if err != nil {
513 return nil, err
514 }
515 fs.open++
516 return &fileWrap{File: of, fs: fs, fd: fd}, nil
517 }
518
519 func (fs *fileStorage) Remove(fd FileDesc) error {
520 if !FileDescOk(fd) {
521 return ErrInvalidFile
522 }
523 if fs.readOnly {
524 return errReadOnly
525 }
526
527 fs.mu.Lock()
528 defer fs.mu.Unlock()
529 if fs.open < 0 {
530 return ErrClosed
531 }
532 err := os.Remove(filepath.Join(fs.path, fsGenName(fd)))
533 if err != nil {
534 if fsHasOldName(fd) && os.IsNotExist(err) {
535 if e1 := os.Remove(filepath.Join(fs.path, fsGenOldName(fd))); !os.IsNotExist(e1) {
536 fs.log(fmt.Sprintf("remove %s: %v (old name)", fd, err))
537 err = e1
538 }
539 } else {
540 fs.log(fmt.Sprintf("remove %s: %v", fd, err))
541 }
542 }
543 return err
544 }
545
546 func (fs *fileStorage) Rename(oldfd, newfd FileDesc) error {
547 if !FileDescOk(oldfd) || !FileDescOk(newfd) {
548 return ErrInvalidFile
549 }
550 if oldfd == newfd {
551 return nil
552 }
553 if fs.readOnly {
554 return errReadOnly
555 }
556
557 fs.mu.Lock()
558 defer fs.mu.Unlock()
559 if fs.open < 0 {
560 return ErrClosed
561 }
562 return rename(filepath.Join(fs.path, fsGenName(oldfd)), filepath.Join(fs.path, fsGenName(newfd)))
563 }
564
565 func (fs *fileStorage) Close() error {
566 fs.mu.Lock()
567 defer fs.mu.Unlock()
568 if fs.open < 0 {
569 return ErrClosed
570 }
571
572 runtime.SetFinalizer(fs, nil)
573
574 if fs.open > 0 {
575 fs.log(fmt.Sprintf("close: warning, %d files still open", fs.open))
576 }
577 fs.open = -1
578 if fs.logw != nil {
579 fs.logw.Close()
580 }
581 return fs.flock.release()
582 }
583
// fileWrap is the file handle returned by fileStorage.Open and
// fileStorage.Create. It embeds *os.File and overrides Sync and Close.
type fileWrap struct {
	*os.File
	// fs is the storage the file was opened from.
	fs *fileStorage
	// fd is the descriptor the file was opened with.
	fd FileDesc
	// closed is set by Close so that a second Close returns ErrClosed.
	closed bool
}
590
591 func (fw *fileWrap) Sync() error {
592 if err := fw.File.Sync(); err != nil {
593 return err
594 }
595 if fw.fd.Type == TypeManifest {
596
597
598 if err := syncDir(fw.fs.path); err != nil {
599 fw.fs.log(fmt.Sprintf("syncDir: %v", err))
600 return err
601 }
602 }
603 return nil
604 }
605
606 func (fw *fileWrap) Close() error {
607 fw.fs.mu.Lock()
608 defer fw.fs.mu.Unlock()
609 if fw.closed {
610 return ErrClosed
611 }
612 fw.closed = true
613 fw.fs.open--
614 err := fw.File.Close()
615 if err != nil {
616 fw.fs.log(fmt.Sprintf("close %s: %v", fw.fd, err))
617 }
618 return err
619 }
620
621 func fsGenName(fd FileDesc) string {
622 switch fd.Type {
623 case TypeManifest:
624 return fmt.Sprintf("MANIFEST-%06d", fd.Num)
625 case TypeJournal:
626 return fmt.Sprintf("%06d.log", fd.Num)
627 case TypeTable:
628 return fmt.Sprintf("%06d.ldb", fd.Num)
629 case TypeTemp:
630 return fmt.Sprintf("%06d.tmp", fd.Num)
631 default:
632 panic("invalid file type")
633 }
634 }
635
636 func fsHasOldName(fd FileDesc) bool {
637 return fd.Type == TypeTable
638 }
639
640 func fsGenOldName(fd FileDesc) string {
641 switch fd.Type {
642 case TypeTable:
643 return fmt.Sprintf("%06d.sst", fd.Num)
644 default:
645 return fsGenName(fd)
646 }
647 }
648
649 func fsParseName(name string) (fd FileDesc, ok bool) {
650 var tail string
651 _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail)
652 if err == nil {
653 switch tail {
654 case "log":
655 fd.Type = TypeJournal
656 case "ldb", "sst":
657 fd.Type = TypeTable
658 case "tmp":
659 fd.Type = TypeTemp
660 default:
661 return
662 }
663 return fd, true
664 }
665 n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail)
666 if n == 1 {
667 fd.Type = TypeManifest
668 return fd, true
669 }
670 return
671 }
672
673 func fsParseNamePtr(name string, fd *FileDesc) bool {
674 _fd, ok := fsParseName(name)
675 if fd != nil {
676 *fd = _fd
677 }
678 return ok
679 }
680
View as plain text