1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "fmt"
11 "sync"
12 "sync/atomic"
13 "time"
14
15 "github.com/syndtr/goleveldb/leveldb/errors"
16 "github.com/syndtr/goleveldb/leveldb/opt"
17 "github.com/syndtr/goleveldb/leveldb/storage"
18 )
19
// errCompactionTransactExiting is the panic value compactionExitTransact uses
// to unwind a compaction goroutine out of an in-progress transaction. The
// compaction goroutines' top-level deferred handlers recover it and treat it
// as a normal exit instead of re-panicking.
var errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
23
// cStat holds cumulative compaction statistics for one level: time spent and
// the amount of table data read and written.
type cStat struct {
	duration time.Duration
	read     int64
	write    int64
}

// add accumulates the counters collected in n into p.
func (p *cStat) add(n *cStatStaging) {
	p.duration, p.read, p.write = p.duration+n.duration, p.read+n.read, p.write+n.write
}

// get returns the accumulated duration, read and write counters.
func (p *cStat) get() (duration time.Duration, read, write int64) {
	duration, read, write = p.duration, p.read, p.write
	return duration, read, write
}

// cStatStaging collects statistics for a single compaction before they are
// folded into a cStat. The timer may be started and stopped repeatedly; only
// the intervals during which it was running are added to duration.
type cStatStaging struct {
	start    time.Time
	duration time.Duration
	on       bool
	read     int64
	write    int64
}

// startTimer begins timing. It is a no-op while the timer is already running.
func (p *cStatStaging) startTimer() {
	if p.on {
		return
	}
	p.start = time.Now()
	p.on = true
}

// stopTimer adds the time elapsed since startTimer to duration. It is a no-op
// when the timer is not running.
func (p *cStatStaging) stopTimer() {
	if !p.on {
		return
	}
	p.duration += time.Since(p.start)
	p.on = false
}

// cStats is a mutex-guarded, per-level table of cStat that grows on demand.
type cStats struct {
	lk    sync.Mutex
	stats []cStat
}

// addStat folds n into the statistics for level, growing the table if level
// has not been seen yet.
func (p *cStats) addStat(level int, n *cStatStaging) {
	p.lk.Lock()
	if want := level + 1; want > len(p.stats) {
		grown := make([]cStat, want)
		copy(grown, p.stats)
		p.stats = grown
	}
	p.stats[level].add(n)
	p.lk.Unlock()
}

// getStat returns the statistics recorded for level, or zero values when
// nothing has been recorded for it yet.
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
	p.lk.Lock()
	defer p.lk.Unlock()
	if level >= len(p.stats) {
		return 0, 0, 0
	}
	return p.stats[level].get()
}
86
// compactionError is the goroutine that tracks the compaction error state.
// It receives the outcome of each compaction attempt on db.compErrSetC and
// republishes the current error to other goroutines until db.closeC is closed.
//
// It is a three-state machine implemented with labels:
//   - noerr:   no error; only waits for a new status.
//   - haserr:  transient error; offers err on db.compErrC until a new status
//     arrives (nil returns to noerr).
//   - hasperr: persistent error (ErrReadOnly or corruption); offers err on
//     db.compErrC and db.compPerErrC and sends on db.writeLockC. There is no
//     transition out of this state.
func (db *DB) compactionError() {
	var err error
noerr:
	// No error: wait for a status update or shutdown.
	for {
		select {
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				goto haserr
			}
		case <-db.closeC:
			return
		}
	}
haserr:
	// Transient error: serve err on compErrC; a nil status clears it.
	for {
		select {
		case db.compErrC <- err:
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				goto noerr
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				// Still transient: keep serving the newer error.
			}
		case <-db.closeC:
			return
		}
	}
