...

Source file src/github.com/syndtr/goleveldb/leveldb/table/writer.go

Documentation: github.com/syndtr/goleveldb/leveldb/table

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package table
     8  
     9  import (
    10  	"encoding/binary"
    11  	"errors"
    12  	"fmt"
    13  	"io"
    14  
    15  	"github.com/golang/snappy"
    16  
    17  	"github.com/syndtr/goleveldb/leveldb/comparer"
    18  	"github.com/syndtr/goleveldb/leveldb/filter"
    19  	"github.com/syndtr/goleveldb/leveldb/opt"
    20  	"github.com/syndtr/goleveldb/leveldb/util"
    21  )
    22  
    23  func sharedPrefixLen(a, b []byte) int {
    24  	i, n := 0, len(a)
    25  	if n > len(b) {
    26  		n = len(b)
    27  	}
    28  	for i < n && a[i] == b[i] {
    29  		i++
    30  	}
    31  	return i
    32  }
    33  
// blockWriter builds one table block: a sequence of key/value entries whose
// keys are stored as a delta against the previous key, followed (after
// finish) by a trailer listing the buffer offsets of the restart points.
type blockWriter struct {
	restartInterval int         // entries between restart points (full key stored at each)
	buf             util.Buffer // encoded entries; finish appends the restart trailer
	nEntries        int         // entries appended since the last reset
	prevKey         []byte      // most recently appended key, used for prefix sharing
	restarts        []uint32    // buffer offset of each restart point
	scratch         []byte      // space for encoding an entry's three-uvarint header
}
    42  
    43  func (w *blockWriter) append(key, value []byte) (err error) {
    44  	nShared := 0
    45  	if w.nEntries%w.restartInterval == 0 {
    46  		w.restarts = append(w.restarts, uint32(w.buf.Len()))
    47  	} else {
    48  		nShared = sharedPrefixLen(w.prevKey, key)
    49  	}
    50  	n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
    51  	n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
    52  	n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
    53  	if _, err = w.buf.Write(w.scratch[:n]); err != nil {
    54  		return err
    55  	}
    56  	if _, err = w.buf.Write(key[nShared:]); err != nil {
    57  		return err
    58  	}
    59  	if _, err = w.buf.Write(value); err != nil {
    60  		return err
    61  	}
    62  	w.prevKey = append(w.prevKey[:0], key...)
    63  	w.nEntries++
    64  	return nil
    65  }
    66  
    67  func (w *blockWriter) finish() error {
    68  	// Write restarts entry.
    69  	if w.nEntries == 0 {
    70  		// Must have at least one restart entry.
    71  		w.restarts = append(w.restarts, 0)
    72  	}
    73  	w.restarts = append(w.restarts, uint32(len(w.restarts)))
    74  	for _, x := range w.restarts {
    75  		buf4 := w.buf.Alloc(4)
    76  		binary.LittleEndian.PutUint32(buf4, x)
    77  	}
    78  	return nil
    79  }
    80  
    81  func (w *blockWriter) reset() {
    82  	w.buf.Reset()
    83  	w.nEntries = 0
    84  	w.restarts = w.restarts[:0]
    85  }
    86  
    87  func (w *blockWriter) bytesLen() int {
    88  	restartsLen := len(w.restarts)
    89  	if restartsLen == 0 {
    90  		restartsLen = 1
    91  	}
    92  	return w.buf.Len() + 4*restartsLen + 4
    93  }
    94  
// filterWriter builds the table's filter block. Filters are emitted as data
// blocks are written, one offset entry per 1<<baseLg bytes of file offset.
// finish then appends the offset array and the baseLg byte.
type filterWriter struct {
	generator filter.FilterGenerator // nil when no filter is configured
	buf       util.Buffer            // concatenated filters, then the trailer
	nKeys     int                    // keys added since the last generated filter
	offsets   []uint32               // start offset in buf of each filter
	baseLg    uint                   // log2 of the file-offset range per filter entry
}
   102  
   103  func (w *filterWriter) add(key []byte) {
   104  	if w.generator == nil {
   105  		return
   106  	}
   107  	w.generator.Add(key)
   108  	w.nKeys++
   109  }
   110  
   111  func (w *filterWriter) flush(offset uint64) {
   112  	if w.generator == nil {
   113  		return
   114  	}
   115  	for x := int(offset / uint64(1<<w.baseLg)); x > len(w.offsets); {
   116  		w.generate()
   117  	}
   118  }
   119  
   120  func (w *filterWriter) finish() error {
   121  	if w.generator == nil {
   122  		return nil
   123  	}
   124  	// Generate last keys.
   125  
   126  	if w.nKeys > 0 {
   127  		w.generate()
   128  	}
   129  	w.offsets = append(w.offsets, uint32(w.buf.Len()))
   130  	for _, x := range w.offsets {
   131  		buf4 := w.buf.Alloc(4)
   132  		binary.LittleEndian.PutUint32(buf4, x)
   133  	}
   134  	return w.buf.WriteByte(byte(w.baseLg))
   135  }
   136  
   137  func (w *filterWriter) generate() {
   138  	// Record offset.
   139  	w.offsets = append(w.offsets, uint32(w.buf.Len()))
   140  	// Generate filters.
   141  	if w.nKeys > 0 {
   142  		w.generator.Generate(&w.buf)
   143  		w.nKeys = 0
   144  	}
   145  }
   146  
