...

Source file src/github.com/syndtr/goleveldb/leveldb/db_write.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"sync/atomic"
    11  	"time"
    12  
    13  	"github.com/syndtr/goleveldb/leveldb/memdb"
    14  	"github.com/syndtr/goleveldb/leveldb/opt"
    15  	"github.com/syndtr/goleveldb/leveldb/util"
    16  )
    17  
    18  func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
    19  	wr, err := db.journal.Next()
    20  	if err != nil {
    21  		return err
    22  	}
    23  	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
    24  		return err
    25  	}
    26  	if err := db.journal.Flush(); err != nil {
    27  		return err
    28  	}
    29  	if sync {
    30  		return db.journalWriter.Sync()
    31  	}
    32  	return nil
    33  }
    34  
    35  func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
    36  	retryLimit := 3
    37  retry:
    38  	// Wait for pending memdb compaction.
    39  	err = db.compTriggerWait(db.mcompCmdC)
    40  	if err != nil {
    41  		return
    42  	}
    43  	retryLimit--
    44  
    45  	// Create new memdb and journal.
    46  	mem, err = db.newMem(n)
    47  	if err != nil {
    48  		if err == errHasFrozenMem {
    49  			if retryLimit <= 0 {
    50  				panic("BUG: still has frozen memdb")
    51  			}
    52  			goto retry
    53  		}
    54  		return
    55  	}
    56  
    57  	// Schedule memdb compaction.
    58  	if wait {
    59  		err = db.compTriggerWait(db.mcompCmdC)
    60  	} else {
    61  		db.compTrigger(db.mcompCmdC)
    62  	}
    63  	return
    64  }
    65  
// flush makes room in the effective memdb for an incoming write of n bytes,
// throttling or pausing the writer when level-0 compaction falls behind.
// It returns the effective memdb (ref count incremented; the caller must
// decref it) and the memdb's remaining free space in bytes.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	// flush performs one throttling step; it returns true when the step
	// should be repeated (e.g. after a slowdown sleep or a compaction wait).
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		defer func() {
			// On retry, drop this reference; the next attempt re-acquires
			// the (possibly different) effective memdb.
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			// Too many level-0 tables: slow this writer down once by 1ms.
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			// Enough room in the current memdb; done.
			return false
		case tLen >= pauseTrigger:
			// Level-0 critically full: block until table compaction runs.
			delayed = true
			// Set the write paused flag explicitly.
			atomic.StoreInt32(&db.inWritePaused, 1)
			err = db.compTriggerWait(db.tcompCmdC)
			// Unset the write paused flag.
			atomic.StoreInt32(&db.inWritePaused, 0)
			if err != nil {
				return false
			}
		default:
			// Allow memdb to grow if it has no entry.
			if mdb.Len() == 0 {
				mdbFree = n
			} else {
				// Memdb is full: rotate to a fresh one.
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	if delayed {
		// Accumulate time this write spent being throttled.
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		// First non-delayed write after a delayed streak: log and publish
		// the accumulated delay statistics, then reset them.
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}
   132  
// writeMerge describes a write offered to the current write-lock holder for
// merging. Either batch is non-nil (a whole batch to merge), or keyType,
// key and value describe a single record to merge.
type writeMerge struct {
	sync       bool    // caller requested a synced (fsync'ed) write
	batch      *Batch  // whole batch to merge, or nil for a single record
	keyType    keyType // record type when batch is nil
	key, value []byte  // record payload when batch is nil
}
   139  
   140  func (db *DB) unlockWrite(overflow bool, merged int, err error) {
   141  	for i := 0; i < merged; i++ {
   142  		db.writeAckC <- err
   143  	}
   144  	if overflow {
   145  		// Pass lock to the next write (that failed to merge).
   146  		db.writeMergedC <- false
   147  	} else {
   148  		// Release lock.
   149  		<-db.writeLockC
   150  	}
   151  }
   152  
// writeLocked performs the actual write while holding the write lock:
// it flushes/rotates the memdb as needed, optionally merges concurrent
// writes into this one, writes the journal, applies the batches to the
// memdb, and finally releases (or hands off) the write lock via
// unlockWrite. ourBatch is a batch that we can modify (used to collect
// merged single-record puts); it may be nil.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Try to flush memdb. This method would also trying to throttle writes
	// if it is too fast and compaction cannot catch-up.
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool     // a writer offered more than we could merge
		merged   int      // number of writers merged into this write
		batches  = []*Batch{batch}
	)

	if merge {
		// Merge limit: cap how much extra data we accept from concurrent
		// writers, bounded by both a fixed budget and the memdb free space.
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}

	merge:
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge batch.
					if incoming.batch.internalLen > mergeLimit {
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge put.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					if ourBatch == nil {
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					// We can use same batch since concurrent write doesn't
					// guarantee write order.
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				sync = sync || incoming.sync
				merged++
				// Tell the offering writer its request was merged.
				db.writeMergedC <- true

			default:
				// No writer is currently offering; stop merging.
				break merge
			}
		}
	}

	// Release ourBatch if any.
	if ourBatch != nil {
		defer db.batchPool.Put(ourBatch)
	}

	// Seq number.
	seq := db.seq + 1

	// Write journal.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Put batches.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			// The journal write already succeeded; a memdb put failure
			// here is an unrecoverable invariant violation.
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Incr seq number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate memdb if it's reach the threshold.
	if batch.internalLen >= mdbFree {
		if _, err := db.rotateMem(0, false); err != nil {
			db.unlockWrite(overflow, merged, err)
			return err
		}
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}
   258  
