...

Source file src/github.com/syndtr/goleveldb/leveldb/batch.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"encoding/binary"
    11  	"fmt"
    12  	"io"
    13  
    14  	"github.com/syndtr/goleveldb/leveldb/errors"
    15  	"github.com/syndtr/goleveldb/leveldb/memdb"
    16  	"github.com/syndtr/goleveldb/leveldb/storage"
    17  )
    18  
    19  // ErrBatchCorrupted records reason of batch corruption. This error will be
    20  // wrapped with errors.ErrCorrupted.
    21  type ErrBatchCorrupted struct {
    22  	Reason string
    23  }
    24  
    25  func (e *ErrBatchCorrupted) Error() string {
    26  	return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
    27  }
    28  
    29  func newErrBatchCorrupted(reason string) error {
    30  	return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
    31  }
    32  
const (
	// batchHeaderLen is the size in bytes of the encoded batch header: an
	// 8-byte sequence number followed by a 4-byte record count.
	batchHeaderLen = 8 + 4
	// batchGrowLimit is the default record-count threshold used by
	// Batch.grow when Batch.growLimit is not set.
	batchGrowLimit = 3000
)
    37  
// BatchReplay wraps the basic batch operations. It is the receiver that
// Batch.Replay drives with the records stored in a batch.
type BatchReplay interface {
	// Put is called for each 'put' record with its key and value.
	Put(key, value []byte)
	// Delete is called for each 'delete' record with its key.
	Delete(key []byte)
}
    43  
    44  type batchIndex struct {
    45  	keyType            keyType
    46  	keyPos, keyLen     int
    47  	valuePos, valueLen int
    48  }
    49  
    50  func (index batchIndex) k(data []byte) []byte {
    51  	return data[index.keyPos : index.keyPos+index.keyLen]
    52  }
    53  
    54  func (index batchIndex) v(data []byte) []byte {
    55  	if index.valueLen != 0 {
    56  		return data[index.valuePos : index.valuePos+index.valueLen]
    57  	}
    58  	return nil
    59  }
    60  
// Batch is a write batch: an ordered list of put and delete records kept in
// encoded form together with an index locating each record.
type Batch struct {
	// data holds the encoded records back to back. Each record is a type
	// byte, a uvarint key length and the key bytes; put records are followed
	// by a uvarint value length and the value bytes.
	data []byte
	// index has one entry per record, giving the positions of its key and
	// value inside data.
	index []batchIndex

	// internalLen is the sum of the key and value lengths of all records,
	// plus 8 bytes per record for the internal key.
	internalLen int

	// growLimit is the record-count threshold above which buffer growth is
	// slowed down to limit memory allocation.
	//
	// batchGrowLimit is used as the default threshold if it's not configured
	// (i.e. when growLimit is not positive).
	growLimit int
}
    75  
    76  func (b *Batch) grow(n int) {
    77  	o := len(b.data)
    78  	if cap(b.data)-o < n {
    79  		limit := batchGrowLimit
    80  		if b.growLimit > 0 {
    81  			limit = b.growLimit
    82  		}
    83  		div := 1
    84  		if len(b.index) > limit {
    85  			div = len(b.index) / limit
    86  		}
    87  		ndata := make([]byte, o, o+n+o/div)
    88  		copy(ndata, b.data)
    89  		b.data = ndata
    90  	}
    91  }
    92  
    93  func (b *Batch) appendRec(kt keyType, key, value []byte) {
    94  	n := 1 + binary.MaxVarintLen32 + len(key)
    95  	if kt == keyTypeVal {
    96  		n += binary.MaxVarintLen32 + len(value)
    97  	}
    98  	b.grow(n)
    99  	index := batchIndex{keyType: kt}
   100  	o := len(b.data)
   101  	data := b.data[:o+n]
   102  	data[o] = byte(kt)
   103  	o++
   104  	o += binary.PutUvarint(data[o:], uint64(len(key)))
   105  	index.keyPos = o
   106  	index.keyLen = len(key)
   107  	o += copy(data[o:], key)
   108  	if kt == keyTypeVal {
   109  		o += binary.PutUvarint(data[o:], uint64(len(value)))
   110  		index.valuePos = o
   111  		index.valueLen = len(value)
   112  		o += copy(data[o:], value)
   113  	}
   114  	b.data = data[:o]
   115  	b.index = append(b.index, index)
   116  	b.internalLen += index.keyLen + index.valueLen + 8
   117  }
   118  
