// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"container/list"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/syndtr/goleveldb/leveldb/cache"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/journal"
	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	seq uint64

	// Stats. Need 64-bit alignment.
	cWriteDelay            int64 // Cumulative write delay, in nanoseconds.
	cWriteDelayN           int32 // Cumulative number of write delays.
	inWritePaused          int32 // Whether writes are currently paused by compaction.
	aliveSnaps, aliveIters int32

	// Compaction statistics.
	memComp       uint32 // Cumulative number of memdb compactions.
	level0Comp    uint32 // Cumulative number of level-0 compactions.
	nonLevel0Comp uint32 // Cumulative number of non-level-0 compactions.
	seekComp      uint32 // Cumulative number of seek compactions.

	// Session.
	s *session

	// MemDB.
	memMu           sync.RWMutex
	memPool         chan *memdb.DB
	mem, frozenMem  *memDB
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List

	// Write.
	batchPool    sync.Pool
	writeMergeC  chan writeMerge
	writeMergedC chan bool
	writeLockC   chan struct{}
	writeAckC    chan error
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.

	// Close.
	closeW sync.WaitGroup
	closeC chan struct{}
	closed uint32
	closer io.Closer
}

func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode).
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal on error.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}
	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		if err := db.SetReadOnly(); err != nil {
			return nil, err
		}
	} else {
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
	}

	s.logf("db@open done T·%v", time.Since(start))

	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}

// Open opens or creates a DB for the given storage.
// The DB will be created if it does not exist, unless ErrorIfMissing is true.
// If ErrorIfExist is true and the DB already exists, Open returns os.ErrExist.
//
// Open returns an error of type ErrCorrupted if corruption is detected in
// the DB. Use errors.IsCorrupted to test whether an error is due to
// corruption. A corrupted DB can be repaired with the Recover function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling the Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	err = s.recover()
	if err != nil {
		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
			return
		}
		err = s.create()
		if err != nil {
			return
		}
	} else if s.o.GetErrorIfExist() {
		err = os.ErrExist
		return
	}

	return openDB(s)
}

// OpenFile opens or creates a DB for the given path.
// The DB will be created if it does not exist, unless ErrorIfMissing is true.
// If ErrorIfExist is true and the DB already exists, OpenFile returns
// os.ErrExist.
//
// OpenFile uses the standard file-system backed storage implementation
// described in the leveldb/storage package.
//
// OpenFile returns an error of type ErrCorrupted if corruption is detected
// in the DB. Use errors.IsCorrupted to test whether an error is due to
// corruption. A corrupted DB can be repaired with the RecoverFile function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling the Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
	stor, err := storage.OpenFile(path, o.GetReadOnly())
	if err != nil {
		return
	}
	db, err = Open(stor, o)
	if err != nil {
		stor.Close()
	} else {
		db.closer = stor
	}
	return
}
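
// Illustrative usage sketch (not part of the original source): opening a
// database with OpenFile, writing and reading a key, then closing it. The
// path and key names below are hypothetical.
//
//	db, err := leveldb.OpenFile("path/to/db", nil)
//	if err != nil {
//		// handle error
//	}
//	defer db.Close()
//
//	if err := db.Put([]byte("key"), []byte("value"), nil); err != nil {
//		// handle error
//	}
//	value, err := db.Get([]byte("key"), nil)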

// Recover recovers and opens a DB with missing or corrupted manifest files
// for the given storage. It ignores any manifest files, valid or not.
// The DB must already exist or an error will be returned.
// Recover also ignores the ErrorIfMissing and ErrorIfExist options.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling the Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	err = recoverTable(s, o)
	if err != nil {
		return
	}
	return openDB(s)
}

// RecoverFile recovers and opens a DB with missing or corrupted manifest
// files for the given path. It ignores any manifest files, valid or not.
// The DB must already exist or an error will be returned.
// RecoverFile also ignores the ErrorIfMissing and ErrorIfExist options.
//
// RecoverFile uses the standard file-system backed storage implementation
// described in the leveldb/storage package.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling the Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
	stor, err := storage.OpenFile(path, false)
	if err != nil {
		return
	}
	db, err = Recover(stor, o)
	if err != nil {
		stor.Close()
	} else {
		db.closer = stor
	}
	return
}
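
// Illustrative usage sketch (not part of the original source): rebuilding
// the manifest from the existing table files when a DB fails to open
// because its manifest is corrupted. The path is hypothetical.
//
//	db, err := leveldb.RecoverFile("path/to/db", nil)
//	if err != nil {
//		// handle error
//	}
//	defer db.Close()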

func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, let StrictRecovery do its job.
	o.Strict &= ^opt.StrictReader

	// Get all table files and sort them by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted tables.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			if cerr := writer.Close(); cerr != nil {
				if err == nil {
					err = cerr
				} else {
					err = fmt.Errorf("error recovering table (%v); error closing (%v)", err, cerr)
				}
			}
			if err != nil {
				if rerr := s.stor.Remove(tmpFd); rerr != nil {
					err = fmt.Errorf("error recovering table (%v); error removing (%v)", err, rerr)
				}
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o, nil, 0)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		err = iter.Error()
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			if imin == nil {
				imin = append([]byte(nil), key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				closed = true
				reader.Close()
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create a new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec, false)
}

func (db *DB) recoverJournal() error {
	// Get all journal files and sort them by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				// Reset the journal reader, ignoring any reset error.
				_ = jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec, false); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				if err := db.s.stor.Remove(ofd); err != nil {
					fr.Close()
					return err
				}
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is an error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply the sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec, false); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		if err := db.s.stor.Remove(ofd); err != nil {
			return err
		}
	}

	return nil
}

func (db *DB) recoverJournalRO() error {
	// Get all journal files and sort them by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				if err := jr.Reset(fr, dropper{db.s, fd}, strict, checksum); err != nil {
					return err
				}
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is an error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply the sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}

func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
	mk, mv, err := mdb.Find(ikey)
	if err == nil {
		ukey, _, kt, kerr := parseInternalKey(mk)
		if kerr != nil {
			// Shouldn't have happened.
			panic(kerr)
		}
		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
			if kt == keyTypeDel {
				return true, nil, ErrNotFound
			}
			return true, mv, nil
		}
	} else if err != ErrNotFound {
		return true, nil, err
	}
	return
}

func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
			return append([]byte(nil), mv...), me
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		defer m.decref()

		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return append([]byte(nil), mv...), me
		}
	}

	v := db.s.version()
	value, cSched, err := v.get(auxt, ikey, ro, false)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	return
}

func nilIfNotFound(err error) error {
	if err == ErrNotFound {
		return nil
	}
	return err
}

func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		defer m.decref()

		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	v := db.s.version()
	_, cSched, err := v.get(auxt, ikey, ro, true)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	if err == nil {
		ret = true
	} else if err == ErrNotFound {
		err = nil
	}
	return
}

// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contain the key.
//
// The returned slice is its own copy; it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	return db.get(nil, nil, key, se.seq, ro)
}

// Has returns true if the DB does contain the given key.
//
// It is safe to modify the contents of the argument after Has returns.
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	return db.has(nil, nil, key, se.seq, ro)
}
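
// Illustrative usage sketch (not part of the original source): checking for
// a key without copying its value. The key name is hypothetical.
//
//	exists, err := db.Has([]byte("key"), nil)
//	if err != nil {
//		// handle error
//	}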

// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
// consistent.
//
// Slice allows slicing the iterator to only contain keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// DB. A nil Range.Limit is treated as a key after all keys in the DB.
//
// WARNING: Any slice returned by the iterator (e.g. a slice returned by
// calling Iterator.Key() or Iterator.Value()) should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling the Release method.
//
// Also read the Iterator documentation of the leveldb/iterator package.
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
	if err := db.ok(); err != nil {
		return iterator.NewEmptyIterator(err)
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)

	// The iterator holds a reference to the current version; the version is
	// immutable anyway.
	return db.newIterator(nil, nil, se.seq, slice, ro)
}
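
// Illustrative usage sketch (not part of the original source): iterating
// over every key that starts with a given prefix. The prefix is
// hypothetical.
//
//	iter := db.NewIterator(util.BytesPrefix([]byte("prefix-")), nil)
//	for iter.Next() {
//		// iter.Key() and iter.Value() are only valid until the next call
//		// to Next; copy them if they must be retained.
//		_ = iter.Key()
//		_ = iter.Value()
//	}
//	iter.Release()
//	err := iter.Error()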

// GetSnapshot returns the latest snapshot of the underlying DB. A snapshot
// is a frozen view of the DB state at a particular point in time. The
// content of a snapshot is guaranteed to be consistent.
//
// The snapshot must be released after use, by calling the Release method.
func (db *DB) GetSnapshot() (*Snapshot, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	return db.newSnapshot(), nil
}
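
// Illustrative usage sketch (not part of the original source): reading a key
// from a consistent point-in-time view while concurrent writes continue. The
// key name is hypothetical.
//
//	snap, err := db.GetSnapshot()
//	if err != nil {
//		// handle error
//	}
//	defer snap.Release()
//	value, err := snap.Get([]byte("key"), nil)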

// GetProperty returns the value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.compcount
//		Returns the number of memdb, level-0, non-level-0 and seek compactions.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns the sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns the size of cached blocks.
//	leveldb.openedtables
//		Returns the number of opened tables.
//	leveldb.alivesnaps
//		Returns the number of alive snapshots.
//	leveldb.aliveiters
//		Returns the number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	p := name[len(prefix):]

	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		var totalTables int
		var totalSize, totalRead, totalWrite int64
		var totalDuration time.Duration
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			if len(tables) == 0 && duration == 0 {
				continue
			}
			totalTables += len(tables)
			totalSize += tables.size()
			totalRead += read
			totalWrite += write
			totalDuration += duration
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
		value += "-------+------------+---------------+---------------+---------------+---------------\n"
		value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
			totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
			float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
	case p == "compcount":
		value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		paused := atomic.LoadInt32(&db.inWritePaused) == 1
		value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.blockBuffer)
	case p == "cachedblock":
		if db.s.tops.blockCache != nil {
			value = fmt.Sprintf("%d", db.s.tops.blockCache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.fileCache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}
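
// Illustrative usage sketch (not part of the original source): dumping the
// compaction statistics table and the number of level-0 files.
//
//	stats, err := db.GetProperty("leveldb.stats")
//	if err != nil {
//		// handle error
//	}
//	fmt.Println(stats)
//
//	n, err := db.GetProperty("leveldb.num-files-at-level0")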

// DBStats is database statistics.
type DBStats struct {
	WriteDelayCount    int32
	WriteDelayDuration time.Duration
	WritePaused        bool

	AliveSnapshots int32
	AliveIterators int32

	IOWrite uint64
	IORead  uint64

	BlockCacheSize    int
	OpenedTablesCount int

	FileCache  cache.Stats
	BlockCache cache.Stats

	LevelSizes        Sizes
	LevelTablesCounts []int
	LevelRead         Sizes
	LevelWrite        Sizes
	LevelDurations    []time.Duration

	MemComp       uint32
	Level0Comp    uint32
	NonLevel0Comp uint32
	SeekComp      uint32
}

// Stats populates s with database statistics.
func (db *DB) Stats(s *DBStats) error {
	err := db.ok()
	if err != nil {
		return err
	}

	s.IORead = db.s.stor.reads()
	s.IOWrite = db.s.stor.writes()
	s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
	s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
	s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1

	s.OpenedTablesCount = db.s.tops.fileCache.Size()
	if db.s.tops.blockCache != nil {
		s.BlockCacheSize = db.s.tops.blockCache.Size()
	} else {
		s.BlockCacheSize = 0
	}

	s.FileCache = db.s.tops.fileCache.GetStats()
	if db.s.tops.blockCache != nil {
		s.BlockCache = db.s.tops.blockCache.GetStats()
	} else {
		s.BlockCache = cache.Stats{}
	}

	s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
	s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)

	s.LevelDurations = s.LevelDurations[:0]
	s.LevelRead = s.LevelRead[:0]
	s.LevelWrite = s.LevelWrite[:0]
	s.LevelSizes = s.LevelSizes[:0]
	s.LevelTablesCounts = s.LevelTablesCounts[:0]

	v := db.s.version()
	defer v.release()

	for level, tables := range v.levels {
		duration, read, write := db.compStats.getStat(level)

		s.LevelDurations = append(s.LevelDurations, duration)
		s.LevelRead = append(s.LevelRead, read)
		s.LevelWrite = append(s.LevelWrite, write)
		s.LevelSizes = append(s.LevelSizes, tables.size())
		s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
	}
	s.MemComp = atomic.LoadUint32(&db.memComp)
	s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
	s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
	s.SeekComp = atomic.LoadUint32(&db.seekComp)
	return nil
}
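
// Illustrative usage sketch (not part of the original source): sampling
// database statistics into a reusable DBStats value.
//
//	var stats leveldb.DBStats
//	if err := db.Stats(&stats); err != nil {
//		// handle error
//	}
//	fmt.Println("live iterators:", stats.AliveIterators)
//	fmt.Println("cumulative write delay:", stats.WriteDelayDuration)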

// SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes equals the length of the given ranges.
// The returned sizes measure storage space usage, so if the user data
// compresses by a factor of ten, the returned sizes will be one-tenth the
// size of the corresponding user data.
//
// The results may not include the sizes of recently written data.
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	v := db.s.version()
	defer v.release()

	sizes := make(Sizes, 0, len(ranges))
	for _, r := range ranges {
		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
		start, err := v.offsetOf(imin)
		if err != nil {
			return nil, err
		}
		limit, err := v.offsetOf(imax)
		if err != nil {
			return nil, err
		}
		var size int64
		if limit >= start {
			size = limit - start
		}
		sizes = append(sizes, size)
	}

	return sizes, nil
}
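
// Illustrative usage sketch (not part of the original source): estimating
// the on-disk size of a single key range. The range bounds are hypothetical.
//
//	sizes, err := db.SizeOf([]util.Range{
//		{Start: []byte("a"), Limit: []byte("z")},
//	})
//	if err != nil {
//		// handle error
//	}
//	fmt.Println("approximate bytes:", sizes[0])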

// Close closes the DB. This will also release any outstanding snapshots,
// abort any in-flight compaction and discard any open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error.
	var err error
	select {
	case err = <-db.compErrC:
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Close journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}