// Write apply the given batch to the DB. The batch records will be applied
// sequentially. Write might be used concurrently, when used concurrently and
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
// option to true to disable write merge.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// If the batch size is larger than write buffer, it may justified to write
	// using transaction instead. Using transaction the batch will be written
	// into tables directly, skipping the journaling.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			// Offered the batch to the current lock holder for merging.
			if <-db.writeMergedC {
				// Write is merged; wait for the shared write result.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	return db.writeLocked(batch, nil, merge, sync)
}
   322  
// putRec applies a single record (put or delete, selected by kt) to the DB,
// following the same merge/lock-acquisition protocol as Write.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			// Offered the record to the current lock holder for merging.
			if <-db.writeMergedC {
				// Write is merged; wait for the shared write result.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	// Pass batch as ourBatch too: writeLocked may append merged puts into
	// it, and will return it to the pool when done.
	return db.writeLocked(batch, batch, merge, sync)
}
   367  
// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	// Thin wrapper: a put is a single value-type record.
	return db.putRec(keyTypeVal, key, value, wo)
}
   377  
// Delete deletes the value for the given key. Delete will not returns error if
// key doesn't exist. Write merge also applies for Delete, see Write.
//
// It is safe to modify the contents of the arguments after Delete returns but
// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	// Thin wrapper: a delete is a single delete-type record with no value.
	return db.putRec(keyTypeDel, key, nil, wo)
}
   386  
   387  func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
   388  	iter := mem.NewIterator(nil)
   389  	defer iter.Release()
   390  	return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
   391  		(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
   392  }
   393  
   394  // CompactRange compacts the underlying DB for the given key range.
   395  // In particular, deleted and overwritten versions are discarded,
   396  // and the data is rearranged to reduce the cost of operations
   397  // needed to access the data. This operation should typically only
   398  // be invoked by users who understand the underlying implementation.
   399  //
   400  // A nil Range.Start is treated as a key before all keys in the DB.
   401  // And a nil Range.Limit is treated as a key after all keys in the DB.
   402  // Therefore if both is nil then it will compact entire DB.
   403  func (db *DB) CompactRange(r util.Range) error {
   404  	if err := db.ok(); err != nil {
   405  		return err
   406  	}
   407  
   408  	// Lock writer.
   409  	select {
   410  	case db.writeLockC <- struct{}{}:
   411  	case err := <-db.compPerErrC:
   412  		return err
   413  	case <-db.closeC:
   414  		return ErrClosed
   415  	}
   416  
   417  	// Check for overlaps in memdb.
   418  	mdb := db.getEffectiveMem()
   419  	if mdb == nil {
   420  		return ErrClosed
   421  	}
   422  	defer mdb.decref()
   423  	if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
   424  		// Memdb compaction.
   425  		if _, err := db.rotateMem(0, false); err != nil {
   426  			<-db.writeLockC
   427  			return err
   428  		}
   429  		<-db.writeLockC
   430  		if err := db.compTriggerWait(db.mcompCmdC); err != nil {
   431  			return err
   432  		}
   433  	} else {
   434  		<-db.writeLockC
   435  	}
   436  
   437  	// Table compaction.
   438  	return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
   439  }
   440  
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer.
	select {
	case db.writeLockC <- struct{}{}:
		// NOTE(review): the write lock is deliberately NOT released on the
		// success path, which is what blocks all future writes;
		// compWriteLocking presumably lets the close/compaction path know
		// it must release the lock — confirm against db.go / db_compaction.go.
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Set compaction read-only.
	select {
	case db.compErrSetC <- ErrReadOnly:
		// Compaction goroutine accepted the persistent ErrReadOnly state.
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}
   468  

View as plain text