...

Source file src/go.etcd.io/bbolt/tx_check.go

Documentation: go.etcd.io/bbolt

     1  package bbolt
     2  
     3  import (
     4  	"encoding/hex"
     5  	"fmt"
     6  )
     7  
     8  // Check performs several consistency checks on the database for this transaction.
     9  // An error is returned if any inconsistency is found.
    10  //
    11  // It can be safely run concurrently on a writable transaction. However, this
    12  // incurs a high cost for large databases and databases with a lot of subbuckets
    13  // because of caching. This overhead can be removed if running on a read-only
    14  // transaction, however, it is not safe to execute other writer transactions at
    15  // the same time.
    16  func (tx *Tx) Check() <-chan error {
    17  	return tx.CheckWithOptions()
    18  }
    19  
    20  // CheckWithOptions allows users to provide a customized `KVStringer` implementation,
    21  // so that bolt can generate human-readable diagnostic messages.
    22  func (tx *Tx) CheckWithOptions(options ...CheckOption) <-chan error {
    23  	chkConfig := checkConfig{
    24  		kvStringer: HexKVStringer(),
    25  	}
    26  	for _, op := range options {
    27  		op(&chkConfig)
    28  	}
    29  
    30  	ch := make(chan error)
    31  	go tx.check(chkConfig.kvStringer, ch)
    32  	return ch
    33  }
    34  
    35  func (tx *Tx) check(kvStringer KVStringer, ch chan error) {
    36  	// Force loading free list if opened in ReadOnly mode.
    37  	tx.db.loadFreelist()
    38  
    39  	// Check if any pages are double freed.
    40  	freed := make(map[pgid]bool)
    41  	all := make([]pgid, tx.db.freelist.count())
    42  	tx.db.freelist.copyall(all)
    43  	for _, id := range all {
    44  		if freed[id] {
    45  			ch <- fmt.Errorf("page %d: already freed", id)
    46  		}
    47  		freed[id] = true
    48  	}
    49  
    50  	// Track every reachable page.
    51  	reachable := make(map[pgid]*page)
    52  	reachable[0] = tx.page(0) // meta0
    53  	reachable[1] = tx.page(1) // meta1
    54  	if tx.meta.freelist != pgidNoFreelist {
    55  		for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
    56  			reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
    57  		}
    58  	}
    59  
    60  	// Recursively check buckets.
    61  	tx.checkBucket(&tx.root, reachable, freed, kvStringer, ch)
    62  
    63  	// Ensure all pages below high water mark are either reachable or freed.
    64  	for i := pgid(0); i < tx.meta.pgid; i++ {
    65  		_, isReachable := reachable[i]
    66  		if !isReachable && !freed[i] {
    67  			ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
    68  		}
    69  	}
    70  
    71  	// Close the channel to signal completion.
    72  	close(ch)
    73  }
    74  
    75  func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool,
    76  	kvStringer KVStringer, ch chan error) {
    77  	// Ignore inline buckets.
    78  	if b.root == 0 {
    79  		return
    80  	}
    81  
    82  	// Check every page used by this bucket.
    83  	b.tx.forEachPage(b.root, func(p *page, _ int, stack []pgid) {
    84  		if p.id > tx.meta.pgid {
    85  			ch <- fmt.Errorf("page %d: out of bounds: %d (stack: %v)", int(p.id), int(b.tx.meta.pgid), stack)
    86  		}
    87  
    88  		// Ensure each page is only referenced once.
    89  		for i := pgid(0); i <= pgid(p.overflow); i++ {
    90  			var id = p.id + i
    91  			if _, ok := reachable[id]; ok {
    92  				ch <- fmt.Errorf("page %d: multiple references (stack: %v)", int(id), stack)
    93  			}
    94  			reachable[id] = p
    95  		}
    96  
    97  		// We should only encounter un-freed leaf and branch pages.
    98  		if freed[p.id] {
    99  			ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
   100  		} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
   101  			ch <- fmt.Errorf("page %d: invalid type: %s (stack: %v)", int(p.id), p.typ(), stack)
   102  		}
   103  	})
   104  
   105  	tx.recursivelyCheckPages(b.root, kvStringer.KeyToString, ch)
   106  
   107  	// Check each bucket within this bucket.
   108  	_ = b.ForEachBucket(func(k []byte) error {
   109  		if child := b.Bucket(k); child != nil {
   110  			tx.checkBucket(child, reachable, freed, kvStringer, ch)
   111  		}
   112  		return nil
   113  	})
   114  }
   115  
   116  // recursivelyCheckPages confirms database consistency with respect to b-tree
   117  // key order constraints:
   118  //   - keys on pages must be sorted
   119  //   - keys on children pages are between 2 consecutive keys on the parent's branch page).
   120  func (tx *Tx) recursivelyCheckPages(pgId pgid, keyToString func([]byte) string, ch chan error) {
   121  	tx.recursivelyCheckPagesInternal(pgId, nil, nil, nil, keyToString, ch)
   122  }
   123  
   124  // recursivelyCheckPagesInternal verifies that all keys in the subtree rooted at `pgid` are:
   125  //   - >=`minKeyClosed` (can be nil)
   126  //   - <`maxKeyOpen` (can be nil)
   127  //   - Are in right ordering relationship to their parents.
   128  //     `pagesStack` is expected to contain IDs of pages from the tree root to `pgid` for the clean debugging message.
   129  func (tx *Tx) recursivelyCheckPagesInternal(
   130  	pgId pgid, minKeyClosed, maxKeyOpen []byte, pagesStack []pgid,
   131  	keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) {
   132  
   133  	p := tx.page(pgId)
   134  	pagesStack = append(pagesStack, pgId)
   135  	switch {
   136  	case p.flags&branchPageFlag != 0:
   137  		// For branch page we navigate ranges of all subpages.
   138  		runningMin := minKeyClosed
   139  		for i := range p.branchPageElements() {
   140  			elem := p.branchPageElement(uint16(i))
   141  			verifyKeyOrder(elem.pgid, "branch", i, elem.key(), runningMin, maxKeyOpen, ch, keyToString, pagesStack)
   142  
   143  			maxKey := maxKeyOpen
   144  			if i < len(p.branchPageElements())-1 {
   145  				maxKey = p.branchPageElement(uint16(i + 1)).key()
   146  			}
   147  			maxKeyInSubtree = tx.recursivelyCheckPagesInternal(elem.pgid, elem.key(), maxKey, pagesStack, keyToString, ch)
   148  			runningMin = maxKeyInSubtree
   149  		}
   150  		return maxKeyInSubtree
   151  	case p.flags&leafPageFlag != 0:
   152  		runningMin := minKeyClosed
   153  		for i := range p.leafPageElements() {
   154  			elem := p.leafPageElement(uint16(i))
   155  			verifyKeyOrder(pgId, "leaf", i, elem.key(), runningMin, maxKeyOpen, ch, keyToString, pagesStack)
   156  			runningMin = elem.key()
   157  		}
   158  		if p.count > 0 {
   159  			return p.leafPageElement(p.count - 1).key()
   160  		}
   161  	default:
   162  		ch <- fmt.Errorf("unexpected page type for pgId:%d", pgId)
   163  	}
   164  	return maxKeyInSubtree
   165  }
   166  
   167  /***
   168   * verifyKeyOrder checks whether an entry with given #index on pgId (pageType: "branch|leaf") that has given "key",
   169   * is within range determined by (previousKey..maxKeyOpen) and reports found violations to the channel (ch).
   170   */
   171  func verifyKeyOrder(pgId pgid, pageType string, index int, key []byte, previousKey []byte, maxKeyOpen []byte, ch chan error, keyToString func([]byte) string, pagesStack []pgid) {
   172  	if index == 0 && previousKey != nil && compareKeys(previousKey, key) > 0 {
   173  		ch <- fmt.Errorf("the first key[%d]=(hex)%s on %s page(%d) needs to be >= the key in the ancestor (%s). Stack: %v",
   174  			index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
   175  	}
   176  	if index > 0 {
   177  		cmpRet := compareKeys(previousKey, key)
   178  		if cmpRet > 0 {
   179  			ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found <) than previous element (hex)%s. Stack: %v",
   180  				index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
   181  		}
   182  		if cmpRet == 0 {
   183  			ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found =) than previous element (hex)%s. Stack: %v",
   184  				index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
   185  		}
   186  	}
   187  	if maxKeyOpen != nil && compareKeys(key, maxKeyOpen) >= 0 {
   188  		ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be < than key of the next element in ancestor (hex)%s. Pages stack: %v",
   189  			index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
   190  	}
   191  }
   192  
   193  // ===========================================================================================
   194  