// Writer is a table writer.
//
// Data blocks are written as key/value pairs are appended. Close then writes
// the filter block (if a filter is configured), the metaindex block, the
// index block and the footer. Failures such as out-of-order keys or a failed
// block write are stored in err, which later Append and Close calls return.
type Writer struct {
	writer io.Writer // destination of all table bytes
	err    error     // sticky error returned by later Append/Close calls
	// Options
	cmp         comparer.Comparer
	filter      filter.Filter   // nil if no filter block is written
	compression opt.Compression // used for data, metaindex and index blocks
	blockSize   int             // a data block is finished once its size reaches this

	bpool       *util.BufferPool // if non-nil, dataBlock's buffer is handed back on Close
	dataBlock   blockWriter      // current data block; reused for the metaindex on Close
	indexBlock  blockWriter      // one entry per data block: separator key -> block handle
	filterBlock filterWriter     // fed every appended key
	pendingBH   blockHandle      // last finished data block not yet added to the index
	offset      uint64           // bytes written to writer so far
	nEntries    int              // key/value pairs appended so far
	// Scratch space sized for 5 uvarints. The first 20 bytes are reserved
	// for encoding a block handle, which is then passed to a block writer,
	// so the block writers only use scratch[20:] for their entry headers.
	// Close also builds the footer at the start of this array.
	scratch            [50]byte
	comparerScratch    []byte // reused output buffer for cmp.Separator/Successor
	compressionScratch []byte // reused output buffer for snappy compression
}
   171  
   172  func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
   173  	// Compress the buffer if necessary.
   174  	var b []byte
   175  	if compression == opt.SnappyCompression {
   176  		// Allocate scratch enough for compression and block trailer.
   177  		if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
   178  			w.compressionScratch = make([]byte, n)
   179  		}
   180  		compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
   181  		n := len(compressed)
   182  		b = compressed[:n+blockTrailerLen]
   183  		b[n] = blockTypeSnappyCompression
   184  	} else {
   185  		tmp := buf.Alloc(blockTrailerLen)
   186  		tmp[0] = blockTypeNoCompression
   187  		b = buf.Bytes()
   188  	}
   189  
   190  	// Calculate the checksum.
   191  	n := len(b) - 4
   192  	checksum := util.NewCRC(b[:n]).Value()
   193  	binary.LittleEndian.PutUint32(b[n:], checksum)
   194  
   195  	// Write the buffer to the file.
   196  	_, err = w.writer.Write(b)
   197  	if err != nil {
   198  		return
   199  	}
   200  	bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
   201  	w.offset += uint64(len(b))
   202  	return
   203  }
   204  
   205  func (w *Writer) flushPendingBH(key []byte) error {
   206  	if w.pendingBH.length == 0 {
   207  		return nil
   208  	}
   209  	var separator []byte
   210  	if len(key) == 0 {
   211  		separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
   212  	} else {
   213  		separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
   214  	}
   215  	if separator == nil {
   216  		separator = w.dataBlock.prevKey
   217  	} else {
   218  		w.comparerScratch = separator
   219  	}
   220  	n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
   221  	// Append the block handle to the index block.
   222  	if err := w.indexBlock.append(separator, w.scratch[:n]); err != nil {
   223  		return err
   224  	}
   225  	// Reset prev key of the data block.
   226  	w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
   227  	// Clear pending block handle.
   228  	w.pendingBH = blockHandle{}
   229  	return nil
   230  }
   231  
   232  func (w *Writer) finishBlock() error {
   233  	if err := w.dataBlock.finish(); err != nil {
   234  		return err
   235  	}
   236  	bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
   237  	if err != nil {
   238  		return err
   239  	}
   240  	w.pendingBH = bh
   241  	// Reset the data block.
   242  	w.dataBlock.reset()
   243  	// Flush the filter block.
   244  	w.filterBlock.flush(w.offset)
   245  	return nil
   246  }
   247  
   248  // Append appends key/value pair to the table. The keys passed must
   249  // be in increasing order.
   250  //
   251  // It is safe to modify the contents of the arguments after Append returns.
   252  func (w *Writer) Append(key, value []byte) error {
   253  	if w.err != nil {
   254  		return w.err
   255  	}
   256  	if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
   257  		w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
   258  		return w.err
   259  	}
   260  
   261  	if err := w.flushPendingBH(key); err != nil {
   262  		return err
   263  	}
   264  	// Append key/value pair to the data block.
   265  	if err := w.dataBlock.append(key, value); err != nil {
   266  		return err
   267  	}
   268  	// Add key to the filter block.
   269  	w.filterBlock.add(key)
   270  
   271  	// Finish the data block if block size target reached.
   272  	if w.dataBlock.bytesLen() >= w.blockSize {
   273  		if err := w.finishBlock(); err != nil {
   274  			w.err = err
   275  			return w.err
   276  		}
   277  	}
   278  	w.nEntries++
   279  	return nil
   280  }
   281  
   282  // BlocksLen returns number of blocks written so far.
   283  func (w *Writer) BlocksLen() int {
   284  	n := w.indexBlock.nEntries
   285  	if w.pendingBH.length > 0 {
   286  		// Includes the pending block.
   287  		n++
   288  	}
   289  	return n
   290  }
   291  
   292  // EntriesLen returns number of entries added so far.
   293  func (w *Writer) EntriesLen() int {
   294  	return w.nEntries
   295  }
   296  
   297  // BytesLen returns number of bytes written so far.
   298  func (w *Writer) BytesLen() int {
   299  	return int(w.offset)
   300  }
   301  
   302  // Close will finalize the table. Calling Append is not possible
   303  // after Close, but calling BlocksLen, EntriesLen and BytesLen
   304  // is still possible.
   305  func (w *Writer) Close() error {
   306  	defer func() {
   307  		if w.bpool != nil {
   308  			// Buffer.Bytes() returns [offset:] of the buffer.
   309  			// We need to Reset() so that the offset = 0, resulting
   310  			// in buf.Bytes() returning the whole allocated bytes.
   311  			w.dataBlock.buf.Reset()
   312  			w.bpool.Put(w.dataBlock.buf.Bytes())
   313  		}
   314  	}()
   315  
   316  	if w.err != nil {
   317  		return w.err
   318  	}
   319  
   320  	// Write the last data block. Or empty data block if there
   321  	// aren't any data blocks at all.
   322  	if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
   323  		if err := w.finishBlock(); err != nil {
   324  			w.err = err
   325  			return w.err
   326  		}
   327  	}
   328  	if err := w.flushPendingBH(nil); err != nil {
   329  		return err
   330  	}
   331  
   332  	// Write the filter block.
   333  	var filterBH blockHandle
   334  	if err := w.filterBlock.finish(); err != nil {
   335  		return err
   336  	}
   337  	if buf := &w.filterBlock.buf; buf.Len() > 0 {
   338  		filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
   339  		if w.err != nil {
   340  			return w.err
   341  		}
   342  	}
   343  
   344  	// Write the metaindex block.
   345  	if filterBH.length > 0 {
   346  		key := []byte("filter." + w.filter.Name())
   347  		n := encodeBlockHandle(w.scratch[:20], filterBH)
   348  		if err := w.dataBlock.append(key, w.scratch[:n]); err != nil {
   349  			return err
   350  		}
   351  	}
   352  	if err := w.dataBlock.finish(); err != nil {
   353  		return err
   354  	}
   355  	metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
   356  	if err != nil {
   357  		w.err = err
   358  		return w.err
   359  	}
   360  
   361  	// Write the index block.
   362  	if err := w.indexBlock.finish(); err != nil {
   363  		return err
   364  	}
   365  	indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
   366  	if err != nil {
   367  		w.err = err
   368  		return w.err
   369  	}
   370  
   371  	// Write the table footer.
   372  	footer := w.scratch[:footerLen]
   373  	for i := range footer {
   374  		footer[i] = 0
   375  	}
   376  	n := encodeBlockHandle(footer, metaindexBH)
   377  	encodeBlockHandle(footer[n:], indexBH)
   378  	copy(footer[footerLen-len(magic):], magic)
   379  	if _, err := w.writer.Write(footer); err != nil {
   380  		w.err = err
   381  		return w.err
   382  	}
   383  	w.offset += footerLen
   384  
   385  	w.err = errors.New("leveldb/table: writer is closed")
   386  	return nil
   387  }
   388  
   389  // NewWriter creates a new initialized table writer for the file.
   390  //
   391  // Table writer is not safe for concurrent use.
   392  func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer {
   393  	var bufBytes []byte
   394  	if pool == nil {
   395  		bufBytes = make([]byte, size)
   396  	} else {
   397  		bufBytes = pool.Get(size)
   398  	}
   399  	bufBytes = bufBytes[:0]
   400  
   401  	w := &Writer{
   402  		writer:          f,
   403  		cmp:             o.GetComparer(),
   404  		filter:          o.GetFilter(),
   405  		compression:     o.GetCompression(),
   406  		blockSize:       o.GetBlockSize(),
   407  		comparerScratch: make([]byte, 0),
   408  		bpool:           pool,
   409  		dataBlock:       blockWriter{buf: *util.NewBuffer(bufBytes)},
   410  	}
   411  	// data block
   412  	w.dataBlock.restartInterval = o.GetBlockRestartInterval()
   413  	// The first 20-bytes are used for encoding block handle.
   414  	w.dataBlock.scratch = w.scratch[20:]
   415  	// index block
   416  	w.indexBlock.restartInterval = 1
   417  	w.indexBlock.scratch = w.scratch[20:]
   418  	// filter block
   419  	if w.filter != nil {
   420  		w.filterBlock.generator = w.filter.NewGenerator()
   421  		w.filterBlock.baseLg = uint(o.GetFilterBaseLg())
   422  		w.filterBlock.flush(0)
   423  	}
   424  	return w
   425  }
   426  

View as plain text