package backend

import (
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	humanize "github.com/dustin/go-humanize"
	bolt "go.etcd.io/bbolt"
	"go.uber.org/zap"
)

var (
	defaultBatchLimit    = 10000
	defaultBatchInterval = 100 * time.Millisecond

	defragLimit = 10000

	// initialMmapSize is the initial size of the mmapped region. Setting this
	// larger than the potential max db size can prevent the writer from
	// blocking readers when the mmap region has to grow.
	initialMmapSize = uint64(10 * 1024 * 1024 * 1024)

	// minSnapshotWarningTimeout is the minimum threshold to trigger a
	// long-running snapshot transfer warning.
	minSnapshotWarningTimeout = 30 * time.Second
)

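// Backend wraps a bolt database with batched write transactions, shared and
// concurrent read transactions, snapshotting, hashing, size accounting, and
// defragmentation.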
type Backend interface {
	// ReadTx returns the shared, buffered read transaction.
	ReadTx() ReadTx
	BatchTx() BatchTx
	// ConcurrentReadTx returns a non-blocking read transaction that holds its
	// own copy of the read buffer.
	ConcurrentReadTx() ReadTx

	Snapshot() Snapshot
	Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
	// Size returns the current size of the backend physically allocated.
	// The backend can hold DB space that is not utilized at the moment,
	// since it can conduct pre-allocation or spare unused space for recycling.
	// Use SizeInUse() instead for the actual DB size.
	Size() int64
	// SizeInUse returns the current size of the backend logically in use.
	// Since the backend can manage free space in a non-byte unit such as
	// number of pages, the returned value can be not exactly accurate in bytes.
	SizeInUse() int64
	// OpenReadTxN returns the number of currently open read transactions in the backend.
	OpenReadTxN() int64
	Defrag() error
	ForceCommit()
	Close() error

	// SetTxPostLockInsideApplyHook sets a hook that is called right after the
	// batch transaction lock is acquired on the apply path.
	SetTxPostLockInsideApplyHook(func())
}

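// Snapshot is a point-in-time view of the backend that can be written out to
// an io.Writer and must be closed when no longer needed.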
type Snapshot interface {
	// Size gets the size of the snapshot.
	Size() int64
	// WriteTo writes the snapshot into the given writer.
	WriteTo(w io.Writer) (n int64, err error)
	// Close closes the snapshot.
	Close() error
}

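// txReadBufferCache caches the most recent copy of the read buffer handed out
// by ConcurrentReadTx, together with the buffer version it was copied from.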
type txReadBufferCache struct {
	mu         sync.Mutex
	buf        *txReadBuffer
	bufVersion uint64
}

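// backend implements Backend on top of a bolt.DB.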
type backend struct {
	// size and commits are used with atomic operations so they must be
	// 64-bit aligned, otherwise 32-bit tests will crash

	// size is the number of bytes allocated in the backend
	size int64
	// sizeInUse is the number of bytes actually used in the backend
	sizeInUse int64
	// commits counts the number of commits since start
	commits int64
	// openReadTxN is the number of currently open read transactions in the backend
	openReadTxN int64
	// mlock prevents the backend database file from being swapped
	mlock bool

	mu    sync.RWMutex
	bopts *bolt.Options
	db    *bolt.DB

	batchInterval time.Duration
	batchLimit    int
	batchTx       *batchTxBuffered

	readTx *readTx
	// txReadBufferCache mirrors the txReadBuffer inside readTx (readTx.baseReadTx.buf).
	// When creating a concurrentReadTx:
	// - it copies the cached buffer instead of making a fresh copy of readTx.baseReadTx.buf
	// - it rejects the cache if the cached bufVersion does not match readTx.buf.bufVersion
	txReadBufferCache txReadBufferCache

	stopc chan struct{}
	donec chan struct{}

	hooks Hooks

	// txPostLockInsideApplyHook is called each time right after locking the tx.
	txPostLockInsideApplyHook func()

	lg *zap.Logger
}

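// BackendConfig carries the settings used to open a backend: file path, batch
// commit interval and limit, bolt freelist type, mmap size, logging, and
// fsync/mlock behavior.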
type BackendConfig struct {
	// Path is the file path to the backend file.
	Path string
	// BatchInterval is the maximum time before flushing the BatchTx.
	BatchInterval time.Duration
	// BatchLimit is the maximum number of puts before flushing the BatchTx.
	BatchLimit int
	// BackendFreelistType is the backend boltdb's freelist type.
	BackendFreelistType bolt.FreelistType
	// MmapSize is the number of bytes to mmap for the backend.
	MmapSize uint64
	// Logger logs backend-side operations.
	Logger *zap.Logger
	// UnsafeNoFsync disables all uses of fsync.
	UnsafeNoFsync bool `json:"unsafe-no-fsync"`
	// Mlock prevents the backend database file from being swapped.
	Mlock bool

	// Hooks are executed during the lifecycle of the backend's transactions.
	Hooks Hooks
}

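// DefaultBackendConfig returns a BackendConfig populated with the package
// defaults: 100ms batch interval, a 10000-operation batch limit, and a 10 GiB
// initial mmap size.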
func DefaultBackendConfig() BackendConfig {
	return BackendConfig{
		BatchInterval: defaultBatchInterval,
		BatchLimit:    defaultBatchLimit,
		MmapSize:      initialMmapSize,
	}
}

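// New opens a backend with the given configuration. It panics via the
// configured logger if the underlying bolt database cannot be opened.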
func New(bcfg BackendConfig) Backend {
	return newBackend(bcfg)
}

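// NewDefaultBackend opens the backend file at path using DefaultBackendConfig.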
func NewDefaultBackend(path string) Backend {
	bcfg := DefaultBackendConfig()
	bcfg.Path = path
	return newBackend(bcfg)
}

func newBackend(bcfg BackendConfig) *backend {
	if bcfg.Logger == nil {
		bcfg.Logger = zap.NewNop()
	}

	bopts := &bolt.Options{}
	if boltOpenOptions != nil {
		*bopts = *boltOpenOptions
	}
	bopts.InitialMmapSize = bcfg.mmapSize()
	bopts.FreelistType = bcfg.BackendFreelistType
	bopts.NoSync = bcfg.UnsafeNoFsync
	bopts.NoGrowSync = bcfg.UnsafeNoFsync
	bopts.Mlock = bcfg.Mlock

	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil {
		bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
	}

	// In the future, buffering may be made optional for low-concurrency systems
	// or swapped dynamically between buffered/non-buffered depending on workload.
	b := &backend{
		bopts: bopts,
		db:    db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,
		mlock:         bcfg.Mlock,

		readTx: &readTx{
			baseReadTx: baseReadTx{
				buf: txReadBuffer{
					txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
					bufVersion: 0,
				},
				buckets: make(map[BucketID]*bolt.Bucket),
				txWg:    new(sync.WaitGroup),
				txMu:    new(sync.RWMutex),
			},
		},
		txReadBufferCache: txReadBufferCache{
			mu:         sync.Mutex{},
			bufVersion: 0,
			buf:        nil,
		},

		stopc: make(chan struct{}),
		donec: make(chan struct{}),

		lg: bcfg.Logger,
	}

	b.batchTx = newBatchTxBuffered(b)
	// Set hooks after newBatchTxBuffered to skip the initial 'empty' commit.
	b.hooks = bcfg.Hooks

	go b.run()
	return b
}

// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
// write operations. The write result can be retrieved within the same tx immediately.
// The write result is only visible to other readers once the tx is committed.
func (b *backend) BatchTx() BatchTx {
	return b.batchTx
}

func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
	// The batchTx must be locked, because the periodic commit
	// may be accessing txPostLockInsideApplyHook at this moment.
	b.batchTx.lock()
	defer b.batchTx.Unlock()
	b.txPostLockInsideApplyHook = hook
}

func (b *backend) ReadTx() ReadTx { return b.readTx }

// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of the current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx {
	b.readTx.RLock()
	defer b.readTx.RUnlock()
	// Prevent the boltdb read Tx from being rolled back until the store read Tx
	// is done; must be called while holding readTx.RLock().
	b.readTx.txWg.Add(1)

	// Inspect and, if possible, reuse the cached copy of the read buffer.
	// Holding readTx.RLock() already protects this path against a concurrent
	// writeback, which needs the write lock to update readTx.baseReadTx.buf.
	// So a fresh readTx.buf.unsafeCopy() is guaranteed to be up to date, whereas
	// txReadBufferCache.buf may be stale; the cache is only updated with copies
	// known to be current.
	b.txReadBufferCache.mu.Lock()

	curCache := b.txReadBufferCache.buf
	curCacheVer := b.txReadBufferCache.bufVersion
	curBufVer := b.readTx.buf.bufVersion

	isEmptyCache := curCache == nil
	isStaleCache := curCacheVer != curBufVer

	var buf *txReadBuffer
	switch {
	case isEmptyCache:
		// Perform a safe copy of the buffer while holding txReadBufferCache.mu;
		// this only runs once, so the overhead is negligible.
		curBuf := b.readTx.buf.unsafeCopy()
		buf = &curBuf
	case isStaleCache:
		// To maximize concurrency, release the lock while copying the buffer.
		// The cache may become stale again and be overwritten by someone else,
		// so the buffer version is re-checked below before updating the cache.
		b.txReadBufferCache.mu.Unlock()
		curBuf := b.readTx.buf.unsafeCopy()
		b.txReadBufferCache.mu.Lock()
		buf = &curBuf
	default:
		// Neither empty nor stale cache: reuse the cached buffer.
		buf = curCache
	}

	// txReadBufferCache.bufVersion may have changed while the lock was released
	// above, so curCacheVer can differ from txReadBufferCache.bufVersion here.
	// In that case the cache became stale while copying readTx.baseReadTx.buf;
	// it is safe to skip updating txReadBufferCache.buf, because the next
	// ConcurrentReadTx will trigger a new copy, and buf is still used for this
	// concurrentReadTx.
	if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
		// The cache was never set or nobody modified it concurrently.
		b.txReadBufferCache.buf = buf
		b.txReadBufferCache.bufVersion = curBufVer
	}

	b.txReadBufferCache.mu.Unlock()

	// concurrentReadTx is not supposed to write to its txReadBuffer.
	return &concurrentReadTx{
		baseReadTx: baseReadTx{
			buf:     *buf,
			txMu:    b.readTx.txMu,
			tx:      b.readTx.tx,
			buckets: b.readTx.buckets,
			txWg:    b.readTx.txWg,
		},
	}
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
	b.batchTx.Commit()
}

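// Snapshot commits the current batch tx, then opens a read-only bolt
// transaction and returns it wrapped as a Snapshot. A background goroutine
// logs a warning if transferring the snapshot takes unexpectedly long.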
func (b *backend) Snapshot() Snapshot {
	b.batchTx.Commit()

	b.mu.RLock()
	defer b.mu.RUnlock()
	tx, err := b.db.Begin(false)
	if err != nil {
		b.lg.Fatal("failed to begin tx", zap.Error(err))
	}

	stopc, donec := make(chan struct{}), make(chan struct{})
	dbBytes := tx.Size()
	go func() {
		defer close(donec)
		// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s
		// connection, assuming a minimum tcp throughput of 100MB/s.
		var sendRateBytes int64 = 100 * 1024 * 1024
		warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
		if warningTimeout < minSnapshotWarningTimeout {
			warningTimeout = minSnapshotWarningTimeout
		}
		start := time.Now()
		ticker := time.NewTicker(warningTimeout)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				b.lg.Warn(
					"snapshotting taking too long to transfer",
					zap.Duration("taking", time.Since(start)),
					zap.Int64("bytes", dbBytes),
					zap.String("size", humanize.Bytes(uint64(dbBytes))),
				)

			case <-stopc:
				snapshotTransferSec.Observe(time.Since(start).Seconds())
				return
			}
		}
	}()

	return &snapshot{tx, stopc, donec}
}

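// Hash walks every bucket in the database and returns a CRC-32C (Castagnoli)
// hash over the bucket names and the key/value pairs not filtered out by the
// ignores callback.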
func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	b.mu.RLock()
	defer b.mu.RUnlock()
	err := b.db.View(func(tx *bolt.Tx) error {
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %s", string(next))
			}
			h.Write(next)
			b.ForEach(func(k, v []byte) error {
				if ignores != nil && !ignores(next, k) {
					h.Write(k)
					h.Write(v)
				}
				return nil
			})
		}
		return nil
	})

	if err != nil {
		return 0, err
	}

	return h.Sum32(), nil
}

func (b *backend) Size() int64 {
	return atomic.LoadInt64(&b.size)
}

func (b *backend) SizeInUse() int64 {
	return atomic.LoadInt64(&b.sizeInUse)
}

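// run periodically commits any pending batched writes until the backend is closed.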
func (b *backend) run() {
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		if b.batchTx.safePending() != 0 {
			b.batchTx.Commit()
		}
		t.Reset(b.batchInterval)
	}
}

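// Close stops the commit loop, waits for it to finish, and closes the bolt database.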
func (b *backend) Close() error {
	close(b.stopc)
	<-b.donec
	return b.db.Close()
}

// Commits returns the total number of commits since start.
func (b *backend) Commits() int64 {
	return atomic.LoadInt64(&b.commits)
}

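// Defrag rewrites the database into a fresh file to reclaim free pages and
// shrink the file on disk, then reopens it in place.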
func (b *backend) Defrag() error {
	return b.defrag()
}

func (b *backend) defrag() error {
	now := time.Now()
	isDefragActive.Set(1)
	defer isDefragActive.Set(0)

	// TODO: make this non-blocking?
	// Lock batchTx to ensure nobody is using the previous tx, and then
	// close the previous ongoing tx.
	b.batchTx.LockOutsideApply()
	defer b.batchTx.Unlock()

	// Lock the database after locking the tx to avoid deadlock.
	b.mu.Lock()
	defer b.mu.Unlock()

	// Block concurrent read requests while resetting the tx.
	b.readTx.Lock()
	defer b.readTx.Unlock()

	b.batchTx.unsafeCommit(true)

	b.batchTx.tx = nil

	// Create a temporary file so that defragmentation starts from a clean slate.
	dir := filepath.Dir(b.db.Path())
	temp, err := ioutil.TempFile(dir, "db.tmp.*")
	if err != nil {
		return err
	}
	options := bolt.Options{}
	if boltOpenOptions != nil {
		options = *boltOpenOptions
	}
	options.OpenFile = func(_ string, _ int, _ os.FileMode) (file *os.File, err error) {
		return temp, nil
	}
	// Don't mlock the temporary db regardless of the opening options.
	options.Mlock = false
	tdbp := temp.Name()
	tmpdb, err := bolt.Open(tdbp, 0600, &options)
	if err != nil {
		return err
	}

	dbp := b.db.Path()
	size1, sizeInUse1 := b.Size(), b.SizeInUse()
	if b.lg != nil {
		b.lg.Info(
			"defragmenting",
			zap.String("path", dbp),
			zap.Int64("current-db-size-bytes", size1),
			zap.String("current-db-size", humanize.Bytes(uint64(size1))),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
		)
	}

	// Copy the original db into the temporary db, then swap the files.
	err = defragdb(b.db, tmpdb, defragLimit)
	if err != nil {
		tmpdb.Close()
		if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
			b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
		}
		return err
	}

	err = b.db.Close()
	if err != nil {
		b.lg.Fatal("failed to close database", zap.Error(err))
	}
	err = tmpdb.Close()
	if err != nil {
		b.lg.Fatal("failed to close tmp database", zap.Error(err))
	}

	err = os.Rename(tdbp, dbp)
	if err != nil {
		b.lg.Fatal("failed to rename tmp database", zap.Error(err))
	}

	b.db, err = bolt.Open(dbp, 0600, b.bopts)
	if err != nil {
		b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
	}
	b.batchTx.tx = b.unsafeBegin(true)

	b.readTx.reset()
	b.readTx.tx = b.unsafeBegin(false)

	size := b.readTx.tx.Size()
	db := b.readTx.tx.DB()
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	took := time.Since(now)
	defragSec.Observe(took.Seconds())

	size2, sizeInUse2 := b.Size(), b.SizeInUse()
	if b.lg != nil {
		b.lg.Info(
			"finished defragmenting directory",
			zap.String("path", dbp),
			zap.Int64("current-db-size-bytes-diff", size2-size1),
			zap.Int64("current-db-size-bytes", size2),
			zap.String("current-db-size", humanize.Bytes(uint64(size2))),
			zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
			zap.Duration("took", took),
		)
	}
	return nil
}

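// defragdb copies every bucket from odb into tmpdb, committing the temporary
// write transaction after every `limit` puts to bound its size.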
func defragdb(odb, tmpdb *bolt.DB, limit int) error {
	// Open a tx on tmpdb for writes.
	tmptx, err := tmpdb.Begin(true)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			tmptx.Rollback()
		}
	}()

	// Open a tx on the old db for reads.
	tx, err := odb.Begin(false)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	c := tx.Cursor()

	count := 0
	for next, _ := c.First(); next != nil; next, _ = c.Next() {
		b := tx.Bucket(next)
		if b == nil {
			return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
		}

		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
		if berr != nil {
			return berr
		}
		tmpb.FillPercent = 0.9 // keys are copied in sorted order, so fill pages densely

		if err = b.ForEach(func(k, v []byte) error {
			count++
			if count > limit {
				err = tmptx.Commit()
				if err != nil {
					return err
				}
				tmptx, err = tmpdb.Begin(true)
				if err != nil {
					return err
				}
				tmpb = tmptx.Bucket(next)
				tmpb.FillPercent = 0.9 // keys are copied in sorted order, so fill pages densely

				count = 0
			}
			return tmpb.Put(k, v)
		}); err != nil {
			return err
		}
	}

	return tmptx.Commit()
}

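// begin opens a new bolt transaction under the backend read lock and refreshes
// the size, size-in-use, and open-read-transaction gauges.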
func (b *backend) begin(write bool) *bolt.Tx {
	b.mu.RLock()
	tx := b.unsafeBegin(write)
	b.mu.RUnlock()

	size := tx.Size()
	db := tx.DB()
	stats := db.Stats()
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
	atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

	return tx
}

func (b *backend) unsafeBegin(write bool) *bolt.Tx {
	tx, err := b.db.Begin(write)
	if err != nil {
		b.lg.Fatal("failed to begin tx", zap.Error(err))
	}
	return tx
}

func (b *backend) OpenReadTxN() int64 {
	return atomic.LoadInt64(&b.openReadTxN)
}

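// snapshot wraps a read-only bolt.Tx; Close stops the slow-transfer warning
// goroutine and rolls the transaction back.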
type snapshot struct {
	*bolt.Tx
	stopc chan struct{}
	donec chan struct{}
}

func (s *snapshot) Close() error {
	close(s.stopc)
	<-s.donec
	return s.Tx.Rollback()
}