// Put appends a 'put operation' for the given key/value pair to the batch.
// The key and value are copied into the batch, so it is safe to modify the
// contents of the arguments after Put returns, but not before.
func (b *Batch) Put(key, value []byte) {
	b.appendRec(keyTypeVal, key, value)
}
   125  
// Delete appends a 'delete operation' for the given key to the batch.
// The key is copied into the batch, so it is safe to modify the contents of
// the argument after Delete returns, but not before.
func (b *Batch) Delete(key []byte) {
	b.appendRec(keyTypeDel, key, nil)
}
   132  
// Dump returns the encoded contents of the batch. The returned slice can be
// loaded back into a batch using the Load method.
// The returned slice is the batch's internal buffer, not a copy, so its
// contents should not be modified.
func (b *Batch) Dump() []byte {
	return b.data
}
   140  
// Load loads the given encoded slice into the batch. Previous contents of
// the batch will be discarded.
// The given slice will not be copied and will be used as the batch buffer,
// so it is not safe to modify the contents of the slice.
// An error is returned if the slice cannot be decoded; no particular record
// count is enforced.
func (b *Batch) Load(data []byte) error {
	return b.decode(data, -1)
}
   148  
   149  // Replay replays batch contents.
   150  func (b *Batch) Replay(r BatchReplay) error {
   151  	for _, index := range b.index {
   152  		switch index.keyType {
   153  		case keyTypeVal:
   154  			r.Put(index.k(b.data), index.v(b.data))
   155  		case keyTypeDel:
   156  			r.Delete(index.k(b.data))
   157  		}
   158  	}
   159  	return nil
   160  }
   161  
// Len returns the number of records (put and delete operations) currently
// held in the batch.
func (b *Batch) Len() int {
	return len(b.index)
}
   166  
   167  // Reset resets the batch.
   168  func (b *Batch) Reset() {
   169  	b.data = b.data[:0]
   170  	b.index = b.index[:0]
   171  	b.internalLen = 0
   172  }
   173  
   174  func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
   175  	for i, index := range b.index {
   176  		if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
   177  			return err
   178  		}
   179  	}
   180  	return nil
   181  }
   182  
   183  func (b *Batch) append(p *Batch) {
   184  	ob := len(b.data)
   185  	oi := len(b.index)
   186  	b.data = append(b.data, p.data...)
   187  	b.index = append(b.index, p.index...)
   188  	b.internalLen += p.internalLen
   189  
   190  	// Updating index offset.
   191  	if ob != 0 {
   192  		for ; oi < len(b.index); oi++ {
   193  			index := &b.index[oi]
   194  			index.keyPos += ob
   195  			if index.valueLen != 0 {
   196  				index.valuePos += ob
   197  			}
   198  		}
   199  	}
   200  }
   201  
   202  func (b *Batch) decode(data []byte, expectedLen int) error {
   203  	b.data = data
   204  	b.index = b.index[:0]
   205  	b.internalLen = 0
   206  	err := decodeBatch(data, func(i int, index batchIndex) error {
   207  		b.index = append(b.index, index)
   208  		b.internalLen += index.keyLen + index.valueLen + 8
   209  		return nil
   210  	})
   211  	if err != nil {
   212  		return err
   213  	}
   214  	if expectedLen >= 0 && len(b.index) != expectedLen {
   215  		return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
   216  	}
   217  	return nil
   218  }
   219  
   220  func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
   221  	var ik []byte
   222  	for i, index := range b.index {
   223  		ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
   224  		if err := mdb.Put(ik, index.v(b.data)); err != nil {
   225  			return err
   226  		}
   227  	}
   228  	return nil
   229  }
   230  
   231  func newBatch() interface{} {
   232  	return &Batch{}
   233  }
   234  
   235  // MakeBatch returns empty batch with preallocated buffer.
   236  func MakeBatch(n int) *Batch {
   237  	return &Batch{data: make([]byte, 0, n)}
   238  }
   239  
