...

Source file src/github.com/syndtr/goleveldb/leveldb/journal/journal.go

Documentation: github.com/syndtr/goleveldb/leveldb/journal

     1  // Copyright 2011 The LevelDB-Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
     6  // License, authors and contributors information can be found at the URLs below, respectively:
     7  // 	https://code.google.com/p/leveldb-go/source/browse/LICENSE
     8  //	https://code.google.com/p/leveldb-go/source/browse/AUTHORS
     9  //  https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
    10  
    11  // Package journal reads and writes sequences of journals. Each journal is a stream
    12  // of bytes that completes before the next journal starts.
    13  //
    14  // When reading, call Next to obtain an io.Reader for the next journal. Next will
    15  // return io.EOF when there are no more journals. It is valid to call Next
    16  // without reading the current journal to exhaustion.
    17  //
    18  // When writing, call Next to obtain an io.Writer for the next journal. Calling
    19  // Next finishes the current journal. Call Close to finish the final journal.
    20  //
    21  // Optionally, call Flush to finish the current journal and flush the underlying
    22  // writer without starting a new journal. To start a new journal after flushing,
    23  // call Next.
    24  //
    25  // Neither Readers nor Writers are safe to use concurrently.
    26  //
    27  // Example code:
    28  //	func read(r io.Reader) ([]string, error) {
    29  //		var ss []string
    30  //		journals := journal.NewReader(r, nil, true, true)
    31  //		for {
    32  //			j, err := journals.Next()
    33  //			if err == io.EOF {
    34  //				break
    35  //			}
    36  //			if err != nil {
    37  //				return nil, err
    38  //			}
    39  //			s, err := ioutil.ReadAll(j)
    40  //			if err != nil {
    41  //				return nil, err
    42  //			}
    43  //			ss = append(ss, string(s))
    44  //		}
    45  //		return ss, nil
    46  //	}
    47  //
    48  //	func write(w io.Writer, ss []string) error {
    49  //		journals := journal.NewWriter(w)
    50  //		for _, s := range ss {
    51  //			j, err := journals.Next()
    52  //			if err != nil {
    53  //				return err
    54  //			}
    55  //			if _, err := j.Write([]byte(s)); err != nil {
    56  //				return err
    57  //			}
    58  //		}
    59  //		return journals.Close()
    60  //	}
    61  //
    62  // The wire format is that the stream is divided into 32KiB blocks, and each
    63  // block contains a number of tightly packed chunks. Chunks cannot cross block
    64  // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
    65  // block must be zero.
    66  //
    67  // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
    68  // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
    69  // followed by a payload. The checksum is over the chunk type and the payload.
    70  //
    71  // There are four chunk types: whether the chunk is the full journal, or the
    72  // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
    73  // has one first chunk, zero or more middle chunks, and one last chunk.
    74  //
    75  // The wire format allows for limited recovery in the face of data corruption:
    76  // on a format error (such as a checksum mismatch), the reader moves to the
    77  // next block and looks for the next full or first chunk.
    78  package journal
    79  
    80  import (
    81  	"encoding/binary"
    82  	"fmt"
    83  	"io"
    84  
    85  	"github.com/syndtr/goleveldb/leveldb/errors"
    86  	"github.com/syndtr/goleveldb/leveldb/storage"
    87  	"github.com/syndtr/goleveldb/leveldb/util"
    88  )
    89  
    90  // These constants are part of the wire format and should not be changed.
    91  const (
    92  	fullChunkType   = 1
    93  	firstChunkType  = 2
    94  	middleChunkType = 3
    95  	lastChunkType   = 4
    96  )
    97  
    98  const (
    99  	blockSize  = 32 * 1024
   100  	headerSize = 7
   101  )
   102  
   103  type flusher interface {
   104  	Flush() error
   105  }
   106  
   107  // ErrCorrupted is the error type that generated by corrupted block or chunk.
   108  type ErrCorrupted struct {
   109  	Size   int
   110  	Reason string
   111  }
   112  
   113  func (e *ErrCorrupted) Error() string {
   114  	return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
   115  }
   116  
   117  // Dropper is the interface that wrap simple Drop method. The Drop
   118  // method will be called when the journal reader dropping a block or chunk.
   119  type Dropper interface {
   120  	Drop(err error)
   121  }
   122  
   123  // Reader reads journals from an underlying io.Reader.
   124  type Reader struct {
   125  	// r is the underlying reader.
   126  	r io.Reader
   127  	// the dropper.
   128  	dropper Dropper
   129  	// strict flag.
   130  	strict bool
   131  	// checksum flag.
   132  	checksum bool
   133  	// seq is the sequence number of the current journal.
   134  	seq int
   135  	// buf[i:j] is the unread portion of the current chunk's payload.
   136  	// The low bound, i, excludes the chunk header.
   137  	i, j int
   138  	// n is the number of bytes of buf that are valid. Once reading has started,
   139  	// only the final block can have n < blockSize.
   140  	n int
   141  	// last is whether the current chunk is the last chunk of the journal.
   142  	last bool
   143  	// err is any accumulated error.
   144  	err error
   145  	// buf is the buffer.
   146  	buf [blockSize]byte
   147  }
   148  
   149  // NewReader returns a new reader. The dropper may be nil, and if
   150  // strict is true then corrupted or invalid chunk will halt the journal
   151  // reader entirely.
   152  func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
   153  	return &Reader{
   154  		r:        r,
   155  		dropper:  dropper,
   156  		strict:   strict,
   157  		checksum: checksum,
   158  		last:     true,
   159  	}
   160  }
   161  
   162  var errSkip = errors.New("leveldb/journal: skipped")
   163  
   164  func (r *Reader) corrupt(n int, reason string, skip bool) error {
   165  	if r.dropper != nil {
   166  		r.dropper.Drop(&ErrCorrupted{n, reason})
   167  	}
   168  	if r.strict && !skip {
   169  		r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
   170  		return r.err
   171  	}
   172  	return errSkip
   173  }
   174  
   175  // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
   176  // next block into the buffer if necessary.
   177  func (r *Reader) nextChunk(first bool) error {
   178  	for {
   179  		if r.j+headerSize <= r.n {
   180  			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
   181  			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
   182  			chunkType := r.buf[r.j+6]
   183  			unprocBlock := r.n - r.j
   184  			if checksum == 0 && length == 0 && chunkType == 0 {
   185  				// Drop entire block.
   186  				r.i = r.n
   187  				r.j = r.n
   188  				return r.corrupt(unprocBlock, "zero header", false)
   189  			}
   190  			if chunkType < fullChunkType || chunkType > lastChunkType {
   191  				// Drop entire block.
   192  				r.i = r.n
   193  				r.j = r.n
   194  				return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
   195  			}
   196  			r.i = r.j + headerSize
   197  			r.j = r.j + headerSize + int(length)
   198  			if r.j > r.n {
   199  				// Drop entire block.
   200  				r.i = r.n
   201  				r.j = r.n
   202  				return r.corrupt(unprocBlock, "chunk length overflows block", false)
   203  			} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
   204  				// Drop entire block.
   205  				r.i = r.n
   206  				r.j = r.n
   207  				return r.corrupt(unprocBlock, "checksum mismatch", false)
   208  			}
   209  			if first && chunkType != fullChunkType && chunkType != firstChunkType {
   210  				chunkLength := (r.j - r.i) + headerSize
   211  				r.i = r.j
   212  				// Report the error, but skip it.
   213  				return r.corrupt(chunkLength, "orphan chunk", true)
   214  			}
   215  			r.last = chunkType == fullChunkType || chunkType == lastChunkType
   216  			return nil
   217  		}
   218  
   219  		// The last block.
   220  		if r.n < blockSize && r.n > 0 {
   221  			if !first {
   222  				return r.corrupt(0, "missing chunk part", false)
   223  			}
   224  			r.err = io.EOF
   225  			return r.err
   226  		}
   227  
   228  		// Read block.
   229  		n, err := io.ReadFull(r.r, r.buf[:])
   230  		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
   231  			return err
   232  		}
   233  		if n == 0 {
   234  			if !first {
   235  				return r.corrupt(0, "missing chunk part", false)
   236  			}
   237  			r.err = io.EOF
   238  			return r.err
   239  		}
   240  		r.i, r.j, r.n = 0, 0, n
   241  	}
   242  }
   243  
   244  // Next returns a reader for the next journal. It returns io.EOF if there are no
   245  // more journals. The reader returned becomes stale after the next Next call,
   246  // and should no longer be used. If strict is false, the reader will returns
   247  // io.ErrUnexpectedEOF error when found corrupted journal.
   248  func (r *Reader) Next() (io.Reader, error) {
   249  	r.seq++
   250  	if r.err != nil {
   251  		return nil, r.err
   252  	}
   253  	r.i = r.j
   254  	for {
   255  		if err := r.nextChunk(true); err == nil {
   256  			break
   257  		} else if err != errSkip {
   258  			return nil, err
   259  		}
   260  	}
   261  	return &singleReader{r, r.seq, nil}, nil
   262  }
   263  
   264  // Reset resets the journal reader, allows reuse of the journal reader. Reset returns
   265  // last accumulated error.
   266  func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
   267  	r.seq++
   268  	err := r.err
   269  	r.r = reader
   270  	r.dropper = dropper
   271  	r.strict = strict
   272  	r.checksum = checksum
   273  	r.i = 0
   274  	r.j = 0
   275  	r.n = 0
   276  	r.last = true
   277  	r.err = nil
   278  	return err
   279  }
   280  
   281  type singleReader struct {
   282  	r   *Reader
   283  	seq int
   284  	err error
   285  }
   286  
   287  func (x *singleReader) Read(p []byte) (int, error) {
   288  	r := x.r
   289  	if r.seq != x.seq {
   290  		return 0, errors.New("leveldb/journal: stale reader")
   291  	}
   292  	if x.err != nil {
   293  		return 0, x.err
   294  	}
   295  	if r.err != nil {
   296  		return 0, r.err
   297  	}
   298  	for r.i == r.j {
   299  		if r.last {
   300  			return 0, io.EOF
   301  		}
   302  		x.err = r.nextChunk(false)
   303  		if x.err != nil {
   304  			if x.err == errSkip {
   305  				x.err = io.ErrUnexpectedEOF
   306  			}
   307  			return 0, x.err
   308  		}
   309  	}
   310  	n := copy(p, r.buf[r.i:r.j])
   311  	r.i += n
   312  	return n, nil
   313  }
   314  
   315  func (x *singleReader) ReadByte() (byte, error) {
   316  	r := x.r
   317  	if r.seq != x.seq {
   318  		return 0, errors.New("leveldb/journal: stale reader")
   319  	}
   320  	if x.err != nil {
   321  		return 0, x.err
   322  	}
   323  	if r.err != nil {
   324  		return 0, r.err
   325  	}
   326  	for r.i == r.j {
   327  		if r.last {
   328  			return 0, io.EOF
   329  		}
   330  		x.err = r.nextChunk(false)
   331  		if x.err != nil {
   332  			if x.err == errSkip {
   333  				x.err = io.ErrUnexpectedEOF
   334  			}
   335  			return 0, x.err
   336  		}
   337  	}
   338  	c := r.buf[r.i]
   339  	r.i++
   340  	return c, nil
   341  }
   342  
   343  // Writer writes journals to an underlying io.Writer.
   344  type Writer struct {
   345  	// w is the underlying writer.
   346  	w io.Writer
   347  	// seq is the sequence number of the current journal.
   348  	seq int
   349  	// f is w as a flusher.
   350  	f flusher
   351  	// buf[i:j] is the bytes that will become the current chunk.
   352  	// The low bound, i, includes the chunk header.
   353  	i, j int
   354  	// buf[:written] has already been written to w.
   355  	// written is zero unless Flush has been called.
   356  	written int
   357  	// blockNumber is the zero based block number currently held in buf.
   358  	blockNumber int64
   359  	// first is whether the current chunk is the first chunk of the journal.
   360  	first bool
   361  	// pending is whether a chunk is buffered but not yet written.
   362  	pending bool
   363  	// err is any accumulated error.
   364  	err error
   365  	// buf is the buffer.
   366  	buf [blockSize]byte
   367  }
   368  
   369  // NewWriter returns a new Writer.
   370  func NewWriter(w io.Writer) *Writer {
   371  	f, _ := w.(flusher)
   372  	return &Writer{
   373  		w: w,
   374  		f: f,
   375  	}
   376  }
   377  
   378  // fillHeader fills in the header for the pending chunk.
   379  func (w *Writer) fillHeader(last bool) {
   380  	if w.i+headerSize > w.j || w.j > blockSize {
   381  		panic("leveldb/journal: bad writer state")
   382  	}
   383  	if last {
   384  		if w.first {
   385  			w.buf[w.i+6] = fullChunkType
   386  		} else {
   387  			w.buf[w.i+6] = lastChunkType
   388  		}
   389  	} else {
   390  		if w.first {
   391  			w.buf[w.i+6] = firstChunkType
   392  		} else {
   393  			w.buf[w.i+6] = middleChunkType
   394  		}
   395  	}
   396  	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
   397  	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
   398  }
   399  
   400  // writeBlock writes the buffered block to the underlying writer, and reserves
   401  // space for the next chunk's header.
   402  func (w *Writer) writeBlock() {
   403  	_, w.err = w.w.Write(w.buf[w.written:])
   404  	w.i = 0
   405  	w.j = headerSize
   406  	w.written = 0
   407  	w.blockNumber++
   408  }
   409  
   410  // writePending finishes the current journal and writes the buffer to the
   411  // underlying writer.
   412  func (w *Writer) writePending() {
   413  	if w.err != nil {
   414  		return
   415  	}
   416  	if w.pending {
   417  		w.fillHeader(true)
   418  		w.pending = false
   419  	}
   420  	_, w.err = w.w.Write(w.buf[w.written:w.j])
   421  	w.written = w.j
   422  }
   423  
   424  // Close finishes the current journal and closes the writer.
   425  func (w *Writer) Close() error {
   426  	w.seq++
   427  	w.writePending()
   428  	if w.err != nil {
   429  		return w.err
   430  	}
   431  	w.err = errors.New("leveldb/journal: closed Writer")
   432  	return nil
   433  }
   434  
   435  // Flush finishes the current journal, writes to the underlying writer, and
   436  // flushes it if that writer implements interface{ Flush() error }.
   437  func (w *Writer) Flush() error {
   438  	w.seq++
   439  	w.writePending()
   440  	if w.err != nil {
   441  		return w.err
   442  	}
   443  	if w.f != nil {
   444  		w.err = w.f.Flush()
   445  		return w.err
   446  	}
   447  	return nil
   448  }
   449  
   450  // Reset resets the journal writer, allows reuse of the journal writer. Reset
   451  // will also closes the journal writer if not already.
   452  func (w *Writer) Reset(writer io.Writer) (err error) {
   453  	w.seq++
   454  	if w.err == nil {
   455  		w.writePending()
   456  		err = w.err
   457  	}
   458  	w.w = writer
   459  	w.f, _ = writer.(flusher)
   460  	w.i = 0
   461  	w.j = 0
   462  	w.written = 0
   463  	w.blockNumber = 0
   464  	w.first = false
   465  	w.pending = false
   466  	w.err = nil
   467  	return
   468  }
   469  
   470  // Next returns a writer for the next journal. The writer returned becomes stale
   471  // after the next Close, Flush or Next call, and should no longer be used.
   472  func (w *Writer) Next() (io.Writer, error) {
   473  	w.seq++
   474  	if w.err != nil {
   475  		return nil, w.err
   476  	}
   477  	if w.pending {
   478  		w.fillHeader(true)
   479  	}
   480  	w.i = w.j
   481  	w.j += headerSize
   482  	// Check if there is room in the block for the header.
   483  	if w.j > blockSize {
   484  		// Fill in the rest of the block with zeroes.
   485  		for k := w.i; k < blockSize; k++ {
   486  			w.buf[k] = 0
   487  		}
   488  		w.writeBlock()
   489  		if w.err != nil {
   490  			return nil, w.err
   491  		}
   492  	}
   493  	w.first = true
   494  	w.pending = true
   495  	return singleWriter{w, w.seq}, nil
   496  }
   497  
   498  // Size returns the current size of the file.
   499  func (w *Writer) Size() int64 {
   500  	if w == nil {
   501  		return 0
   502  	}
   503  	return w.blockNumber*blockSize + int64(w.j)
   504  }
   505  
   506  type singleWriter struct {
   507  	w   *Writer
   508  	seq int
   509  }
   510  
   511  func (x singleWriter) Write(p []byte) (int, error) {
   512  	w := x.w
   513  	if w.seq != x.seq {
   514  		return 0, errors.New("leveldb/journal: stale writer")
   515  	}
   516  	if w.err != nil {
   517  		return 0, w.err
   518  	}
   519  	n0 := len(p)
   520  	for len(p) > 0 {
   521  		// Write a block, if it is full.
   522  		if w.j == blockSize {
   523  			w.fillHeader(false)
   524  			w.writeBlock()
   525  			if w.err != nil {
   526  				return 0, w.err
   527  			}
   528  			w.first = false
   529  		}
   530  		// Copy bytes into the buffer.
   531  		n := copy(w.buf[w.j:], p)
   532  		w.j += n
   533  		p = p[n:]
   534  	}
   535  	return n0, nil
   536  }
   537  

View as plain text