...

Source file src/go.etcd.io/bbolt/tx.go

Documentation: go.etcd.io/bbolt

     1  package bbolt
     2  
     3  import (
     4  	"fmt"
     5  	"io"
     6  	"os"
     7  	"sort"
     8  	"strings"
     9  	"sync/atomic"
    10  	"time"
    11  	"unsafe"
    12  )
    13  
    14  // txid represents the internal transaction identifier.
    15  type txid uint64
    16  
    17  // Tx represents a read-only or read/write transaction on the database.
    18  // Read-only transactions can be used for retrieving values for keys and creating cursors.
    19  // Read/write transactions can create and remove buckets and create and remove keys.
    20  //
    21  // IMPORTANT: You must commit or rollback transactions when you are done with
    22  // them. Pages can not be reclaimed by the writer until no more transactions
    23  // are using them. A long running read transaction can cause the database to
    24  // quickly grow.
    25  type Tx struct {
    26  	writable       bool
    27  	managed        bool
    28  	db             *DB
    29  	meta           *meta
    30  	root           Bucket
    31  	pages          map[pgid]*page
    32  	stats          TxStats
    33  	commitHandlers []func()
    34  
    35  	// WriteFlag specifies the flag for write-related methods like WriteTo().
    36  	// Tx opens the database file with the specified flag to copy the data.
    37  	//
    38  	// By default, the flag is unset, which works well for mostly in-memory
    39  	// workloads. For databases that are much larger than available RAM,
    40  	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
    41  	WriteFlag int
    42  }
    43  
    44  // init initializes the transaction.
    45  func (tx *Tx) init(db *DB) {
    46  	tx.db = db
    47  	tx.pages = nil
    48  
    49  	// Copy the meta page since it can be changed by the writer.
    50  	tx.meta = &meta{}
    51  	db.meta().copy(tx.meta)
    52  
    53  	// Copy over the root bucket.
    54  	tx.root = newBucket(tx)
    55  	tx.root.bucket = &bucket{}
    56  	*tx.root.bucket = tx.meta.root
    57  
    58  	// Increment the transaction id and add a page cache for writable transactions.
    59  	if tx.writable {
    60  		tx.pages = make(map[pgid]*page)
    61  		tx.meta.txid += txid(1)
    62  	}
    63  }
    64  
    65  // ID returns the transaction id.
    66  func (tx *Tx) ID() int {
    67  	return int(tx.meta.txid)
    68  }
    69  
    70  // DB returns a reference to the database that created the transaction.
    71  func (tx *Tx) DB() *DB {
    72  	return tx.db
    73  }
    74  
    75  // Size returns current database size in bytes as seen by this transaction.
    76  func (tx *Tx) Size() int64 {
    77  	return int64(tx.meta.pgid) * int64(tx.db.pageSize)
    78  }
    79  
    80  // Writable returns whether the transaction can perform write operations.
    81  func (tx *Tx) Writable() bool {
    82  	return tx.writable
    83  }
    84  
    85  // Cursor creates a cursor associated with the root bucket.
    86  // All items in the cursor will return a nil value because all root bucket keys point to buckets.
    87  // The cursor is only valid as long as the transaction is open.
    88  // Do not use a cursor after the transaction is closed.
    89  func (tx *Tx) Cursor() *Cursor {
    90  	return tx.root.Cursor()
    91  }
    92  
    93  // Stats retrieves a copy of the current transaction statistics.
    94  func (tx *Tx) Stats() TxStats {
    95  	return tx.stats
    96  }
    97  
    98  // Bucket retrieves a bucket by name.
    99  // Returns nil if the bucket does not exist.
   100  // The bucket instance is only valid for the lifetime of the transaction.
   101  func (tx *Tx) Bucket(name []byte) *Bucket {
   102  	return tx.root.Bucket(name)
   103  }
   104  
   105  // CreateBucket creates a new bucket.
   106  // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
   107  // The bucket instance is only valid for the lifetime of the transaction.
   108  func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
   109  	return tx.root.CreateBucket(name)
   110  }
   111  
   112  // CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
   113  // Returns an error if the bucket name is blank, or if the bucket name is too long.
   114  // The bucket instance is only valid for the lifetime of the transaction.
   115  func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
   116  	return tx.root.CreateBucketIfNotExists(name)
   117  }
   118  
   119  // DeleteBucket deletes a bucket.
   120  // Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
   121  func (tx *Tx) DeleteBucket(name []byte) error {
   122  	return tx.root.DeleteBucket(name)
   123  }
   124  
   125  // ForEach executes a function for each bucket in the root.
   126  // If the provided function returns an error then the iteration is stopped and
   127  // the error is returned to the caller.
   128  func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
   129  	return tx.root.ForEach(func(k, v []byte) error {
   130  		return fn(k, tx.root.Bucket(k))
   131  	})
   132  }
   133  
   134  // OnCommit adds a handler function to be executed after the transaction successfully commits.
   135  func (tx *Tx) OnCommit(fn func()) {
   136  	tx.commitHandlers = append(tx.commitHandlers, fn)
   137  }
   138  
   139  // Commit writes all changes to disk and updates the meta page.
   140  // Returns an error if a disk write error occurs, or if Commit is
   141  // called on a read-only transaction.
   142  func (tx *Tx) Commit() error {
   143  	_assert(!tx.managed, "managed tx commit not allowed")
   144  	if tx.db == nil {
   145  		return ErrTxClosed
   146  	} else if !tx.writable {
   147  		return ErrTxNotWritable
   148  	}
   149  
   150  	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
   151  
   152  	// Rebalance nodes which have had deletions.
   153  	var startTime = time.Now()
   154  	tx.root.rebalance()
   155  	if tx.stats.GetRebalance() > 0 {
   156  		tx.stats.IncRebalanceTime(time.Since(startTime))
   157  	}
   158  
   159  	opgid := tx.meta.pgid
   160  
   161  	// spill data onto dirty pages.
   162  	startTime = time.Now()
   163  	if err := tx.root.spill(); err != nil {
   164  		tx.rollback()
   165  		return err
   166  	}
   167  	tx.stats.IncSpillTime(time.Since(startTime))
   168  
   169  	// Free the old root bucket.
   170  	tx.meta.root.root = tx.root.root
   171  
   172  	// Free the old freelist because commit writes out a fresh freelist.
   173  	if tx.meta.freelist != pgidNoFreelist {
   174  		tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
   175  	}
   176  
   177  	if !tx.db.NoFreelistSync {
   178  		err := tx.commitFreelist()
   179  		if err != nil {
   180  			return err
   181  		}
   182  	} else {
   183  		tx.meta.freelist = pgidNoFreelist
   184  	}
   185  
   186  	// If the high water mark has moved up then attempt to grow the database.
   187  	if tx.meta.pgid > opgid {
   188  		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
   189  			tx.rollback()
   190  			return err
   191  		}
   192  	}
   193  
   194  	// Write dirty pages to disk.
   195  	startTime = time.Now()
   196  	if err := tx.write(); err != nil {
   197  		tx.rollback()
   198  		return err
   199  	}
   200  
   201  	// If strict mode is enabled then perform a consistency check.
   202  	if tx.db.StrictMode {
   203  		ch := tx.Check()
   204  		var errs []string
   205  		for {
   206  			err, ok := <-ch
   207  			if !ok {
   208  				break
   209  			}
   210  			errs = append(errs, err.Error())
   211  		}
   212  		if len(errs) > 0 {
   213  			panic("check fail: " + strings.Join(errs, "\n"))
   214  		}
   215  	}
   216  
   217  	// Write meta to disk.
   218  	if err := tx.writeMeta(); err != nil {
   219  		tx.rollback()
   220  		return err
   221  	}
   222  	tx.stats.IncWriteTime(time.Since(startTime))
   223  
   224  	// Finalize the transaction.
   225  	tx.close()
   226  
   227  	// Execute commit handlers now that the locks have been removed.
   228  	for _, fn := range tx.commitHandlers {
   229  		fn()
   230  	}
   231  
   232  	return nil
   233  }
   234  
   235  func (tx *Tx) commitFreelist() error {
   236  	// Allocate new pages for the new free list. This will overestimate
   237  	// the size of the freelist but not underestimate the size (which would be bad).
   238  	p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
   239  	if err != nil {
   240  		tx.rollback()
   241  		return err
   242  	}
   243  	if err := tx.db.freelist.write(p); err != nil {
   244  		tx.rollback()
   245  		return err
   246  	}
   247  	tx.meta.freelist = p.id
   248  
   249  	return nil
   250  }
   251  
   252  // Rollback closes the transaction and ignores all previous updates. Read-only
   253  // transactions must be rolled back and not committed.
   254  func (tx *Tx) Rollback() error {
   255  	_assert(!tx.managed, "managed tx rollback not allowed")
   256  	if tx.db == nil {
   257  		return ErrTxClosed
   258  	}
   259  	tx.nonPhysicalRollback()
   260  	return nil
   261  }
   262  
   263  // nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk.
   264  func (tx *Tx) nonPhysicalRollback() {
   265  	if tx.db == nil {
   266  		return
   267  	}
   268  	if tx.writable {
   269  		tx.db.freelist.rollback(tx.meta.txid)
   270  	}
   271  	tx.close()
   272  }
   273  
   274  // rollback needs to reload the free pages from disk in case some system error happens like fsync error.
   275  func (tx *Tx) rollback() {
   276  	if tx.db == nil {
   277  		return
   278  	}
   279  	if tx.writable {
   280  		tx.db.freelist.rollback(tx.meta.txid)
   281  		// When mmap fails, the `data`, `dataref` and `datasz` may be reset to
   282  		// zero values, and there is no way to reload free page IDs in this case.
   283  		if tx.db.data != nil {
   284  			if !tx.db.hasSyncedFreelist() {
   285  				// Reconstruct free page list by scanning the DB to get the whole free page list.
   286  				// Note: scaning the whole db is heavy if your db size is large in NoSyncFreeList mode.
   287  				tx.db.freelist.noSyncReload(tx.db.freepages())
   288  			} else {
   289  				// Read free page list from freelist page.
   290  				tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
   291  			}
   292  		}
   293  	}
   294  	tx.close()
   295  }
   296  
   297  func (tx *Tx) close() {
   298  	if tx.db == nil {
   299  		return
   300  	}
   301  	if tx.writable {
   302  		// Grab freelist stats.
   303  		var freelistFreeN = tx.db.freelist.free_count()
   304  		var freelistPendingN = tx.db.freelist.pending_count()
   305  		var freelistAlloc = tx.db.freelist.size()
   306  
   307  		// Remove transaction ref & writer lock.
   308  		tx.db.rwtx = nil
   309  		tx.db.rwlock.Unlock()
   310  
   311  		// Merge statistics.
   312  		tx.db.statlock.Lock()
   313  		tx.db.stats.FreePageN = freelistFreeN
   314  		tx.db.stats.PendingPageN = freelistPendingN
   315  		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
   316  		tx.db.stats.FreelistInuse = freelistAlloc
   317  		tx.db.stats.TxStats.add(&tx.stats)
   318  		tx.db.statlock.Unlock()
   319  	} else {
   320  		tx.db.removeTx(tx)
   321  	}
   322  
   323  	// Clear all references.
   324  	tx.db = nil
   325  	tx.meta = nil
   326  	tx.root = Bucket{tx: tx}
   327  	tx.pages = nil
   328  }
   329  
   330  // Copy writes the entire database to a writer.
   331  // This function exists for backwards compatibility.
   332  //
   333  // Deprecated; Use WriteTo() instead.
   334  func (tx *Tx) Copy(w io.Writer) error {
   335  	_, err := tx.WriteTo(w)
   336  	return err
   337  }
   338  
   339  // WriteTo writes the entire database to a writer.
   340  // If err == nil then exactly tx.Size() bytes will be written into the writer.
   341  func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
   342  	// Attempt to open reader with WriteFlag
   343  	f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
   344  	if err != nil {
   345  		return 0, err
   346  	}
   347  	defer func() {
   348  		if cerr := f.Close(); err == nil {
   349  			err = cerr
   350  		}
   351  	}()
   352  
   353  	// Generate a meta page. We use the same page data for both meta pages.
   354  	buf := make([]byte, tx.db.pageSize)
   355  	page := (*page)(unsafe.Pointer(&buf[0]))
   356  	page.flags = metaPageFlag
   357  	*page.meta() = *tx.meta
   358  
   359  	// Write meta 0.
   360  	page.id = 0
   361  	page.meta().checksum = page.meta().sum64()
   362  	nn, err := w.Write(buf)
   363  	n += int64(nn)
   364  	if err != nil {
   365  		return n, fmt.Errorf("meta 0 copy: %s", err)
   366  	}
   367  
   368  	// Write meta 1 with a lower transaction id.
   369  	page.id = 1
   370  	page.meta().txid -= 1
   371  	page.meta().checksum = page.meta().sum64()
   372  	nn, err = w.Write(buf)
   373  	n += int64(nn)
   374  	if err != nil {
   375  		return n, fmt.Errorf("meta 1 copy: %s", err)
   376  	}
   377  
   378  	// Move past the meta pages in the file.
   379  	if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
   380  		return n, fmt.Errorf("seek: %s", err)
   381  	}
   382  
   383  	// Copy data pages.
   384  	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
   385  	n += wn
   386  	if err != nil {
   387  		return n, err
   388  	}
   389  
   390  	return n, nil
   391  }
   392  
   393  // CopyFile copies the entire database to file at the given path.
   394  // A reader transaction is maintained during the copy so it is safe to continue
   395  // using the database while a copy is in progress.
   396  func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
   397  	f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
   398  	if err != nil {
   399  		return err
   400  	}
   401  
   402  	_, err = tx.WriteTo(f)
   403  	if err != nil {
   404  		_ = f.Close()
   405  		return err
   406  	}
   407  	return f.Close()
   408  }
   409  
   410  // allocate returns a contiguous block of memory starting at a given page.
   411  func (tx *Tx) allocate(count int) (*page, error) {
   412  	p, err := tx.db.allocate(tx.meta.txid, count)
   413  	if err != nil {
   414  		return nil, err
   415  	}
   416  
   417  	// Save to our page cache.
   418  	tx.pages[p.id] = p
   419  
   420  	// Update statistics.
   421  	tx.stats.IncPageCount(int64(count))
   422  	tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))
   423  
   424  	return p, nil
   425  }
   426  
   427  // write writes any dirty pages to disk.
   428  func (tx *Tx) write() error {
   429  	// Sort pages by id.
   430  	pages := make(pages, 0, len(tx.pages))
   431  	for _, p := range tx.pages {
   432  		pages = append(pages, p)
   433  	}
   434  	// Clear out page cache early.
   435  	tx.pages = make(map[pgid]*page)
   436  	sort.Sort(pages)
   437  
   438  	// Write pages to disk in order.
   439  	for _, p := range pages {
   440  		rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
   441  		offset := int64(p.id) * int64(tx.db.pageSize)
   442  		var written uintptr
   443  
   444  		// Write out page in "max allocation" sized chunks.
   445  		for {
   446  			sz := rem
   447  			if sz > maxAllocSize-1 {
   448  				sz = maxAllocSize - 1
   449  			}
   450  			buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))
   451  
   452  			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
   453  				return err
   454  			}
   455  
   456  			// Update statistics.
   457  			tx.stats.IncWrite(1)
   458  
   459  			// Exit inner for loop if we've written all the chunks.
   460  			rem -= sz
   461  			if rem == 0 {
   462  				break
   463  			}
   464  
   465  			// Otherwise move offset forward and move pointer to next chunk.
   466  			offset += int64(sz)
   467  			written += uintptr(sz)
   468  		}
   469  	}
   470  
   471  	// Ignore file sync if flag is set on DB.
   472  	if !tx.db.NoSync || IgnoreNoSync {
   473  		if err := fdatasync(tx.db); err != nil {
   474  			return err
   475  		}
   476  	}
   477  
   478  	// Put small pages back to page pool.
   479  	for _, p := range pages {
   480  		// Ignore page sizes over 1 page.
   481  		// These are allocated using make() instead of the page pool.
   482  		if int(p.overflow) != 0 {
   483  			continue
   484  		}
   485  
   486  		buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)
   487  
   488  		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
   489  		for i := range buf {
   490  			buf[i] = 0
   491  		}
   492  		tx.db.pagePool.Put(buf) //nolint:staticcheck
   493  	}
   494  
   495  	return nil
   496  }
   497  
   498  // writeMeta writes the meta to the disk.
   499  func (tx *Tx) writeMeta() error {
   500  	// Create a temporary buffer for the meta page.
   501  	buf := make([]byte, tx.db.pageSize)
   502  	p := tx.db.pageInBuffer(buf, 0)
   503  	tx.meta.write(p)
   504  
   505  	// Write the meta page to file.
   506  	if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
   507  		return err
   508  	}
   509  	if !tx.db.NoSync || IgnoreNoSync {
   510  		if err := fdatasync(tx.db); err != nil {
   511  			return err
   512  		}
   513  	}
   514  
   515  	// Update statistics.
   516  	tx.stats.IncWrite(1)
   517  
   518  	return nil
   519  }
   520  
   521  // page returns a reference to the page with a given id.
   522  // If page has been written to then a temporary buffered page is returned.
   523  func (tx *Tx) page(id pgid) *page {
   524  	// Check the dirty pages first.
   525  	if tx.pages != nil {
   526  		if p, ok := tx.pages[id]; ok {
   527  			p.fastCheck(id)
   528  			return p
   529  		}
   530  	}
   531  
   532  	// Otherwise return directly from the mmap.
   533  	p := tx.db.page(id)
   534  	p.fastCheck(id)
   535  	return p
   536  }
   537  
   538  // forEachPage iterates over every page within a given page and executes a function.
   539  func (tx *Tx) forEachPage(pgidnum pgid, fn func(*page, int, []pgid)) {
   540  	stack := make([]pgid, 10)
   541  	stack[0] = pgidnum
   542  	tx.forEachPageInternal(stack[:1], fn)
   543  }
   544  
   545  func (tx *Tx) forEachPageInternal(pgidstack []pgid, fn func(*page, int, []pgid)) {
   546  	p := tx.page(pgidstack[len(pgidstack)-1])
   547  
   548  	// Execute function.
   549  	fn(p, len(pgidstack)-1, pgidstack)
   550  
   551  	// Recursively loop over children.
   552  	if (p.flags & branchPageFlag) != 0 {
   553  		for i := 0; i < int(p.count); i++ {
   554  			elem := p.branchPageElement(uint16(i))
   555  			tx.forEachPageInternal(append(pgidstack, elem.pgid), fn)
   556  		}
   557  	}
   558  }
   559  
   560  // Page returns page information for a given page number.
   561  // This is only safe for concurrent use when used by a writable transaction.
   562  func (tx *Tx) Page(id int) (*PageInfo, error) {
   563  	if tx.db == nil {
   564  		return nil, ErrTxClosed
   565  	} else if pgid(id) >= tx.meta.pgid {
   566  		return nil, nil
   567  	}
   568  
   569  	if tx.db.freelist == nil {
   570  		return nil, ErrFreePagesNotLoaded
   571  	}
   572  
   573  	// Build the page info.
   574  	p := tx.db.page(pgid(id))
   575  	info := &PageInfo{
   576  		ID:            id,
   577  		Count:         int(p.count),
   578  		OverflowCount: int(p.overflow),
   579  	}
   580  
   581  	// Determine the type (or if it's free).
   582  	if tx.db.freelist.freed(pgid(id)) {
   583  		info.Type = "free"
   584  	} else {
   585  		info.Type = p.typ()
   586  	}
   587  
   588  	return info, nil
   589  }
   590  
   591  // TxStats represents statistics about the actions performed by the transaction.
   592  type TxStats struct {
   593  	// Page statistics.
   594  	//
   595  	// DEPRECATED: Use GetPageCount() or IncPageCount()
   596  	PageCount int64 // number of page allocations
   597  	// DEPRECATED: Use GetPageAlloc() or IncPageAlloc()
   598  	PageAlloc int64 // total bytes allocated
   599  
   600  	// Cursor statistics.
   601  	//
   602  	// DEPRECATED: Use GetCursorCount() or IncCursorCount()
   603  	CursorCount int64 // number of cursors created
   604  
   605  	// Node statistics
   606  	//
   607  	// DEPRECATED: Use GetNodeCount() or IncNodeCount()
   608  	NodeCount int64 // number of node allocations
   609  	// DEPRECATED: Use GetNodeDeref() or IncNodeDeref()
   610  	NodeDeref int64 // number of node dereferences
   611  
   612  	// Rebalance statistics.
   613  	//
   614  	// DEPRECATED: Use GetRebalance() or IncRebalance()
   615  	Rebalance int64 // number of node rebalances
   616  	// DEPRECATED: Use GetRebalanceTime() or IncRebalanceTime()
   617  	RebalanceTime time.Duration // total time spent rebalancing
   618  
   619  	// Split/Spill statistics.
   620  	//
   621  	// DEPRECATED: Use GetSplit() or IncSplit()
   622  	Split int64 // number of nodes split
   623  	// DEPRECATED: Use GetSpill() or IncSpill()
   624  	Spill int64 // number of nodes spilled
   625  	// DEPRECATED: Use GetSpillTime() or IncSpillTime()
   626  	SpillTime time.Duration // total time spent spilling
   627  
   628  	// Write statistics.
   629  	//
   630  	// DEPRECATED: Use GetWrite() or IncWrite()
   631  	Write int64 // number of writes performed
   632  	// DEPRECATED: Use GetWriteTime() or IncWriteTime()
   633  	WriteTime time.Duration // total time spent writing to disk
   634  }
   635  
   636  func (s *TxStats) add(other *TxStats) {
   637  	s.IncPageCount(other.GetPageCount())
   638  	s.IncPageAlloc(other.GetPageAlloc())
   639  	s.IncCursorCount(other.GetCursorCount())
   640  	s.IncNodeCount(other.GetNodeCount())
   641  	s.IncNodeDeref(other.GetNodeDeref())
   642  	s.IncRebalance(other.GetRebalance())
   643  	s.IncRebalanceTime(other.GetRebalanceTime())
   644  	s.IncSplit(other.GetSplit())
   645  	s.IncSpill(other.GetSpill())
   646  	s.IncSpillTime(other.GetSpillTime())
   647  	s.IncWrite(other.GetWrite())
   648  	s.IncWriteTime(other.GetWriteTime())
   649  }
   650  
   651  // Sub calculates and returns the difference between two sets of transaction stats.
   652  // This is useful when obtaining stats at two different points and time and
   653  // you need the performance counters that occurred within that time span.
   654  func (s *TxStats) Sub(other *TxStats) TxStats {
   655  	var diff TxStats
   656  	diff.PageCount = s.GetPageCount() - other.GetPageCount()
   657  	diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
   658  	diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
   659  	diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
   660  	diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
   661  	diff.Rebalance = s.GetRebalance() - other.GetRebalance()
   662  	diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
   663  	diff.Split = s.GetSplit() - other.GetSplit()
   664  	diff.Spill = s.GetSpill() - other.GetSpill()
   665  	diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
   666  	diff.Write = s.GetWrite() - other.GetWrite()
   667  	diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
   668  	return diff
   669  }
   670  
   671  // GetPageCount returns PageCount atomically.
   672  func (s *TxStats) GetPageCount() int64 {
   673  	return atomic.LoadInt64(&s.PageCount)
   674  }
   675  
   676  // IncPageCount increases PageCount atomically and returns the new value.
   677  func (s *TxStats) IncPageCount(delta int64) int64 {
   678  	return atomic.AddInt64(&s.PageCount, delta)
   679  }
   680  
   681  // GetPageAlloc returns PageAlloc atomically.
   682  func (s *TxStats) GetPageAlloc() int64 {
   683  	return atomic.LoadInt64(&s.PageAlloc)
   684  }
   685  
   686  // IncPageAlloc increases PageAlloc atomically and returns the new value.
   687  func (s *TxStats) IncPageAlloc(delta int64) int64 {
   688  	return atomic.AddInt64(&s.PageAlloc, delta)
   689  }
   690  
   691  // GetCursorCount returns CursorCount atomically.
   692  func (s *TxStats) GetCursorCount() int64 {
   693  	return atomic.LoadInt64(&s.CursorCount)
   694  }
   695  
   696  // IncCursorCount increases CursorCount atomically and return the new value.
   697  func (s *TxStats) IncCursorCount(delta int64) int64 {
   698  	return atomic.AddInt64(&s.CursorCount, delta)
   699  }
   700  
   701  // GetNodeCount returns NodeCount atomically.
   702  func (s *TxStats) GetNodeCount() int64 {
   703  	return atomic.LoadInt64(&s.NodeCount)
   704  }
   705  
   706  // IncNodeCount increases NodeCount atomically and returns the new value.
   707  func (s *TxStats) IncNodeCount(delta int64) int64 {
   708  	return atomic.AddInt64(&s.NodeCount, delta)
   709  }
   710  
   711  // GetNodeDeref returns NodeDeref atomically.
   712  func (s *TxStats) GetNodeDeref() int64 {
   713  	return atomic.LoadInt64(&s.NodeDeref)
   714  }
   715  
   716  // IncNodeDeref increases NodeDeref atomically and returns the new value.
   717  func (s *TxStats) IncNodeDeref(delta int64) int64 {
   718  	return atomic.AddInt64(&s.NodeDeref, delta)
   719  }
   720  
   721  // GetRebalance returns Rebalance atomically.
   722  func (s *TxStats) GetRebalance() int64 {
   723  	return atomic.LoadInt64(&s.Rebalance)
   724  }
   725  
   726  // IncRebalance increases Rebalance atomically and returns the new value.
   727  func (s *TxStats) IncRebalance(delta int64) int64 {
   728  	return atomic.AddInt64(&s.Rebalance, delta)
   729  }
   730  
   731  // GetRebalanceTime returns RebalanceTime atomically.
   732  func (s *TxStats) GetRebalanceTime() time.Duration {
   733  	return atomicLoadDuration(&s.RebalanceTime)
   734  }
   735  
   736  // IncRebalanceTime increases RebalanceTime atomically and returns the new value.
   737  func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
   738  	return atomicAddDuration(&s.RebalanceTime, delta)
   739  }
   740  
   741  // GetSplit returns Split atomically.
   742  func (s *TxStats) GetSplit() int64 {
   743  	return atomic.LoadInt64(&s.Split)
   744  }
   745  
   746  // IncSplit increases Split atomically and returns the new value.
   747  func (s *TxStats) IncSplit(delta int64) int64 {
   748  	return atomic.AddInt64(&s.Split, delta)
   749  }
   750  
   751  // GetSpill returns Spill atomically.
   752  func (s *TxStats) GetSpill() int64 {
   753  	return atomic.LoadInt64(&s.Spill)
   754  }
   755  
   756  // IncSpill increases Spill atomically and returns the new value.
   757  func (s *TxStats) IncSpill(delta int64) int64 {
   758  	return atomic.AddInt64(&s.Spill, delta)
   759  }
   760  
   761  // GetSpillTime returns SpillTime atomically.
   762  func (s *TxStats) GetSpillTime() time.Duration {
   763  	return atomicLoadDuration(&s.SpillTime)
   764  }
   765  
   766  // IncSpillTime increases SpillTime atomically and returns the new value.
   767  func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
   768  	return atomicAddDuration(&s.SpillTime, delta)
   769  }
   770  
   771  // GetWrite returns Write atomically.
   772  func (s *TxStats) GetWrite() int64 {
   773  	return atomic.LoadInt64(&s.Write)
   774  }
   775  
   776  // IncWrite increases Write atomically and returns the new value.
   777  func (s *TxStats) IncWrite(delta int64) int64 {
   778  	return atomic.AddInt64(&s.Write, delta)
   779  }
   780  
   781  // GetWriteTime returns WriteTime atomically.
   782  func (s *TxStats) GetWriteTime() time.Duration {
   783  	return atomicLoadDuration(&s.WriteTime)
   784  }
   785  
   786  // IncWriteTime increases WriteTime atomically and returns the new value.
   787  func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
   788  	return atomicAddDuration(&s.WriteTime, delta)
   789  }
   790  
   791  func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
   792  	return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)))
   793  }
   794  
   795  func atomicLoadDuration(ptr *time.Duration) time.Duration {
   796  	return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr))))
   797  }
   798  

View as plain text