hasperr:
	// Persistent error: this state is never left until shutdown.
	for {
		select {
		case db.compErrC <- err:
		case db.compPerErrC <- err:
		case db.writeLockC <- struct{}{}:
			// NOTE(review): this presumably takes the DB write lock so that
			// writers stay blocked while the DB is in a persistent error
			// state — confirm against the writer side of writeLockC.
			db.compWriteLocking = true
		case <-db.closeC:
			if db.compWriteLocking {
				// Give back what was sent on writeLockC above before exiting.
				<-db.writeLockC
			}
			return
		}
	}
}
140
// compactionTransactCounter counts the units of progress made by a single
// attempt of a compaction transaction. compactionTransact compares it across
// retries to decide whether to reset its backoff.
type compactionTransactCounter int

// incr records one more unit of progress.
func (cnt *compactionTransactCounter) incr() {
	(*cnt)++
}

// compactionTransactInterface is a retryable unit of compaction work.
type compactionTransactInterface interface {
	// run performs one attempt, reporting its progress through cnt.
	run(cnt *compactionTransactCounter) error
	// revert is called when the transaction is abandoned.
	revert() error
}
151
152 func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
153 defer func() {
154 if x := recover(); x != nil {
155 if x == errCompactionTransactExiting {
156 if err := t.revert(); err != nil {
157 db.logf("%s revert error %q", name, err)
158 }
159 }
160 panic(x)
161 }
162 }()
163
164 const (
165 backoffMin = 1 * time.Second
166 backoffMax = 8 * time.Second
167 backoffMul = 2 * time.Second
168 )
169 var (
170 backoff = backoffMin
171 backoffT = time.NewTimer(backoff)
172 lastCnt = compactionTransactCounter(0)
173
174 disableBackoff = db.s.o.GetDisableCompactionBackoff()
175 )
176 for n := 0; ; n++ {
177
178 if db.isClosed() {
179 db.logf("%s exiting", name)
180 db.compactionExitTransact()
181 } else if n > 0 {
182 db.logf("%s retrying N·%d", name, n)
183 }
184
185
186 cnt := compactionTransactCounter(0)
187 err := t.run(&cnt)
188 if err != nil {
189 db.logf("%s error I·%d %q", name, cnt, err)
190 }
191
192
193 select {
194 case db.compErrSetC <- err:
195 case perr := <-db.compPerErrC:
196 if err != nil {
197 db.logf("%s exiting (persistent error %q)", name, perr)
198 db.compactionExitTransact()
199 }
200 case <-db.closeC:
201 db.logf("%s exiting", name)
202 db.compactionExitTransact()
203 }
204 if err == nil {
205 return
206 }
207 if errors.IsCorrupted(err) {
208 db.logf("%s exiting (corruption detected)", name)
209 db.compactionExitTransact()
210 }
211
212 if !disableBackoff {
213
214 if cnt > lastCnt {
215 backoff = backoffMin
216 lastCnt = cnt
217 }
218
219
220 backoffT.Reset(backoff)
221 if backoff < backoffMax {
222 backoff *= backoffMul
223 if backoff > backoffMax {
224 backoff = backoffMax
225 }
226 }
227 select {
228 case <-backoffT.C:
229 case <-db.closeC:
230 db.logf("%s exiting", name)
231 db.compactionExitTransact()
232 }
233 }
234 }
235 }
236
237 type compactionTransactFunc struct {
238 runFunc func(cnt *compactionTransactCounter) error
239 revertFunc func() error
240 }
241
242 func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
243 return t.runFunc(cnt)
244 }
245
246 func (t *compactionTransactFunc) revert() error {
247 if t.revertFunc != nil {
248 return t.revertFunc()
249 }
250 return nil
251 }
252
253 func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
254 db.compactionTransact(name, &compactionTransactFunc{run, revert})
255 }
256
257 func (db *DB) compactionExitTransact() {
258 panic(errCompactionTransactExiting)
259 }
260
261 func (db *DB) compactionCommit(name string, rec *sessionRecord) {
262 db.compCommitLk.Lock()
263 defer db.compCommitLk.Unlock()
264 db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
265 return db.s.commit(rec, true)
266 }, nil)
267 }
268
// memCompaction flushes the frozen memdb, if there is one, into table files.
// It then commits the resulting session record, records flush stats, drops
// the frozen memdb and triggers a table compaction.
//
// While flushing, it pauses the table-compaction goroutine. resumeC is handed
// over on db.tcompPauseC, and the receiving side blocks sending on it (see
// pauseCompaction). This function releases it at the end by receiving from
// resumeC and then closing it. If a persistent compaction error is reported on
// db.compPerErrC first, no pause is taken. DB shutdown during either handshake
// exits through compactionExitTransact.
func (db *DB) memCompaction() {
	mdb := db.getFrozenMem()
	if mdb == nil {
		return
	}
	defer mdb.decref()

	db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(int64(mdb.Size())))

	// An empty memdb produces no tables: just drop it.
	if mdb.Len() == 0 {
		db.logf("memdb@flush skipping")

		db.dropFrozenMem()
		return
	}

	// Pause table compaction.
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		// Persistent error: skip the pause. A nil resumeC tells the resume
		// step below that there is nothing to resume.
		close(resumeC)
		resumeC = nil
	case <-db.closeC:
		db.compactionExitTransact()
	}

	var (
		rec        = &sessionRecord{}
		stats      = &cStatStaging{}
		flushLevel int
	)

	// Write the memdb out as tables; compactionTransact retries on failure.
	// If the transaction is abandoned, the revert func removes every table
	// already added to rec.
	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		for _, r := range rec.addedTables {
			db.logf("memdb@flush revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	// NOTE(review): presumably records the journal file and sequence number
	// the flushed data covers — confirm against sessionRecord.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit; the commit time is counted in the flush stats.
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

	// Account the written table sizes and attribute the stats to flushLevel.
	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)
	atomic.AddUint32(&db.memComp, 1)

	db.dropFrozenMem()

	// Resume table compaction: take the value the paused goroutine is blocked
	// sending, then close the channel.
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case <-db.closeC:
			db.compactionExitTransact()
		}
	}

	// Ask for a table compaction now that new tables exist (non-blocking).
	db.compTrigger(db.tcompCmdC)
}
352
353 type tableCompactionBuilder struct {
354 db *DB
355 s *session
356 c *compaction
357 rec *sessionRecord
358 stat1 *cStatStaging
359
360 snapHasLastUkey bool
361 snapLastUkey []byte
362 snapLastSeq uint64
363 snapIter int
364 snapKerrCnt int
365 snapDropCnt int
366
367 kerrCnt int
368 dropCnt int
369
370 minSeq uint64
371 strict bool
372 tableSize int
373
374 tw *tWriter
375 }
376
377 func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
378
379 if b.tw == nil {
380
381 if b.db != nil {
382 select {
383 case ch := <-b.db.tcompPauseC:
384 b.db.pauseCompaction(ch)
385 case <-b.db.closeC:
386 b.db.compactionExitTransact()
387 default:
388 }
389 }
390
391
392 var err error
393 b.tw, err = b.s.tops.create(b.tableSize)
394 if err != nil {
395 return err
396 }
397 }
398
399
400 return b.tw.append(key, value)
401 }
402
403 func (b *tableCompactionBuilder) needFlush() bool {
404 return b.tw.tw.BytesLen() >= b.tableSize
405 }
406
407 func (b *tableCompactionBuilder) flush() error {
408 t, err := b.tw.finish()
409 if err != nil {
410 return err
411 }
412 b.rec.addTableFile(b.c.sourceLevel+1, t)
413 b.stat1.write += t.size
414 b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(t.size), t.imin, t.imax)
415 b.tw = nil
416 return nil
417 }
418
419 func (b *tableCompactionBuilder) cleanup() error {
420 if b.tw != nil {
421 if err := b.tw.drop(); err != nil {
422 return err
423 }
424 b.tw = nil
425 }
426 return nil
427 }
428
// run performs one attempt of the table-building transaction. It walks the
// compaction's merged input (b.c.newIterator), drops entries that are no
// longer needed, and writes the rest into output tables via appendKV.
//
// A retry resumes from the snapshot (b.snap*) taken right after the last
// successful flush. Entries before b.snapIter are skipped, and the loop-local
// key state and counters are restored from the snapshot. cnt is incremented
// for every iterator entry, skipped ones included.
//
// Any table writer still open on return is dropped via cleanup. A cleanup
// failure is merged into the returned error.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) {
	snapResumed := b.snapIter > 0
	// Tracked separately because an empty lastUkey cannot by itself mean
	// "no previous user key".
	hasLastUkey := b.snapHasLastUkey
	lastUkey := append([]byte(nil), b.snapLastUkey...)
	lastSeq := b.snapLastSeq
	b.kerrCnt = b.snapKerrCnt
	b.dropCnt = b.snapDropCnt

	// Restore the compaction's own saved state (paired with b.c.save below).
	b.c.restore()

	defer func() {
		if cerr := b.cleanup(); cerr != nil {
			if err == nil {
				err = cerr
			} else {
				err = fmt.Errorf("tableCompactionBuilder error: %v, cleanup error (%v)", err, cerr)
			}
		}
	}()

	b.stat1.startTimer()
	defer b.stat1.stopTimer()

	iter := b.c.newIterator()
	defer iter.Release()
	for i := 0; iter.Next(); i++ {
		// Count progress for compactionTransact's backoff logic.
		cnt.incr()

		// Skip entries already handled before the snapshot.
		if i < b.snapIter {
			continue
		}

		// True only for the first entry processed after resuming.
		resumed := false
		if snapResumed {
			resumed = true
			snapResumed = false
		}

		ikey := iter.Key()
		ukey, seq, kt, kerr := parseInternalKey(ikey)

		if kerr == nil {
			// On the resumed entry the table was just rotated, so don't stop again.
			shouldStop := !resumed && b.c.shouldStopBefore(ikey)

			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
				// First entry for this user key. Output tables are only
				// rotated here, so one user key never spans two tables.
				if b.tw != nil && (shouldStop || b.needFlush()) {
					if err := b.flush(); err != nil {
						return err
					}

					// Snapshot the state so a retry can resume at entry i.
					b.c.save()
					b.snapHasLastUkey = hasLastUkey
					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
					b.snapLastSeq = lastSeq
					b.snapIter = i
					b.snapKerrCnt = b.kerrCnt
					b.snapDropCnt = b.dropCnt
				}

				hasLastUkey = true
				lastUkey = append(lastUkey[:0], ukey...)
				lastSeq = keyMaxSeq
			}

			switch {
			case lastSeq <= b.minSeq:
				// The previous entry for this user key (sequence lastSeq) is
				// already at or below minSeq, so this entry is dropped.
				// NOTE(review): relies on the iterator yielding a user key's
				// entries newest-first — confirm internal-key ordering.
				fallthrough
			case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
				// A deletion marker at or below minSeq, for a key that
				// baseLevelForKey accepts (presumably: no data for it in
				// deeper levels — confirm), is dropped as well.
				lastSeq = seq
				b.dropCnt++
				continue
			default:
				lastSeq = seq
			}
		} else {
			if b.strict {
				return kerr
			}

			// Copy the unparsable key through rather than dropping it, and
			// forget the previous user key.
			hasLastUkey = false
			lastUkey = lastUkey[:0]
			lastSeq = keyMaxSeq
			b.kerrCnt++
		}

		if err := b.appendKV(ikey, iter.Value()); err != nil {
			return err
		}
	}

	if err := iter.Error(); err != nil {
		return err
	}

	// Finish the last table if it holds anything.
	if b.tw != nil && !b.tw.empty() {
		return b.flush()
	}
	return nil
}
544
545 func (b *tableCompactionBuilder) revert() error {
546 for _, at := range b.rec.addedTables {
547 b.s.logf("table@build revert @%d", at.num)
548 if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
549 return err
550 }
551 }
552 return nil
553 }
554
555 func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
556 defer c.release()
557
558 rec := &sessionRecord{}
559 rec.addCompPtr(c.sourceLevel, c.imax)
560
561 if !noTrivial && c.trivial() {
562 t := c.levels[0][0]
563 db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
564 rec.delTable(c.sourceLevel, t.fd.Num)
565 rec.addTableFile(c.sourceLevel+1, t)
566 db.compactionCommit("table-move", rec)
567 return
568 }
569
570 var stats [2]cStatStaging
571 for i, tables := range c.levels {
572 for _, t := range tables {
573 stats[i].read += t.size
574
575 rec.delTable(c.sourceLevel+i, t.fd.Num)
576 }
577 }
578 sourceSize := stats[0].read + stats[1].read
579 minSeq := db.minSeq()
580 db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
581
582 b := &tableCompactionBuilder{
583 db: db,
584 s: db.s,
585 c: c,
586 rec: rec,
587 stat1: &stats[1],
588 minSeq: minSeq,
589 strict: db.s.o.GetStrict(opt.StrictCompaction),
590 tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
591 }
592 db.compactionTransact("table@build", b)
593
594
595 stats[1].startTimer()
596 db.compactionCommit("table", rec)
597 stats[1].stopTimer()
598
599 resultSize := stats[1].write
600 db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
601
602
603 for i := range stats {
604 db.compStats.addStat(c.sourceLevel+1, &stats[i])
605 }
606 switch c.typ {
607 case level0Compaction:
608 atomic.AddUint32(&db.level0Comp, 1)
609 case nonLevel0Compaction:
610 atomic.AddUint32(&db.nonLevel0Comp, 1)
611 case seekCompaction:
612 atomic.AddUint32(&db.seekComp, 1)
613 }
614 }
615
616 func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
617 db.logf("table@compaction range L%d %q:%q", level, umin, umax)
618 if level >= 0 {
619 if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
620 db.tableCompaction(c, true)
621 }
622 } else {
623
624 for {
625 compacted := false
626
627
628 v := db.s.version()
629 m := 1
630 for i := m; i < len(v.levels); i++ {
631 tables := v.levels[i]
632 if tables.overlaps(db.s.icmp, umin, umax, false) {
633 m = i
634 }
635 }
636 v.release()
637
638 for level := 0; level < m; level++ {
639 if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
640 db.tableCompaction(c, true)
641 compacted = true
642 }
643 }
644
645 if !compacted {
646 break
647 }
648 }
649 }
650
651 return nil
652 }
653
654 func (db *DB) tableAutoCompaction() {
655 if c := db.s.pickCompaction(); c != nil {
656 db.tableCompaction(c, false)
657 }
658 }
659
660 func (db *DB) tableNeedCompaction() bool {
661 v := db.s.version()
662 defer v.release()
663 return v.needCompaction()
664 }
665
666
667 func (db *DB) resumeWrite() bool {
668 v := db.s.version()
669 defer v.release()
670 return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger()
671 }
672
673 func (db *DB) pauseCompaction(ch chan<- struct{}) {
674 select {
675 case ch <- struct{}{}:
676 case <-db.closeC:
677 db.compactionExitTransact()
678 }
679 }
680
// cCmd is a command for a compaction goroutine. ack reports the outcome back
// to the issuer, if the command carries an acknowledgement channel.
type cCmd interface {
	ack(err error)
}