// checkConfig holds the settings for a consistency check. It is populated with
// defaults by CheckWithOptions and then adjusted by the supplied CheckOptions.
type checkConfig struct {
	// kvStringer renders keys for the diagnostic messages produced by the check.
	kvStringer KVStringer
}
   198  
// CheckOption customizes a consistency check by modifying its configuration.
// Options are passed to CheckWithOptions and applied in the order given.
type CheckOption func(options *checkConfig)
   200  
   201  func WithKVStringer(kvStringer KVStringer) CheckOption {
   202  	return func(c *checkConfig) {
   203  		c.kvStringer = kvStringer
   204  	}
   205  }
   206  
// KVStringer converts raw keys and values into strings so that diagnostic
// messages are human-readable.
type KVStringer interface {
	// KeyToString returns the string representation of a key.
	KeyToString([]byte) string
	// ValueToString returns the string representation of a value.
	ValueToString([]byte) string
}
   212  
   213  // HexKVStringer serializes both key & value to hex representation.
   214  func HexKVStringer() KVStringer {
   215  	return hexKvStringer{}
   216  }
   217  
   218  type hexKvStringer struct{}
   219  
   220  func (_ hexKvStringer) KeyToString(key []byte) string {
   221  	return hex.EncodeToString(key)
   222  }
   223  
   224  func (_ hexKvStringer) ValueToString(value []byte) string {
   225  	return hex.EncodeToString(value)
   226  }
   227  

View as plain text