...

Source file src/github.com/syndtr/goleveldb/leveldb/db_transaction.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"errors"
    11  	"sync"
    12  	"time"
    13  
    14  	"github.com/syndtr/goleveldb/leveldb/iterator"
    15  	"github.com/syndtr/goleveldb/leveldb/opt"
    16  	"github.com/syndtr/goleveldb/leveldb/util"
    17  )
    18  
    19  var errTransactionDone = errors.New("leveldb: transaction already closed")
    20  
    21  // Transaction is the transaction handle.
    22  type Transaction struct {
    23  	db        *DB
    24  	lk        sync.RWMutex
    25  	seq       uint64
    26  	mem       *memDB
    27  	tables    tFiles
    28  	ikScratch []byte
    29  	rec       sessionRecord
    30  	stats     cStatStaging
    31  	closed    bool
    32  }
    33  
    34  // Get gets the value for the given key. It returns ErrNotFound if the
    35  // DB does not contains the key.
    36  //
    37  // The returned slice is its own copy, it is safe to modify the contents
    38  // of the returned slice.
    39  // It is safe to modify the contents of the argument after Get returns.
    40  func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
    41  	tr.lk.RLock()
    42  	defer tr.lk.RUnlock()
    43  	if tr.closed {
    44  		return nil, errTransactionDone
    45  	}
    46  	return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
    47  }
    48  
    49  // Has returns true if the DB does contains the given key.
    50  //
    51  // It is safe to modify the contents of the argument after Has returns.
    52  func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
    53  	tr.lk.RLock()
    54  	defer tr.lk.RUnlock()
    55  	if tr.closed {
    56  		return false, errTransactionDone
    57  	}
    58  	return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
    59  }
    60  
    61  // NewIterator returns an iterator for the latest snapshot of the transaction.
    62  // The returned iterator is not safe for concurrent use, but it is safe to use
    63  // multiple iterators concurrently, with each in a dedicated goroutine.
    64  // It is also safe to use an iterator concurrently while writes to the
    65  // transaction. The resultant key/value pairs are guaranteed to be consistent.
    66  //
    67  // Slice allows slicing the iterator to only contains keys in the given
    68  // range. A nil Range.Start is treated as a key before all keys in the
    69  // DB. And a nil Range.Limit is treated as a key after all keys in
    70  // the DB.
    71  //
    72  // The returned iterator has locks on its own resources, so it can live beyond
    73  // the lifetime of the transaction who creates them.
    74  //
    75  // WARNING: Any slice returned by interator (e.g. slice returned by calling
    76  // Iterator.Key() or Iterator.Key() methods), its content should not be modified
    77  // unless noted otherwise.
    78  //
    79  // The iterator must be released after use, by calling Release method.
    80  //
    81  // Also read Iterator documentation of the leveldb/iterator package.
    82  func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
    83  	tr.lk.RLock()
    84  	defer tr.lk.RUnlock()
    85  	if tr.closed {
    86  		return iterator.NewEmptyIterator(errTransactionDone)
    87  	}
    88  	tr.mem.incref()
    89  	return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
    90  }
    91  
    92  func (tr *Transaction) flush() error {
    93  	// Flush memdb.
    94  	if tr.mem.Len() != 0 {
    95  		tr.stats.startTimer()
    96  		iter := tr.mem.NewIterator(nil)
    97  		t, n, err := tr.db.s.tops.createFrom(iter)
    98  		iter.Release()
    99  		tr.stats.stopTimer()
   100  		if err != nil {
   101  			return err
   102  		}
   103  		if tr.mem.getref() == 1 {
   104  			tr.mem.Reset()
   105  		} else {
   106  			tr.mem.decref()
   107  			tr.mem = tr.db.mpoolGet(0)
   108  			tr.mem.incref()
   109  		}
   110  		tr.tables = append(tr.tables, t)
   111  		tr.rec.addTableFile(0, t)
   112  		tr.stats.write += t.size
   113  		tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(t.size), t.imin, t.imax)
   114  	}
   115  	return nil
   116  }
   117  
   118  func (tr *Transaction) put(kt keyType, key, value []byte) error {
   119  	tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
   120  	if tr.mem.Free() < len(tr.ikScratch)+len(value) {
   121  		if err := tr.flush(); err != nil {
   122  			return err
   123  		}
   124  	}
   125  	if err := tr.mem.Put(tr.ikScratch, value); err != nil {
   126  		return err
   127  	}
   128  	tr.seq++
   129  	return nil
   130  }
   131  
   132  // Put sets the value for the given key. It overwrites any previous value
   133  // for that key; a DB is not a multi-map.
   134  // Please note that the transaction is not compacted until committed, so if you
   135  // writes 10 same keys, then those 10 same keys are in the transaction.
   136  //
   137  // It is safe to modify the contents of the arguments after Put returns.
   138  func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
   139  	tr.lk.Lock()
   140  	defer tr.lk.Unlock()
   141  	if tr.closed {
   142  		return errTransactionDone
   143  	}
   144  	return tr.put(keyTypeVal, key, value)
   145  }
   146  
   147  // Delete deletes the value for the given key.
   148  // Please note that the transaction is not compacted until committed, so if you
   149  // writes 10 same keys, then those 10 same keys are in the transaction.
   150  //
   151  // It is safe to modify the contents of the arguments after Delete returns.
   152  func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
   153  	tr.lk.Lock()
   154  	defer tr.lk.Unlock()
   155  	if tr.closed {
   156  		return errTransactionDone
   157  	}
   158  	return tr.put(keyTypeDel, key, nil)
   159  }
   160  
   161  // Write apply the given batch to the transaction. The batch will be applied
   162  // sequentially.
   163  // Please note that the transaction is not compacted until committed, so if you
   164  // writes 10 same keys, then those 10 same keys are in the transaction.
   165  //
   166  // It is safe to modify the contents of the arguments after Write returns.
   167  func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
   168  	if b == nil || b.Len() == 0 {
   169  		return nil
   170  	}
   171  
   172  	tr.lk.Lock()
   173  	defer tr.lk.Unlock()
   174  	if tr.closed {
   175  		return errTransactionDone
   176  	}
   177  	return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
   178  		return tr.put(kt, k, v)
   179  	})
   180  }
   181  
   182  func (tr *Transaction) setDone() {
   183  	tr.closed = true
   184  	tr.db.tr = nil
   185  	tr.mem.decref()
   186  	<-tr.db.writeLockC
   187  }
   188  
   189  // Commit commits the transaction. If error is not nil, then the transaction is
   190  // not committed, it can then either be retried or discarded.
   191  //
   192  // Other methods should not be called after transaction has been committed.
   193  func (tr *Transaction) Commit() error {
   194  	if err := tr.db.ok(); err != nil {
   195  		return err
   196  	}
   197  
   198  	tr.lk.Lock()
   199  	defer tr.lk.Unlock()
   200  	if tr.closed {
   201  		return errTransactionDone
   202  	}
   203  	if err := tr.flush(); err != nil {
   204  		// Return error, lets user decide either to retry or discard
   205  		// transaction.
   206  		return err
   207  	}
   208  	if len(tr.tables) != 0 {
   209  		// Committing transaction.
   210  		tr.rec.setSeqNum(tr.seq)
   211  		tr.db.compCommitLk.Lock()
   212  		tr.stats.startTimer()
   213  		var cerr error
   214  		for retry := 0; retry < 3; retry++ {
   215  			cerr = tr.db.s.commit(&tr.rec, false)
   216  			if cerr != nil {
   217  				tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
   218  				select {
   219  				case <-time.After(time.Second):
   220  				case <-tr.db.closeC:
   221  					tr.db.logf("transaction@commit exiting")
   222  					tr.db.compCommitLk.Unlock()
   223  					return cerr
   224  				}
   225  			} else {
   226  				// Success. Set db.seq.
   227  				tr.db.setSeq(tr.seq)
   228  				break
   229  			}
   230  		}
   231  		tr.stats.stopTimer()
   232  		if cerr != nil {
   233  			// Return error, lets user decide either to retry or discard
   234  			// transaction.
   235  			return cerr
   236  		}
   237  
   238  		// Update compaction stats. This is safe as long as we hold compCommitLk.
   239  		tr.db.compStats.addStat(0, &tr.stats)
   240  
   241  		// Trigger table auto-compaction.
   242  		tr.db.compTrigger(tr.db.tcompCmdC)
   243  		tr.db.compCommitLk.Unlock()
   244  
   245  		// Additionally, wait compaction when certain threshold reached.
   246  		// Ignore error, returns error only if transaction can't be committed.
   247  		_ = tr.db.waitCompaction()
   248  	}
   249  	// Only mark as done if transaction committed successfully.
   250  	tr.setDone()
   251  	return nil
   252  }
   253  
   254  func (tr *Transaction) discard() {
   255  	// Discard transaction.
   256  	for _, t := range tr.tables {
   257  		tr.db.logf("transaction@discard @%d", t.fd.Num)
   258  		// Iterator may still use the table, so we use tOps.remove here.
   259  		tr.db.s.tops.remove(t.fd)
   260  	}
   261  }
   262  
   263  // Discard discards the transaction.
   264  // This method is noop if transaction is already closed (either committed or
   265  // discarded)
   266  //
   267  // Other methods should not be called after transaction has been discarded.
   268  func (tr *Transaction) Discard() {
   269  	tr.lk.Lock()
   270  	if !tr.closed {
   271  		tr.discard()
   272  		tr.setDone()
   273  	}
   274  	tr.lk.Unlock()
   275  }
   276  
   277  func (db *DB) waitCompaction() error {
   278  	if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
   279  		return db.compTriggerWait(db.tcompCmdC)
   280  	}
   281  	return nil
   282  }
   283  
   284  // OpenTransaction opens an atomic DB transaction. Only one transaction can be
   285  // opened at a time. Subsequent call to Write and OpenTransaction will be blocked
   286  // until in-flight transaction is committed or discarded.
   287  // The returned transaction handle is safe for concurrent use.
   288  //
   289  // Transaction is very expensive and can overwhelm compaction, especially if
   290  // transaction size is small. Use with caution.
   291  // The rule of thumb is if you need to merge at least same amount of
   292  // `Options.WriteBuffer` worth of data then use transaction, otherwise don't.
   293  //
   294  // The transaction must be closed once done, either by committing or discarding
   295  // the transaction.
   296  // Closing the DB will discard open transaction.
   297  func (db *DB) OpenTransaction() (*Transaction, error) {
   298  	if err := db.ok(); err != nil {
   299  		return nil, err
   300  	}
   301  
   302  	// The write happen synchronously.
   303  	select {
   304  	case db.writeLockC <- struct{}{}:
   305  	case err := <-db.compPerErrC:
   306  		return nil, err
   307  	case <-db.closeC:
   308  		return nil, ErrClosed
   309  	}
   310  
   311  	if db.tr != nil {
   312  		panic("leveldb: has open transaction")
   313  	}
   314  
   315  	// Flush current memdb.
   316  	if db.mem != nil && db.mem.Len() != 0 {
   317  		if _, err := db.rotateMem(0, true); err != nil {
   318  			return nil, err
   319  		}
   320  	}
   321  
   322  	// Wait compaction when certain threshold reached.
   323  	if err := db.waitCompaction(); err != nil {
   324  		return nil, err
   325  	}
   326  
   327  	tr := &Transaction{
   328  		db:  db,
   329  		seq: db.seq,
   330  		mem: db.mpoolGet(0),
   331  	}
   332  	tr.mem.incref()
   333  	db.tr = tr
   334  	return tr, nil
   335  }
   336  

View as plain text