// cAuto requests an automatic compaction. ackC is optional.
type cAuto struct {
	ackC chan<- error
}

func (r cAuto) ack(err error) {
	sendCompAck(r.ackC, err)
}

// cRange requests a range compaction of min..max at level. ackC is optional.
type cRange struct {
	level    int
	min, max []byte
	ackC     chan<- error
}

func (r cRange) ack(err error) {
	sendCompAck(r.ackC, err)
}

// sendCompAck sends err on ackC when ackC is non-nil. The issuer closes ackC
// once it stops waiting (see compTriggerWait), and the resulting
// send-on-closed-channel panic is deliberately recovered, dropping the ack.
func sendCompAck(ackC chan<- error, err error) {
	if ackC == nil {
		return
	}
	defer func() {
		_ = recover() // the issuer already gave up and closed ackC
	}()
	ackC <- err
}
713
714
715 func (db *DB) compTrigger(compC chan<- cCmd) {
716 select {
717 case compC <- cAuto{}:
718 default:
719 }
720 }
721
722
723 func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
724 ch := make(chan error)
725 defer close(ch)
726
727 select {
728 case compC <- cAuto{ch}:
729 case err = <-db.compErrC:
730 return
731 case <-db.closeC:
732 return ErrClosed
733 }
734
735 select {
736 case err = <-ch:
737 case err = <-db.compErrC:
738 case <-db.closeC:
739 return ErrClosed
740 }
741 return err
742 }
743
744
745 func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
746 ch := make(chan error)
747 defer close(ch)
748
749 select {
750 case compC <- cRange{level, min, max, ch}:
751 case err := <-db.compErrC:
752 return err
753 case <-db.closeC:
754 return ErrClosed
755 }
756
757 select {
758 case err = <-ch:
759 case err = <-db.compErrC:
760 case <-db.closeC:
761 return ErrClosed
762 }
763 return err
764 }
765
766 func (db *DB) mCompaction() {
767 var x cCmd
768
769 defer func() {
770 if x := recover(); x != nil {
771 if x != errCompactionTransactExiting {
772 panic(x)
773 }
774 }
775 if x != nil {
776 x.ack(ErrClosed)
777 }
778 db.closeW.Done()
779 }()
780
781 for {
782 select {
783 case x = <-db.mcompCmdC:
784 switch x.(type) {
785 case cAuto:
786 db.memCompaction()
787 x.ack(nil)
788 x = nil
789 default:
790 panic("leveldb: unknown command")
791 }
792 case <-db.closeC:
793 return
794 }
795 }
796 }
797
// tCompaction is the table-compaction goroutine. It serves commands from
// db.tcompCmdC and pause requests from db.tcompPauseC. On every iteration
// that does not handle a pause, it runs tableAutoCompaction.
//
// How commands are handled:
//   - A cAuto with an ackC is acknowledged once resumeWrite reports true;
//     until then it is parked in waitQ.
//   - A cRange runs tableRangeCompaction and is acknowledged with its result.
//
// On exit (shutdown or an errCompactionTransactExiting panic), every parked
// or in-flight command is acknowledged with ErrClosed.
func (db *DB) tCompaction() {
	var (
		x     cCmd
		waitQ []cCmd
	)

	defer func() {
		// This x shadows the pending command x only inside the if; the
		// later `x != nil` check refers to the command.
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		for i := range waitQ {
			waitQ[i].ack(ErrClosed)
			waitQ[i] = nil
		}
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		if db.tableNeedCompaction() {
			// Compaction is pending: poll without blocking so work continues.
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			default:
			}
			// Release waiters as soon as resumeWrite allows it.
			if len(waitQ) > 0 && db.resumeWrite() {
				for i := range waitQ {
					waitQ[i].ack(nil)
					waitQ[i] = nil
				}
				waitQ = waitQ[:0]
			}
		} else {
			// Nothing to compact: release all waiters, then block for input.
			for i := range waitQ {
				waitQ[i].ack(nil)
				waitQ[i] = nil
			}
			waitQ = waitQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				if cmd.ackC != nil {
					// Ack now if resumeWrite allows it; otherwise park the command.
					if db.resumeWrite() {
						x.ack(nil)
					} else {
						waitQ = append(waitQ, x)
					}
				}
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		db.tableAutoCompaction()
	}
}
875
View as plain text