
Source file src/go.etcd.io/bbolt/bucket.go

Documentation: go.etcd.io/bbolt

     1  package bbolt
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  	"unsafe"
     7  )
     8  
     9  const (
    10  	// MaxKeySize is the maximum length of a key, in bytes.
    11  	MaxKeySize = 32768
    12  
    13  	// MaxValueSize is the maximum length of a value, in bytes.
    14  	MaxValueSize = (1 << 31) - 2
    15  )
    16  
    17  const bucketHeaderSize = int(unsafe.Sizeof(bucket{}))
    18  
    19  const (
    20  	minFillPercent = 0.1
    21  	maxFillPercent = 1.0
    22  )
    23  
    24  // DefaultFillPercent is the percentage that split pages are filled.
    25  // This value can be changed by setting Bucket.FillPercent.
    26  const DefaultFillPercent = 0.5
    27  
    28  // Bucket represents a collection of key/value pairs inside the database.
    29  type Bucket struct {
    30  	*bucket
    31  	tx       *Tx                // the associated transaction
    32  	buckets  map[string]*Bucket // subbucket cache
    33  	page     *page              // inline page reference
    34  	rootNode *node              // materialized node for the root page.
    35  	nodes    map[pgid]*node     // node cache
    36  
    37  	// Sets the threshold for filling nodes when they split. By default,
    38  	// the bucket will fill to 50% but it can be useful to increase this
    39  	// amount if you know that your write workloads are mostly append-only.
    40  	//
    41  	// This value is not persisted across transactions, so it must be set in every Tx.
    42  	FillPercent float64
    43  }
    44  
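As the comment above notes, FillPercent is reset on every transaction. A minimal sketch of an append-heavy writer that raises it each time, assuming the package is imported as bolt "go.etcd.io/bbolt", db is a *bolt.DB opened elsewhere, and the "events" bucket name is purely illustrative:

	// appendEvent writes an ever-increasing key, so nodes are allowed to
	// fill to 90% before splitting instead of the default 50%.
	func appendEvent(db *bolt.DB, key, value []byte) error {
		return db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("events"))
			if err != nil {
				return err
			}
			b.FillPercent = 0.9 // not persisted; set in every Tx
			return b.Put(key, value)
		})
	}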
    45  // bucket represents the on-file representation of a bucket.
    46  // This is stored as the "value" of a bucket key. If the bucket is small enough,
    47  // then its root page can be stored inline in the "value", after the bucket
    48  // header. In the case of inline buckets, the "root" will be 0.
    49  type bucket struct {
    50  	root     pgid   // page id of the bucket's root-level page
    51  	sequence uint64 // monotonically incrementing, used by NextSequence()
    52  }
    53  
    54  // newBucket returns a new bucket associated with a transaction.
    55  func newBucket(tx *Tx) Bucket {
    56  	var b = Bucket{tx: tx, FillPercent: DefaultFillPercent}
    57  	if tx.writable {
    58  		b.buckets = make(map[string]*Bucket)
    59  		b.nodes = make(map[pgid]*node)
    60  	}
    61  	return b
    62  }
    63  
    64  // Tx returns the tx of the bucket.
    65  func (b *Bucket) Tx() *Tx {
    66  	return b.tx
    67  }
    68  
    69  // Root returns the root of the bucket.
    70  func (b *Bucket) Root() pgid {
    71  	return b.root
    72  }
    73  
    74  // Writable returns whether the bucket is writable.
    75  func (b *Bucket) Writable() bool {
    76  	return b.tx.writable
    77  }
    78  
    79  // Cursor creates a cursor associated with the bucket.
    80  // The cursor is only valid as long as the transaction is open.
    81  // Do not use a cursor after the transaction is closed.
    82  func (b *Bucket) Cursor() *Cursor {
    83  	// Update transaction statistics.
    84  	b.tx.stats.IncCursorCount(1)
    85  
    86  	// Allocate and return a cursor.
    87  	return &Cursor{
    88  		bucket: b,
    89  		stack:  make([]elemRef, 0),
    90  	}
    91  }
    92  
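A cursor only stays valid while its transaction is open, so it is normally created and consumed inside a single View or Update callback. A hedged sketch of a prefix scan, assuming imports of bytes and fmt and an illustrative "widgets" bucket:

	// scanPrefix prints every key in "widgets" that starts with prefix,
	// using Cursor.Seek to jump to the first candidate key.
	func scanPrefix(db *bolt.DB, prefix []byte) error {
		return db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte("widgets"))
			if b == nil {
				return nil // bucket not created yet
			}
			c := b.Cursor()
			for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
				fmt.Printf("%s = %s\n", k, v)
			}
			return nil
		})
	}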
    93  // Bucket retrieves a nested bucket by name.
    94  // Returns nil if the bucket does not exist.
    95  // The bucket instance is only valid for the lifetime of the transaction.
    96  func (b *Bucket) Bucket(name []byte) *Bucket {
    97  	if b.buckets != nil {
    98  		if child := b.buckets[string(name)]; child != nil {
    99  			return child
   100  		}
   101  	}
   102  
   103  	// Move cursor to key.
   104  	c := b.Cursor()
   105  	k, v, flags := c.seek(name)
   106  
   107  	// Return nil if the key doesn't exist or it is not a bucket.
   108  	if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
   109  		return nil
   110  	}
   111  
   112  	// Otherwise create a bucket and cache it.
   113  	var child = b.openBucket(v)
   114  	if b.buckets != nil {
   115  		b.buckets[string(name)] = child
   116  	}
   117  
   118  	return child
   119  }
   120  
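Nested buckets are looked up one level at a time, and each lookup may return nil. A small sketch, with the "accounts"/account names purely illustrative:

	// hasAccount reports whether a sub-bucket named after the account
	// exists inside the top-level "accounts" bucket.
	func hasAccount(db *bolt.DB, account []byte) (bool, error) {
		var found bool
		err := db.View(func(tx *bolt.Tx) error {
			root := tx.Bucket([]byte("accounts"))
			if root == nil {
				return nil
			}
			found = root.Bucket(account) != nil
			return nil
		})
		return found, err
	}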
   121  // Helper method that re-interprets a sub-bucket value
   122  // from a parent bucket into a Bucket.
   123  func (b *Bucket) openBucket(value []byte) *Bucket {
   124  	var child = newBucket(b.tx)
   125  
   126  	// Unaligned access requires a copy to be made.
   127  	const unalignedMask = unsafe.Alignof(struct {
   128  		bucket
   129  		page
   130  	}{}) - 1
   131  	unaligned := uintptr(unsafe.Pointer(&value[0]))&unalignedMask != 0
   132  	if unaligned {
   133  		value = cloneBytes(value)
   134  	}
   135  
   136  	// If this is a writable transaction then we need to copy the bucket entry.
   137  	// Read-only transactions can point directly at the mmap entry.
   138  	if b.tx.writable && !unaligned {
   139  		child.bucket = &bucket{}
   140  		*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
   141  	} else {
   142  		child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
   143  	}
   144  
   145  	// Save a reference to the inline page if the bucket is inline.
   146  	if child.root == 0 {
   147  		child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
   148  	}
   149  
   150  	return &child
   151  }
   152  
   153  // CreateBucket creates a new bucket at the given key and returns the new bucket.
   154  // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
   155  // The bucket instance is only valid for the lifetime of the transaction.
   156  func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
   157  	if b.tx.db == nil {
   158  		return nil, ErrTxClosed
   159  	} else if !b.tx.writable {
   160  		return nil, ErrTxNotWritable
   161  	} else if len(key) == 0 {
   162  		return nil, ErrBucketNameRequired
   163  	}
   164  
   165  	// Insert into node.
   166  	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
   167  	// it from being marked as leaking, which would make it ineligible for stack allocation.
   168  	newKey := cloneBytes(key)
   169  
   170  	// Move cursor to correct position.
   171  	c := b.Cursor()
   172  	k, _, flags := c.seek(newKey)
   173  
   174  	// Return an error if there is an existing key.
   175  	if bytes.Equal(newKey, k) {
   176  		if (flags & bucketLeafFlag) != 0 {
   177  			return nil, ErrBucketExists
   178  		}
   179  		return nil, ErrIncompatibleValue
   180  	}
   181  
   182  	// Create empty, inline bucket.
   183  	var bucket = Bucket{
   184  		bucket:      &bucket{},
   185  		rootNode:    &node{isLeaf: true},
   186  		FillPercent: DefaultFillPercent,
   187  	}
   188  	var value = bucket.write()
   189  
   190  	c.node().put(newKey, newKey, value, 0, bucketLeafFlag)
   191  
   192  	// Since subbuckets are not allowed on inline buckets, we need to
   193  	// dereference the inline page, if it exists. This will cause the bucket
   194  	// to be treated as a regular, non-inline bucket for the rest of the tx.
   195  	b.page = nil
   196  
   197  	return b.Bucket(newKey), nil
   198  }
   199  
   200  // CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
   201  // Returns an error if the bucket name is blank, or if the bucket name is too long.
   202  // The bucket instance is only valid for the lifetime of the transaction.
   203  func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
   204  	child, err := b.CreateBucket(key)
   205  	if err == ErrBucketExists {
   206  		return b.Bucket(key), nil
   207  	} else if err != nil {
   208  		return nil, err
   209  	}
   210  	return child, nil
   211  }
   212  
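CreateBucket fails with ErrBucketExists on a second run, so setup code usually reaches for CreateBucketIfNotExists. A sketch of creating a two-level layout up front; the bucket names are illustrative:

	// setupSchema creates the top-level bucket and one nested bucket,
	// tolerating the case where either already exists.
	func setupSchema(db *bolt.DB) error {
		return db.Update(func(tx *bolt.Tx) error {
			root, err := tx.CreateBucketIfNotExists([]byte("accounts"))
			if err != nil {
				return err
			}
			_, err = root.CreateBucketIfNotExists([]byte("alice"))
			return err
		})
	}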
   213  // DeleteBucket deletes a bucket at the given key.
   214  // Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
   215  func (b *Bucket) DeleteBucket(key []byte) error {
   216  	if b.tx.db == nil {
   217  		return ErrTxClosed
   218  	} else if !b.Writable() {
   219  		return ErrTxNotWritable
   220  	}
   221  
   222  	// Move cursor to correct position.
   223  	c := b.Cursor()
   224  	k, _, flags := c.seek(key)
   225  
   226  	// Return an error if bucket doesn't exist or is not a bucket.
   227  	if !bytes.Equal(key, k) {
   228  		return ErrBucketNotFound
   229  	} else if (flags & bucketLeafFlag) == 0 {
   230  		return ErrIncompatibleValue
   231  	}
   232  
   233  	// Recursively delete all child buckets.
   234  	child := b.Bucket(key)
   235  	err := child.ForEachBucket(func(k []byte) error {
   236  		if err := child.DeleteBucket(k); err != nil {
   237  			return fmt.Errorf("delete bucket: %s", err)
   238  		}
   239  		return nil
   240  	})
   241  	if err != nil {
   242  		return err
   243  	}
   244  
   245  	// Remove cached copy.
   246  	delete(b.buckets, string(key))
   247  
   248  	// Release all bucket pages to freelist.
   249  	child.nodes = nil
   250  	child.rootNode = nil
   251  	child.free()
   252  
   253  	// Delete the node if we have a matching key.
   254  	c.node().del(key)
   255  
   256  	return nil
   257  }
   258  
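Because DeleteBucket recurses into child buckets itself, callers only need to remove the top of the subtree. A sketch that also tolerates the bucket already being gone; the names are illustrative:

	// dropAccount deletes one nested bucket and everything stored under it.
	func dropAccount(db *bolt.DB, account []byte) error {
		return db.Update(func(tx *bolt.Tx) error {
			root := tx.Bucket([]byte("accounts"))
			if root == nil {
				return nil
			}
			if err := root.DeleteBucket(account); err != nil && err != bolt.ErrBucketNotFound {
				return err
			}
			return nil
		})
	}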
   259  // Get retrieves the value for a key in the bucket.
   260  // Returns a nil value if the key does not exist or if the key is a nested bucket.
   261  // The returned value is only valid for the life of the transaction.
   262  func (b *Bucket) Get(key []byte) []byte {
   263  	k, v, flags := b.Cursor().seek(key)
   264  
   265  	// Return nil if this is a bucket.
   266  	if (flags & bucketLeafFlag) != 0 {
   267  		return nil
   268  	}
   269  
   270  	// If our target node isn't the same key as what's passed in then return nil.
   271  	if !bytes.Equal(key, k) {
   272  		return nil
   273  	}
   274  	return v
   275  }
   276  
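Since the slice returned by Get can point straight into the read-only mmap, data that must outlive the transaction has to be copied out. A sketch, with "config" as an illustrative bucket name:

	// readSetting returns a copy of one value, or nil if it is absent.
	func readSetting(db *bolt.DB, key []byte) ([]byte, error) {
		var out []byte
		err := db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte("config"))
			if b == nil {
				return nil
			}
			if v := b.Get(key); v != nil {
				out = append([]byte(nil), v...) // copy before the Tx closes
			}
			return nil
		})
		return out, err
	}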
   277  // Put sets the value for a key in the bucket.
   278  // If the key exists then its previous value will be overwritten.
   279  // Supplied value must remain valid for the life of the transaction.
   280  // Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
   281  func (b *Bucket) Put(key []byte, value []byte) error {
   282  	if b.tx.db == nil {
   283  		return ErrTxClosed
   284  	} else if !b.Writable() {
   285  		return ErrTxNotWritable
   286  	} else if len(key) == 0 {
   287  		return ErrKeyRequired
   288  	} else if len(key) > MaxKeySize {
   289  		return ErrKeyTooLarge
   290  	} else if int64(len(value)) > MaxValueSize {
   291  		return ErrValueTooLarge
   292  	}
   293  
   294  	// Insert into node.
   295  	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
   296  	// it from being marked as leaking, which would make it ineligible for stack allocation.
   297  	newKey := cloneBytes(key)
   298  
   299  	// Move cursor to correct position.
   300  	c := b.Cursor()
   301  	k, _, flags := c.seek(newKey)
   302  
   303  	// Return an error if there is an existing key with a bucket value.
   304  	if bytes.Equal(newKey, k) && (flags&bucketLeafFlag) != 0 {
   305  		return ErrIncompatibleValue
   306  	}
   307  
   308  	// gofail: var beforeBucketPut struct{}
   309  
   310  	c.node().put(newKey, newKey, value, 0, 0)
   311  
   312  	return nil
   313  }
   314  
   315  // Delete removes a key from the bucket.
   316  // If the key does not exist then nothing is done and a nil error is returned.
   317  // Returns an error if the bucket was created from a read-only transaction.
   318  func (b *Bucket) Delete(key []byte) error {
   319  	if b.tx.db == nil {
   320  		return ErrTxClosed
   321  	} else if !b.Writable() {
   322  		return ErrTxNotWritable
   323  	}
   324  
   325  	// Move cursor to correct position.
   326  	c := b.Cursor()
   327  	k, _, flags := c.seek(key)
   328  
   329  	// Return nil if the key doesn't exist.
   330  	if !bytes.Equal(key, k) {
   331  		return nil
   332  	}
   333  
   334  	// Return an error if there is an existing bucket value.
   335  	if (flags & bucketLeafFlag) != 0 {
   336  		return ErrIncompatibleValue
   337  	}
   338  
   339  	// Delete the node if we have a matching key.
   340  	c.node().del(key)
   341  
   342  	return nil
   343  }
   344  
   345  // Sequence returns the current integer for the bucket without incrementing it.
   346  func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
   347  
   348  // SetSequence updates the sequence number for the bucket.
   349  func (b *Bucket) SetSequence(v uint64) error {
   350  	if b.tx.db == nil {
   351  		return ErrTxClosed
   352  	} else if !b.Writable() {
   353  		return ErrTxNotWritable
   354  	}
   355  
   356  	// Materialize the root node if it hasn't been already so that the
   357  	// bucket will be saved during commit.
   358  	if b.rootNode == nil {
   359  		_ = b.node(b.root, nil)
   360  	}
   361  
   362  	// Set the sequence.
   363  	b.bucket.sequence = v
   364  	return nil
   365  }
   366  
   367  // NextSequence returns an autoincrementing integer for the bucket.
   368  func (b *Bucket) NextSequence() (uint64, error) {
   369  	if b.tx.db == nil {
   370  		return 0, ErrTxClosed
   371  	} else if !b.Writable() {
   372  		return 0, ErrTxNotWritable
   373  	}
   374  
   375  	// Materialize the root node if it hasn't been already so that the
   376  	// bucket will be saved during commit.
   377  	if b.rootNode == nil {
   378  		_ = b.node(b.root, nil)
   379  	}
   380  
   381  	// Increment and return the sequence.
   382  	b.bucket.sequence++
   383  	return b.bucket.sequence, nil
   384  }
   385  
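NextSequence provides a cheap per-bucket autoincrement counter and is only usable in a writable transaction. A sketch that allocates an id and stores a record under it; the big-endian key encoding and bucket name are illustrative choices, and an import of encoding/binary is assumed:

	// insertItem stores value under a freshly allocated, big-endian id key.
	func insertItem(db *bolt.DB, value []byte) (uint64, error) {
		var id uint64
		err := db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("items"))
			if err != nil {
				return err
			}
			if id, err = b.NextSequence(); err != nil {
				return err
			}
			key := make([]byte, 8)
			binary.BigEndian.PutUint64(key, id)
			return b.Put(key, value)
		})
		return id, err
	}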
   386  // ForEach executes a function for each key/value pair in a bucket.
   387  // Because ForEach uses a Cursor, the iteration over keys is in lexicographical order.
   388  // If the provided function returns an error then the iteration is stopped and
   389  // the error is returned to the caller. The provided function must not modify
   390  // the bucket; this will result in undefined behavior.
   391  func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
   392  	if b.tx.db == nil {
   393  		return ErrTxClosed
   394  	}
   395  	c := b.Cursor()
   396  	for k, v := c.First(); k != nil; k, v = c.Next() {
   397  		if err := fn(k, v); err != nil {
   398  			return err
   399  		}
   400  	}
   401  	return nil
   402  }
   403  
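ForEach visits keys in lexicographic order and stops at the first error returned by the callback, which must not modify the bucket. A sketch that dumps a bucket, assuming an fmt import and an illustrative helper name:

	// dumpBucket prints every key/value pair in the named bucket;
	// sub-bucket entries show up with a nil value.
	func dumpBucket(db *bolt.DB, name []byte) error {
		return db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket(name)
			if b == nil {
				return nil
			}
			return b.ForEach(func(k, v []byte) error {
				fmt.Printf("%s = %s\n", k, v)
				return nil
			})
		})
	}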
   404  func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
   405  	if b.tx.db == nil {
   406  		return ErrTxClosed
   407  	}
   408  	c := b.Cursor()
   409  	for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
   410  		if flags&bucketLeafFlag != 0 {
   411  			if err := fn(k); err != nil {
   412  				return err
   413  			}
   414  		}
   415  	}
   416  	return nil
   417  }
   418  
   419  // Stats returns stats on a bucket.
   420  func (b *Bucket) Stats() BucketStats {
   421  	var s, subStats BucketStats
   422  	pageSize := b.tx.db.pageSize
   423  	s.BucketN += 1
   424  	if b.root == 0 {
   425  		s.InlineBucketN += 1
   426  	}
   427  	b.forEachPage(func(p *page, depth int, pgstack []pgid) {
   428  		if (p.flags & leafPageFlag) != 0 {
   429  			s.KeyN += int(p.count)
   430  
   431  			// used totals the used bytes for the page
   432  			used := pageHeaderSize
   433  
   434  			if p.count != 0 {
   435  				// If page has any elements, add all element headers.
   436  				used += leafPageElementSize * uintptr(p.count-1)
   437  
   438  				// Add all element key, value sizes.
   439  				// The computation takes advantage of the fact that the position
   440  				// of the last element's key/value equals to the total of the sizes
   441  				// of all previous elements' keys and values.
   442  				// It also includes the last element's header.
   443  				lastElement := p.leafPageElement(p.count - 1)
   444  				used += uintptr(lastElement.pos + lastElement.ksize + lastElement.vsize)
   445  			}
   446  
   447  			if b.root == 0 {
   448  				// For inlined bucket just update the inline stats
   449  				s.InlineBucketInuse += int(used)
   450  			} else {
   451  				// For non-inlined bucket update all the leaf stats
   452  				s.LeafPageN++
   453  				s.LeafInuse += int(used)
   454  				s.LeafOverflowN += int(p.overflow)
   455  
   456  				// Collect stats from sub-buckets.
   457  				// Do that by iterating over all element headers
   458  				// looking for the ones with the bucketLeafFlag.
   459  				for i := uint16(0); i < p.count; i++ {
   460  					e := p.leafPageElement(i)
   461  					if (e.flags & bucketLeafFlag) != 0 {
   462  						// For any bucket element, open the element value
   463  						// and recursively call Stats on the contained bucket.
   464  						subStats.Add(b.openBucket(e.value()).Stats())
   465  					}
   466  				}
   467  			}
   468  		} else if (p.flags & branchPageFlag) != 0 {
   469  			s.BranchPageN++
   470  			lastElement := p.branchPageElement(p.count - 1)
   471  
   472  			// used totals the used bytes for the page
   473  			// Add header and all element headers.
   474  			used := pageHeaderSize + (branchPageElementSize * uintptr(p.count-1))
   475  
   476  			// Add size of all keys and values.
   477  			// Again, use the fact that last element's position equals to
   478  			// the total of key, value sizes of all previous elements.
   479  			used += uintptr(lastElement.pos + lastElement.ksize)
   480  			s.BranchInuse += int(used)
   481  			s.BranchOverflowN += int(p.overflow)
   482  		}
   483  
   484  		// Keep track of maximum page depth.
   485  		if depth+1 > s.Depth {
   486  			s.Depth = depth + 1
   487  		}
   488  	})
   489  
   490  	// Alloc stats can be computed from page counts and pageSize.
   491  	s.BranchAlloc = (s.BranchPageN + s.BranchOverflowN) * pageSize
   492  	s.LeafAlloc = (s.LeafPageN + s.LeafOverflowN) * pageSize
   493  
   494  	// Add the max depth of sub-buckets to get total nested depth.
   495  	s.Depth += subStats.Depth
   496  	// Add the stats for all sub-buckets
   497  	s.Add(subStats)
   498  	return s
   499  }
   500  
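Stats walks every page of the bucket and of its sub-buckets, so its cost grows with bucket size and it is typically run from a read-only transaction. A small sketch that reports leaf-page utilization, assuming an fmt import:

	// reportUsage logs how many keys a bucket holds and how full its
	// leaf pages are (in-use bytes versus allocated bytes).
	func reportUsage(db *bolt.DB, name []byte) error {
		return db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket(name)
			if b == nil {
				return nil
			}
			st := b.Stats()
			if st.LeafAlloc > 0 {
				fmt.Printf("%s: %d keys, %.0f%% leaf fill\n",
					name, st.KeyN, 100*float64(st.LeafInuse)/float64(st.LeafAlloc))
			}
			return nil
		})
	}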
   501  // forEachPage iterates over every page in a bucket, including inline pages.
   502  func (b *Bucket) forEachPage(fn func(*page, int, []pgid)) {
   503  	// If we have an inline page then just use that.
   504  	if b.page != nil {
   505  		fn(b.page, 0, []pgid{b.root})
   506  		return
   507  	}
   508  
   509  	// Otherwise traverse the page hierarchy.
   510  	b.tx.forEachPage(b.root, fn)
   511  }
   512  
   513  // forEachPageNode iterates over every page (or node) in a bucket.
   514  // This also includes inline pages.
   515  func (b *Bucket) forEachPageNode(fn func(*page, *node, int)) {
   516  	// If we have an inline page or root node then just use that.
   517  	if b.page != nil {
   518  		fn(b.page, nil, 0)
   519  		return
   520  	}
   521  	b._forEachPageNode(b.root, 0, fn)
   522  }
   523  
   524  func (b *Bucket) _forEachPageNode(pgId pgid, depth int, fn func(*page, *node, int)) {
   525  	var p, n = b.pageNode(pgId)
   526  
   527  	// Execute function.
   528  	fn(p, n, depth)
   529  
   530  	// Recursively loop over children.
   531  	if p != nil {
   532  		if (p.flags & branchPageFlag) != 0 {
   533  			for i := 0; i < int(p.count); i++ {
   534  				elem := p.branchPageElement(uint16(i))
   535  				b._forEachPageNode(elem.pgid, depth+1, fn)
   536  			}
   537  		}
   538  	} else {
   539  		if !n.isLeaf {
   540  			for _, inode := range n.inodes {
   541  				b._forEachPageNode(inode.pgid, depth+1, fn)
   542  			}
   543  		}
   544  	}
   545  }
   546  
   547  // spill writes all the nodes for this bucket to dirty pages.
   548  func (b *Bucket) spill() error {
   549  	// Spill all child buckets first.
   550  	for name, child := range b.buckets {
   551  		// If the child bucket is small enough and it has no child buckets then
   552  		// write it inline into the parent bucket's page. Otherwise spill it
   553  		// like a normal bucket and make the parent value a pointer to the page.
   554  		var value []byte
   555  		if child.inlineable() {
   556  			child.free()
   557  			value = child.write()
   558  		} else {
   559  			if err := child.spill(); err != nil {
   560  				return err
   561  			}
   562  
   563  			// Update the child bucket header in this bucket.
   564  			value = make([]byte, unsafe.Sizeof(bucket{}))
   565  			var bucket = (*bucket)(unsafe.Pointer(&value[0]))
   566  			*bucket = *child.bucket
   567  		}
   568  
   569  		// Skip writing the bucket if there are no materialized nodes.
   570  		if child.rootNode == nil {
   571  			continue
   572  		}
   573  
   574  		// Update parent node.
   575  		var c = b.Cursor()
   576  		k, _, flags := c.seek([]byte(name))
   577  		if !bytes.Equal([]byte(name), k) {
   578  			panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
   579  		}
   580  		if flags&bucketLeafFlag == 0 {
   581  			panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
   582  		}
   583  		c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
   584  	}
   585  
   586  	// Ignore if there's not a materialized root node.
   587  	if b.rootNode == nil {
   588  		return nil
   589  	}
   590  
   591  	// Spill nodes.
   592  	if err := b.rootNode.spill(); err != nil {
   593  		return err
   594  	}
   595  	b.rootNode = b.rootNode.root()
   596  
   597  	// Update the root node for this bucket.
   598  	if b.rootNode.pgid >= b.tx.meta.pgid {
   599  		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
   600  	}
   601  	b.root = b.rootNode.pgid
   602  
   603  	return nil
   604  }
   605  
   606  // inlineable returns true if a bucket is small enough to be written inline
   607  // and if it contains no subbuckets. Otherwise returns false.
   608  func (b *Bucket) inlineable() bool {
   609  	var n = b.rootNode
   610  
   611  	// Bucket must only contain a single leaf node.
   612  	if n == nil || !n.isLeaf {
   613  		return false
   614  	}
   615  
   616  	// Bucket is not inlineable if it contains subbuckets or if it goes beyond
   617  	// our threshold for inline bucket size.
   618  	var size = pageHeaderSize
   619  	for _, inode := range n.inodes {
   620  		size += leafPageElementSize + uintptr(len(inode.key)) + uintptr(len(inode.value))
   621  
   622  		if inode.flags&bucketLeafFlag != 0 {
   623  			return false
   624  		} else if size > b.maxInlineBucketSize() {
   625  			return false
   626  		}
   627  	}
   628  
   629  	return true
   630  }
   631  
   632  // Returns the maximum total size of a bucket to make it a candidate for inlining.
   633  func (b *Bucket) maxInlineBucketSize() uintptr {
   634  	return uintptr(b.tx.db.pageSize / 4)
   635  }
   636  
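As a worked example: with the common 4 KiB page size, maxInlineBucketSize returns 4096/4 = 1024 bytes, so a bucket stays inline only while its page header, leaf element headers, keys, and values fit within roughly 1 KiB and it holds no sub-buckets; beyond that, spill writes it out to its own root page.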
   637  // write allocates and writes a bucket to a byte slice.
   638  func (b *Bucket) write() []byte {
   639  	// Allocate the appropriate size.
   640  	var n = b.rootNode
   641  	var value = make([]byte, bucketHeaderSize+n.size())
   642  
   643  	// Write a bucket header.
   644  	var bucket = (*bucket)(unsafe.Pointer(&value[0]))
   645  	*bucket = *b.bucket
   646  
   647  	// Convert byte slice to a fake page and write the root node.
   648  	var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
   649  	n.write(p)
   650  
   651  	return value
   652  }
   653  
   654  // rebalance attempts to balance all nodes.
   655  func (b *Bucket) rebalance() {
   656  	for _, n := range b.nodes {
   657  		n.rebalance()
   658  	}
   659  	for _, child := range b.buckets {
   660  		child.rebalance()
   661  	}
   662  }
   663  
   664  // node creates a node from a page and associates it with a given parent.
   665  func (b *Bucket) node(pgId pgid, parent *node) *node {
   666  	_assert(b.nodes != nil, "nodes map expected")
   667  
   668  	// Retrieve node if it's already been created.
   669  	if n := b.nodes[pgId]; n != nil {
   670  		return n
   671  	}
   672  
   673  	// Otherwise create a node and cache it.
   674  	n := &node{bucket: b, parent: parent}
   675  	if parent == nil {
   676  		b.rootNode = n
   677  	} else {
   678  		parent.children = append(parent.children, n)
   679  	}
   680  
   681  	// Use the inline page if this is an inline bucket.
   682  	var p = b.page
   683  	if p == nil {
   684  		p = b.tx.page(pgId)
   685  	}
   686  
   687  	// Read the page into the node and cache it.
   688  	n.read(p)
   689  	b.nodes[pgId] = n
   690  
   691  	// Update statistics.
   692  	b.tx.stats.IncNodeCount(1)
   693  
   694  	return n
   695  }
   696  
   697  // free recursively frees all pages in the bucket.
   698  func (b *Bucket) free() {
   699  	if b.root == 0 {
   700  		return
   701  	}
   702  
   703  	var tx = b.tx
   704  	b.forEachPageNode(func(p *page, n *node, _ int) {
   705  		if p != nil {
   706  			tx.db.freelist.free(tx.meta.txid, p)
   707  		} else {
   708  			n.free()
   709  		}
   710  	})
   711  	b.root = 0
   712  }
   713  
   714  // dereference removes all references to the old mmap.
   715  func (b *Bucket) dereference() {
   716  	if b.rootNode != nil {
   717  		b.rootNode.root().dereference()
   718  	}
   719  
   720  	for _, child := range b.buckets {
   721  		child.dereference()
   722  	}
   723  }
   724  
   725  // pageNode returns the in-memory node, if it exists.
   726  // Otherwise returns the underlying page.
   727  func (b *Bucket) pageNode(id pgid) (*page, *node) {
   728  	// Inline buckets have a fake page embedded in their value so treat them
   729  	// differently. We'll return the rootNode (if available) or the fake page.
   730  	if b.root == 0 {
   731  		if id != 0 {
   732  			panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
   733  		}
   734  		if b.rootNode != nil {
   735  			return nil, b.rootNode
   736  		}
   737  		return b.page, nil
   738  	}
   739  
   740  	// Check the node cache for non-inline buckets.
   741  	if b.nodes != nil {
   742  		if n := b.nodes[id]; n != nil {
   743  			return nil, n
   744  		}
   745  	}
   746  
   747  	// Finally lookup the page from the transaction if no node is materialized.
   748  	return b.tx.page(id), nil
   749  }
   750  
   751  // BucketStats records statistics about resources used by a bucket.
   752  type BucketStats struct {
   753  	// Page count statistics.
   754  	BranchPageN     int // number of logical branch pages
   755  	BranchOverflowN int // number of physical branch overflow pages
   756  	LeafPageN       int // number of logical leaf pages
   757  	LeafOverflowN   int // number of physical leaf overflow pages
   758  
   759  	// Tree statistics.
   760  	KeyN  int // number of key/value pairs
   761  	Depth int // number of levels in B+tree
   762  
   763  	// Page size utilization.
   764  	BranchAlloc int // bytes allocated for physical branch pages
   765  	BranchInuse int // bytes actually used for branch data
   766  	LeafAlloc   int // bytes allocated for physical leaf pages
   767  	LeafInuse   int // bytes actually used for leaf data
   768  
   769  	// Bucket statistics
   770  	BucketN           int // total number of buckets including the top bucket
   771  	InlineBucketN     int // total number of inlined buckets
   772  	InlineBucketInuse int // bytes used for inlined buckets (also accounted for in LeafInuse)
   773  }
   774  
   775  func (s *BucketStats) Add(other BucketStats) {
   776  	s.BranchPageN += other.BranchPageN
   777  	s.BranchOverflowN += other.BranchOverflowN
   778  	s.LeafPageN += other.LeafPageN
   779  	s.LeafOverflowN += other.LeafOverflowN
   780  	s.KeyN += other.KeyN
   781  	if s.Depth < other.Depth {
   782  		s.Depth = other.Depth
   783  	}
   784  	s.BranchAlloc += other.BranchAlloc
   785  	s.BranchInuse += other.BranchInuse
   786  	s.LeafAlloc += other.LeafAlloc
   787  	s.LeafInuse += other.LeafInuse
   788  
   789  	s.BucketN += other.BucketN
   790  	s.InlineBucketN += other.InlineBucketN
   791  	s.InlineBucketInuse += other.InlineBucketInuse
   792  }
   793  
   794  // cloneBytes returns a copy of a given slice.
   795  func cloneBytes(v []byte) []byte {
   796  	var clone = make([]byte, len(v))
   797  	copy(clone, v)
   798  	return clone
   799  }
   800  