// BatchConfig contains the configuration options for a batch created with
// MakeBatchWithConfig.
type BatchConfig struct {
	// InitialCapacity is the initial capacity, in bytes, to preallocate for
	// the batch buffer. Non-positive values preallocate nothing.
	//
	// The default value is 0.
	InitialCapacity int

	// GrowLimit is the limit (in terms of entries) on how much the buffer
	// can grow in each cycle.
	//
	// Initially the buffer grows to roughly twice its current size each
	// cycle until the number of entries exceeds GrowLimit; after that each
	// cycle grows the buffer by roughly GrowLimit entries. This growth size
	// in bytes is loosely calculated from the average entry size multiplied
	// by GrowLimit.
	//
	// Generally, the memory allocation step is larger when this value is
	// configured larger, and vice versa. Non-positive values select the
	// default.
	//
	// The default value is 3000.
	GrowLimit int
}
   261  
   262  // MakeBatchWithConfig initializes a batch object with the given configs.
   263  func MakeBatchWithConfig(config *BatchConfig) *Batch {
   264  	var batch = new(Batch)
   265  	if config != nil {
   266  		if config.InitialCapacity > 0 {
   267  			batch.data = make([]byte, 0, config.InitialCapacity)
   268  		}
   269  		if config.GrowLimit > 0 {
   270  			batch.growLimit = config.GrowLimit
   271  		}
   272  	}
   273  	return batch
   274  }
   275  
   276  func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
   277  	var index batchIndex
   278  	for i, o := 0, 0; o < len(data); i++ {
   279  		// Key type.
   280  		index.keyType = keyType(data[o])
   281  		if index.keyType > keyTypeVal {
   282  			return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
   283  		}
   284  		o++
   285  
   286  		// Key.
   287  		x, n := binary.Uvarint(data[o:])
   288  		o += n
   289  		if n <= 0 || o+int(x) > len(data) {
   290  			return newErrBatchCorrupted("bad record: invalid key length")
   291  		}
   292  		index.keyPos = o
   293  		index.keyLen = int(x)
   294  		o += index.keyLen
   295  
   296  		// Value.
   297  		if index.keyType == keyTypeVal {
   298  			x, n = binary.Uvarint(data[o:])
   299  			o += n
   300  			if n <= 0 || o+int(x) > len(data) {
   301  				return newErrBatchCorrupted("bad record: invalid value length")
   302  			}
   303  			index.valuePos = o
   304  			index.valueLen = int(x)
   305  			o += index.valueLen
   306  		} else {
   307  			index.valuePos = 0
   308  			index.valueLen = 0
   309  		}
   310  
   311  		if err := fn(i, index); err != nil {
   312  			return err
   313  		}
   314  	}
   315  	return nil
   316  }
   317  
   318  func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
   319  	seq, batchLen, err = decodeBatchHeader(data)
   320  	if err != nil {
   321  		return 0, 0, err
   322  	}
   323  	if seq < expectSeq {
   324  		return 0, 0, newErrBatchCorrupted("invalid sequence number")
   325  	}
   326  	data = data[batchHeaderLen:]
   327  	var ik []byte
   328  	var decodedLen int
   329  	err = decodeBatch(data, func(i int, index batchIndex) error {
   330  		if i >= batchLen {
   331  			return newErrBatchCorrupted("invalid records length")
   332  		}
   333  		ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
   334  		if err := mdb.Put(ik, index.v(data)); err != nil {
   335  			return err
   336  		}
   337  		decodedLen++
   338  		return nil
   339  	})
   340  	if err == nil && decodedLen != batchLen {
   341  		err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
   342  	}
   343  	return
   344  }
   345  
   346  func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
   347  	dst = ensureBuffer(dst, batchHeaderLen)
   348  	binary.LittleEndian.PutUint64(dst, seq)
   349  	binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
   350  	return dst
   351  }
   352  
   353  func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
   354  	if len(data) < batchHeaderLen {
   355  		return 0, 0, newErrBatchCorrupted("too short")
   356  	}
   357  
   358  	seq = binary.LittleEndian.Uint64(data)
   359  	batchLen = int(binary.LittleEndian.Uint32(data[8:]))
   360  	if batchLen < 0 {
   361  		return 0, 0, newErrBatchCorrupted("invalid records length")
   362  	}
   363  	return
   364  }
   365  
   366  func batchesLen(batches []*Batch) int {
   367  	batchLen := 0
   368  	for _, batch := range batches {
   369  		batchLen += batch.Len()
   370  	}
   371  	return batchLen
   372  }
   373  
   374  func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
   375  	if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
   376  		return err
   377  	}
   378  	for _, batch := range batches {
   379  		if _, err := wr.Write(batch.data); err != nil {
   380  			return err
   381  		}
   382  	}
   383  	return nil
   384  }
   385  

View as plain text