1 package bbolt
2
3 import (
4 "fmt"
5 "io"
6 "os"
7 "sort"
8 "strings"
9 "sync/atomic"
10 "time"
11 "unsafe"
12 )
13
14
// txid identifies a transaction. A writable transaction's id is one greater
// than the txid of the meta page it was opened from.
type (
	txid uint64
)
16
17
18
19
20
21
22
23
24
25 type Tx struct {
26 writable bool
27 managed bool
28 db *DB
29 meta *meta
30 root Bucket
31 pages map[pgid]*page
32 stats TxStats
33 commitHandlers []func()
34
35
36
37
38
39
40
41 WriteFlag int
42 }
43
44
45 func (tx *Tx) init(db *DB) {
46 tx.db = db
47 tx.pages = nil
48
49
50 tx.meta = &meta{}
51 db.meta().copy(tx.meta)
52
53
54 tx.root = newBucket(tx)
55 tx.root.bucket = &bucket{}
56 *tx.root.bucket = tx.meta.root
57
58
59 if tx.writable {
60 tx.pages = make(map[pgid]*page)
61 tx.meta.txid += txid(1)
62 }
63 }
64
65
66 func (tx *Tx) ID() int {
67 return int(tx.meta.txid)
68 }
69
70
71 func (tx *Tx) DB() *DB {
72 return tx.db
73 }
74
75
76 func (tx *Tx) Size() int64 {
77 return int64(tx.meta.pgid) * int64(tx.db.pageSize)
78 }
79
80
81 func (tx *Tx) Writable() bool {
82 return tx.writable
83 }
84
85
86
87
88
89 func (tx *Tx) Cursor() *Cursor {
90 return tx.root.Cursor()
91 }
92
93
94 func (tx *Tx) Stats() TxStats {
95 return tx.stats
96 }
97
98
99
100
101 func (tx *Tx) Bucket(name []byte) *Bucket {
102 return tx.root.Bucket(name)
103 }
104
105
106
107
108 func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
109 return tx.root.CreateBucket(name)
110 }
111
112
113
114
115 func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
116 return tx.root.CreateBucketIfNotExists(name)
117 }
118
119
120
121 func (tx *Tx) DeleteBucket(name []byte) error {
122 return tx.root.DeleteBucket(name)
123 }
124
125
126
127
128 func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
129 return tx.root.ForEach(func(k, v []byte) error {
130 return fn(k, tx.root.Bucket(k))
131 })
132 }
133
134
135 func (tx *Tx) OnCommit(fn func()) {
136 tx.commitHandlers = append(tx.commitHandlers, fn)
137 }
138
139
140
141
// Commit writes all changes to disk, updates the meta page and closes the
// transaction.
//
// It returns ErrTxClosed if the transaction is already closed and
// ErrTxNotWritable for a read-only transaction. Calling it on a managed
// transaction trips an _assert.
//
// Any failure after those checks rolls the transaction back before the
// error is returned (commitFreelist performs its own rollback). When
// db.StrictMode is enabled, Commit panics if the consistency check run
// after the data pages are written reports any error.
//
// OnCommit handlers run, in registration order, only after a successful
// commit and after tx.close() has released the writer lock.
func (tx *Tx) Commit() error {
	_assert(!tx.managed, "managed tx commit not allowed")
	if tx.db == nil {
		return ErrTxClosed
	} else if !tx.writable {
		return ErrTxNotWritable
	}

	// Rebalance the node tree. Rebalance time is only recorded when at
	// least one rebalance was counted.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.GetRebalance() > 0 {
		tx.stats.IncRebalanceTime(time.Since(startTime))
	}

	// Remember the page high-water mark so we can tell whether it grew.
	opgid := tx.meta.pgid

	// Spill dirty nodes onto pages.
	startTime = time.Now()
	if err := tx.root.spill(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.IncSpillTime(time.Since(startTime))

	// Point the meta's root bucket at the root page produced by spilling.
	tx.meta.root.root = tx.root.root

	// Hand the previous freelist page(s) to the freelist under this tx id.
	// A fresh freelist is written below, or none at all with NoFreelistSync.
	if tx.meta.freelist != pgidNoFreelist {
		tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
	}

	if !tx.db.NoFreelistSync {
		// commitFreelist rolls the transaction back itself on failure.
		err := tx.commitFreelist()
		if err != nil {
			return err
		}
	} else {
		tx.meta.freelist = pgidNoFreelist
	}

	// If the high-water mark moved up, grow the database so it covers the
	// newly allocated pages.
	if tx.meta.pgid > opgid {
		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
			tx.rollback()
			return err
		}
	}

	// Write the dirty pages. WriteTime spans from here through writeMeta,
	// so it also includes the StrictMode check when that is enabled.
	startTime = time.Now()
	if err := tx.write(); err != nil {
		tx.rollback()
		return err
	}

	// In strict mode, drain every error reported by Check and panic if
	// there were any.
	if tx.db.StrictMode {
		ch := tx.Check()
		var errs []string
		for {
			err, ok := <-ch
			if !ok {
				break
			}
			errs = append(errs, err.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

	// Write the meta page last. The meta references the new root and
	// freelist pages, which have already been written above.
	if err := tx.writeMeta(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.IncWriteTime(time.Since(startTime))

	// Finalize: close releases the writer lock and publishes the stats.
	tx.close()

	// Run commit handlers now that the lock has been released.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}
234
235 func (tx *Tx) commitFreelist() error {
236
237
238 p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
239 if err != nil {
240 tx.rollback()
241 return err
242 }
243 if err := tx.db.freelist.write(p); err != nil {
244 tx.rollback()
245 return err
246 }
247 tx.meta.freelist = p.id
248
249 return nil
250 }
251
252
253
254 func (tx *Tx) Rollback() error {
255 _assert(!tx.managed, "managed tx rollback not allowed")
256 if tx.db == nil {
257 return ErrTxClosed
258 }
259 tx.nonPhysicalRollback()
260 return nil
261 }
262
263
264 func (tx *Tx) nonPhysicalRollback() {
265 if tx.db == nil {
266 return
267 }
268 if tx.writable {
269 tx.db.freelist.rollback(tx.meta.txid)
270 }
271 tx.close()
272 }
273
274
275 func (tx *Tx) rollback() {
276 if tx.db == nil {
277 return
278 }
279 if tx.writable {
280 tx.db.freelist.rollback(tx.meta.txid)
281
282
283 if tx.db.data != nil {
284 if !tx.db.hasSyncedFreelist() {
285
286
287 tx.db.freelist.noSyncReload(tx.db.freepages())
288 } else {
289
290 tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
291 }
292 }
293 }
294 tx.close()
295 }
296
297 func (tx *Tx) close() {
298 if tx.db == nil {
299 return
300 }
301 if tx.writable {
302
303 var freelistFreeN = tx.db.freelist.free_count()
304 var freelistPendingN = tx.db.freelist.pending_count()
305 var freelistAlloc = tx.db.freelist.size()
306
307
308 tx.db.rwtx = nil
309 tx.db.rwlock.Unlock()
310
311
312 tx.db.statlock.Lock()
313 tx.db.stats.FreePageN = freelistFreeN
314 tx.db.stats.PendingPageN = freelistPendingN
315 tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
316 tx.db.stats.FreelistInuse = freelistAlloc
317 tx.db.stats.TxStats.add(&tx.stats)
318 tx.db.statlock.Unlock()
319 } else {
320 tx.db.removeTx(tx)
321 }
322
323
324 tx.db = nil
325 tx.meta = nil
326 tx.root = Bucket{tx: tx}
327 tx.pages = nil
328 }
329
330
331
332
333
334 func (tx *Tx) Copy(w io.Writer) error {
335 _, err := tx.WriteTo(w)
336 return err
337 }
338
339
340
341 func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
342
343 f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
344 if err != nil {
345 return 0, err
346 }
347 defer func() {
348 if cerr := f.Close(); err == nil {
349 err = cerr
350 }
351 }()
352
353
354 buf := make([]byte, tx.db.pageSize)
355 page := (*page)(unsafe.Pointer(&buf[0]))
356 page.flags = metaPageFlag
357 *page.meta() = *tx.meta
358
359
360 page.id = 0
361 page.meta().checksum = page.meta().sum64()
362 nn, err := w.Write(buf)
363 n += int64(nn)
364 if err != nil {
365 return n, fmt.Errorf("meta 0 copy: %s", err)
366 }
367
368
369 page.id = 1
370 page.meta().txid -= 1
371 page.meta().checksum = page.meta().sum64()
372 nn, err = w.Write(buf)
373 n += int64(nn)
374 if err != nil {
375 return n, fmt.Errorf("meta 1 copy: %s", err)
376 }
377
378
379 if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
380 return n, fmt.Errorf("seek: %s", err)
381 }
382
383
384 wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
385 n += wn
386 if err != nil {
387 return n, err
388 }
389
390 return n, nil
391 }
392
393
394
395
396 func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
397 f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
398 if err != nil {
399 return err
400 }
401
402 _, err = tx.WriteTo(f)
403 if err != nil {
404 _ = f.Close()
405 return err
406 }
407 return f.Close()
408 }
409
410
411 func (tx *Tx) allocate(count int) (*page, error) {
412 p, err := tx.db.allocate(tx.meta.txid, count)
413 if err != nil {
414 return nil, err
415 }
416
417
418 tx.pages[p.id] = p
419
420
421 tx.stats.IncPageCount(int64(count))
422 tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))
423
424 return p, nil
425 }
426
427
// write flushes the transaction's dirty pages (tx.pages) to the data file.
//
// Steps:
//  1. Collect the pages from the cache and replace the cache with an empty
//     map.
//  2. Sort the pages with sort.Sort. The ordering comes from the pages
//     type, presumably ascending page id.
//  3. Write each page at offset id*pageSize. A page spans
//     (overflow+1)*pageSize bytes and is written in chunks of at most
//     maxAllocSize-1 bytes; each chunk counts as one write in tx.stats.
//  4. fdatasync the file unless db.NoSync is set (IgnoreNoSync overrides
//     NoSync).
//  5. Zero the buffers of single-page pages and return them to
//     db.pagePool.
//
// On a write or sync error the error is returned immediately and no
// buffers are returned to the pool.
func (tx *Tx) write() error {
	// Snapshot the dirty pages so they can be sorted.
	pages := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out the page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(pages)

	// Write the pages to disk in sorted order.
	for _, p := range pages {
		rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
		offset := int64(p.id) * int64(tx.db.pageSize)
		var written uintptr

		// Write the page in chunks of at most maxAllocSize-1 bytes.
		// NOTE(review): presumably because unsafeByteSlice must not build
		// a slice larger than maxAllocSize; confirm against its definition.
		for {
			sz := rem
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}
			buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Each chunk counts as one write.
			tx.stats.IncWrite(1)

			// Stop once every chunk of this page has been written.
			rem -= sz
			if rem == 0 {
				break
			}

			// Otherwise advance the file offset and the source pointer.
			offset += int64(sz)
			written += uintptr(sz)
		}
	}

	// Sync unless NoSync is set; IgnoreNoSync forces the sync regardless.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Return single-page buffers to the page pool.
	for _, p := range pages {
		// Skip pages that span more than one page.
		// NOTE(review): presumably those buffers were not taken from
		// pagePool; confirm in DB.allocate.
		if int(p.overflow) != 0 {
			continue
		}

		buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)

		// Zero the buffer before pooling it. The compiler recognises this
		// range-and-assign-zero loop and lowers it to a memclr.
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}
497
498
499 func (tx *Tx) writeMeta() error {
500
501 buf := make([]byte, tx.db.pageSize)
502 p := tx.db.pageInBuffer(buf, 0)
503 tx.meta.write(p)
504
505
506 if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
507 return err
508 }
509 if !tx.db.NoSync || IgnoreNoSync {
510 if err := fdatasync(tx.db); err != nil {
511 return err
512 }
513 }
514
515
516 tx.stats.IncWrite(1)
517
518 return nil
519 }
520
521
522
523 func (tx *Tx) page(id pgid) *page {
524
525 if tx.pages != nil {
526 if p, ok := tx.pages[id]; ok {
527 p.fastCheck(id)
528 return p
529 }
530 }
531
532
533 p := tx.db.page(id)
534 p.fastCheck(id)
535 return p
536 }
537
538
539 func (tx *Tx) forEachPage(pgidnum pgid, fn func(*page, int, []pgid)) {
540 stack := make([]pgid, 10)
541 stack[0] = pgidnum
542 tx.forEachPageInternal(stack[:1], fn)
543 }
544
545 func (tx *Tx) forEachPageInternal(pgidstack []pgid, fn func(*page, int, []pgid)) {
546 p := tx.page(pgidstack[len(pgidstack)-1])
547
548
549 fn(p, len(pgidstack)-1, pgidstack)
550
551
552 if (p.flags & branchPageFlag) != 0 {
553 for i := 0; i < int(p.count); i++ {
554 elem := p.branchPageElement(uint16(i))
555 tx.forEachPageInternal(append(pgidstack, elem.pgid), fn)
556 }
557 }
558 }
559
560
561
562 func (tx *Tx) Page(id int) (*PageInfo, error) {
563 if tx.db == nil {
564 return nil, ErrTxClosed
565 } else if pgid(id) >= tx.meta.pgid {
566 return nil, nil
567 }
568
569 if tx.db.freelist == nil {
570 return nil, ErrFreePagesNotLoaded
571 }
572
573
574 p := tx.db.page(pgid(id))
575 info := &PageInfo{
576 ID: id,
577 Count: int(p.count),
578 OverflowCount: int(p.overflow),
579 }
580
581
582 if tx.db.freelist.freed(pgid(id)) {
583 info.Type = "free"
584 } else {
585 info.Type = p.typ()
586 }
587
588 return info, nil
589 }
590
591
// TxStats represents statistics about the actions performed by a
// transaction.
//
// The fields are maintained with sync/atomic through the Get*/Inc* methods.
// Use those methods rather than touching the fields directly whenever the
// stats may be accessed concurrently.
//
// Every field is 64 bits wide (time.Duration is an int64), which keeps
// 64-bit atomic access possible. NOTE(review): on 32-bit platforms this
// also relies on the enclosing struct placing TxStats at an 8-byte aligned
// offset.
type TxStats struct {
	// Page statistics.

	// PageCount is the number of pages allocated (Tx.allocate adds the
	// requested page count).
	PageCount int64
	// PageAlloc is the number of bytes allocated (page count * page size).
	PageAlloc int64

	// Cursor statistics.

	// CursorCount is presumably the number of cursors created. It is
	// updated outside this file.
	CursorCount int64

	// Node statistics.

	NodeCount int64 // presumably node allocations (updated outside this file)
	NodeDeref int64 // presumably node dereferences (updated outside this file)

	// Rebalance statistics.

	// Rebalance is the number of rebalances. Commit records RebalanceTime
	// only when this is greater than zero.
	Rebalance int64
	// RebalanceTime is the time spent in the root rebalance during Commit.
	RebalanceTime time.Duration

	// Split/Spill statistics.

	Split int64 // presumably nodes split (updated outside this file)
	Spill int64 // presumably nodes spilled (updated outside this file)
	// SpillTime is the time spent in the root spill during Commit.
	SpillTime time.Duration

	// Write statistics.

	// Write is the number of writes issued: one per page chunk in
	// Tx.write plus one per meta page write.
	Write int64
	// WriteTime is measured in Commit from the start of Tx.write through
	// writeMeta. It includes the StrictMode check when that is enabled.
	WriteTime time.Duration
}
635
636 func (s *TxStats) add(other *TxStats) {
637 s.IncPageCount(other.GetPageCount())
638 s.IncPageAlloc(other.GetPageAlloc())
639 s.IncCursorCount(other.GetCursorCount())
640 s.IncNodeCount(other.GetNodeCount())
641 s.IncNodeDeref(other.GetNodeDeref())
642 s.IncRebalance(other.GetRebalance())
643 s.IncRebalanceTime(other.GetRebalanceTime())
644 s.IncSplit(other.GetSplit())
645 s.IncSpill(other.GetSpill())
646 s.IncSpillTime(other.GetSpillTime())
647 s.IncWrite(other.GetWrite())
648 s.IncWriteTime(other.GetWriteTime())
649 }
650
651
652
653
654 func (s *TxStats) Sub(other *TxStats) TxStats {
655 var diff TxStats
656 diff.PageCount = s.GetPageCount() - other.GetPageCount()
657 diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
658 diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
659 diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
660 diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
661 diff.Rebalance = s.GetRebalance() - other.GetRebalance()
662 diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
663 diff.Split = s.GetSplit() - other.GetSplit()
664 diff.Spill = s.GetSpill() - other.GetSpill()
665 diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
666 diff.Write = s.GetWrite() - other.GetWrite()
667 diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
668 return diff
669 }
670
671
672 func (s *TxStats) GetPageCount() int64 {
673 return atomic.LoadInt64(&s.PageCount)
674 }
675
676
677 func (s *TxStats) IncPageCount(delta int64) int64 {
678 return atomic.AddInt64(&s.PageCount, delta)
679 }
680
681
682 func (s *TxStats) GetPageAlloc() int64 {
683 return atomic.LoadInt64(&s.PageAlloc)
684 }
685
686
687 func (s *TxStats) IncPageAlloc(delta int64) int64 {
688 return atomic.AddInt64(&s.PageAlloc, delta)
689 }
690
691
692 func (s *TxStats) GetCursorCount() int64 {
693 return atomic.LoadInt64(&s.CursorCount)
694 }
695
696
697 func (s *TxStats) IncCursorCount(delta int64) int64 {
698 return atomic.AddInt64(&s.CursorCount, delta)
699 }
700
701
702 func (s *TxStats) GetNodeCount() int64 {
703 return atomic.LoadInt64(&s.NodeCount)
704 }
705
706
707 func (s *TxStats) IncNodeCount(delta int64) int64 {
708 return atomic.AddInt64(&s.NodeCount, delta)
709 }
710
711
712 func (s *TxStats) GetNodeDeref() int64 {
713 return atomic.LoadInt64(&s.NodeDeref)
714 }
715
716
717 func (s *TxStats) IncNodeDeref(delta int64) int64 {
718 return atomic.AddInt64(&s.NodeDeref, delta)
719 }
720
721
722 func (s *TxStats) GetRebalance() int64 {
723 return atomic.LoadInt64(&s.Rebalance)
724 }
725
726
727 func (s *TxStats) IncRebalance(delta int64) int64 {
728 return atomic.AddInt64(&s.Rebalance, delta)
729 }
730
731
732 func (s *TxStats) GetRebalanceTime() time.Duration {
733 return atomicLoadDuration(&s.RebalanceTime)
734 }
735
736
737 func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
738 return atomicAddDuration(&s.RebalanceTime, delta)
739 }
740
741
742 func (s *TxStats) GetSplit() int64 {
743 return atomic.LoadInt64(&s.Split)
744 }
745
746
747 func (s *TxStats) IncSplit(delta int64) int64 {
748 return atomic.AddInt64(&s.Split, delta)
749 }
750
751
752 func (s *TxStats) GetSpill() int64 {
753 return atomic.LoadInt64(&s.Spill)
754 }
755
756
757 func (s *TxStats) IncSpill(delta int64) int64 {
758 return atomic.AddInt64(&s.Spill, delta)
759 }
760
761
762 func (s *TxStats) GetSpillTime() time.Duration {
763 return atomicLoadDuration(&s.SpillTime)
764 }
765
766
767 func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
768 return atomicAddDuration(&s.SpillTime, delta)
769 }
770
771
772 func (s *TxStats) GetWrite() int64 {
773 return atomic.LoadInt64(&s.Write)
774 }
775
776
777 func (s *TxStats) IncWrite(delta int64) int64 {
778 return atomic.AddInt64(&s.Write, delta)
779 }
780
781
782 func (s *TxStats) GetWriteTime() time.Duration {
783 return atomicLoadDuration(&s.WriteTime)
784 }
785
786
787 func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
788 return atomicAddDuration(&s.WriteTime, delta)
789 }
790
// atomicAddDuration atomically adds du to *ptr and returns the new value.
//
// time.Duration's underlying type is int64, so *time.Duration converts
// directly to *int64 with an ordinary pointer conversion. No unsafe.Pointer
// round-trip is needed.
func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
	return time.Duration(atomic.AddInt64((*int64)(ptr), int64(du)))
}
794
// atomicLoadDuration atomically loads *ptr.
//
// time.Duration's underlying type is int64, so *time.Duration converts
// directly to *int64 with an ordinary pointer conversion. No unsafe.Pointer
// round-trip is needed.
func atomicLoadDuration(ptr *time.Duration) time.Duration {
	return time.Duration(atomic.LoadInt64((*int64)(ptr)))
}
798
View as plain text