...

Source file src/github.com/pierrec/lz4/v4/writer.go

Documentation: github.com/pierrec/lz4/v4

     1  package lz4
     2  
     3  import (
     4  	"io"
     5  
     6  	"github.com/pierrec/lz4/v4/internal/lz4block"
     7  	"github.com/pierrec/lz4/v4/internal/lz4errors"
     8  	"github.com/pierrec/lz4/v4/internal/lz4stream"
     9  )
    10  
// writerStates is the state table handed to the Writer state machine by
// NewWriter (see zw.state.init). It is indexed by a state and each entry holds
// the state that follows it.
//
// NOTE(review): the transition is presumably performed by _State.next when it
// is given a nil error — confirm against the state implementation.
var writerStates = []aState{
	noState:     newState,
	newState:    writeState,
	writeState:  closedState,
	closedState: newState,
	errorState:  newState,
}
    18  
    19  // NewWriter returns a new LZ4 frame encoder.
    20  func NewWriter(w io.Writer) *Writer {
    21  	zw := &Writer{frame: lz4stream.NewFrame()}
    22  	zw.state.init(writerStates)
    23  	_ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
    24  	zw.Reset(w)
    25  	return zw
    26  }
    27  
// Writer allows writing an LZ4 stream.
//
// Uncompressed input is accumulated in data until a full block is available,
// then compressed and written to src as part of the current frame.
type Writer struct {
	state   _State                    // state machine driving which operations are allowed
	src     io.Writer                 // destination writer
	level   lz4block.CompressionLevel // how hard to try
	num     int                       // concurrency level; 1 means blocks are compressed synchronously
	frame   *lz4stream.Frame          // frame being built
	data    []byte                    // pending data
	idx     int                       // size of pending data
	handler func(int)                 // callback invoked with a size each time a block has been processed
	legacy  bool                      // forwarded to Frame.InitW; NOTE(review): presumably selects the legacy frame format — confirm
}
    40  
// private is an empty marker method.
// NOTE(review): presumably it makes *Writer satisfy an unexported interface
// declared elsewhere in the package — confirm.
func (*Writer) private() {}
    42  
    43  func (w *Writer) Apply(options ...Option) (err error) {
    44  	defer w.state.check(&err)
    45  	switch w.state.state {
    46  	case newState:
    47  	case errorState:
    48  		return w.state.err
    49  	default:
    50  		return lz4errors.ErrOptionClosedOrError
    51  	}
    52  	w.Reset(w.src)
    53  	for _, o := range options {
    54  		if err = o(w); err != nil {
    55  			return
    56  		}
    57  	}
    58  	return
    59  }
    60  
    61  func (w *Writer) isNotConcurrent() bool {
    62  	return w.num == 1
    63  }
    64  
    65  // init sets up the Writer when in newState. It does not change the Writer state.
    66  func (w *Writer) init() error {
    67  	w.frame.InitW(w.src, w.num, w.legacy)
    68  	size := w.frame.Descriptor.Flags.BlockSizeIndex()
    69  	w.data = size.Get()
    70  	w.idx = 0
    71  	return w.frame.Descriptor.Write(w.frame, w.src)
    72  }
    73  
    74  func (w *Writer) Write(buf []byte) (n int, err error) {
    75  	defer w.state.check(&err)
    76  	switch w.state.state {
    77  	case writeState:
    78  	case closedState, errorState:
    79  		return 0, w.state.err
    80  	case newState:
    81  		if err = w.init(); w.state.next(err) {
    82  			return
    83  		}
    84  	default:
    85  		return 0, w.state.fail()
    86  	}
    87  
    88  	zn := len(w.data)
    89  	for len(buf) > 0 {
    90  		if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn {
    91  			// Avoid a copy as there is enough data for a block.
    92  			if err = w.write(buf[:zn], false); err != nil {
    93  				return
    94  			}
    95  			n += zn
    96  			buf = buf[zn:]
    97  			continue
    98  		}
    99  		// Accumulate the data to be compressed.
   100  		m := copy(w.data[w.idx:], buf)
   101  		n += m
   102  		w.idx += m
   103  		buf = buf[m:]
   104  
   105  		if w.idx < len(w.data) {
   106  			// Buffer not filled.
   107  			return
   108  		}
   109  
   110  		// Buffer full.
   111  		if err = w.write(w.data, true); err != nil {
   112  			return
   113  		}
   114  		if !w.isNotConcurrent() {
   115  			size := w.frame.Descriptor.Flags.BlockSizeIndex()
   116  			w.data = size.Get()
   117  		}
   118  		w.idx = 0
   119  	}
   120  	return
   121  }
   122  
// write compresses data as a single block and emits it to the destination.
//
// When the Writer is not concurrent, the block is compressed and written
// synchronously and the write error, if any, is returned; safe is ignored.
//
// Otherwise a goroutine is started to compress the block and nil is always
// returned. In that case data must not be modified by the caller until the
// goroutine is done with it. If safe is true, the goroutine hands data back to
// the lz4block buffer pool once it is finished, so the caller gives up
// ownership of the buffer.
func (w *Writer) write(data []byte, safe bool) error {
	if w.isNotConcurrent() {
		// Sequential path: reuse the frame's single block.
		block := w.frame.Blocks.Block
		err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
		// The callback receives the size of the block data after compression.
		w.handler(len(block.Data))
		return err
	}
	// Concurrent path: queue a per-block channel before starting the work.
	// NOTE(review): presumably the consumer of w.frame.Blocks.Blocks writes
	// the blocks out in queue order — confirm in lz4stream.
	c := make(chan *lz4stream.FrameDataBlock)
	w.frame.Blocks.Blocks <- c
	go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) {
		b := lz4stream.NewFrameDataBlock(w.frame)
		// Publish the compressed block, then wait on c until the receiving
		// side is done with it (see the comment below: c is closed after the
		// block has been written).
		c <- b.Compress(w.frame, data, w.level)
		<-c
		w.handler(len(b.Data))
		b.Close(w.frame)
		if safe {
			// safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
			lz4block.Put(data)
		}
	}(c, data, safe)

	return nil
}
   146  
   147  // Flush any buffered data to the underlying writer immediately.
   148  func (w *Writer) Flush() (err error) {
   149  	switch w.state.state {
   150  	case writeState:
   151  	case errorState:
   152  		return w.state.err
   153  	case newState:
   154  		if err = w.init(); w.state.next(err) {
   155  			return
   156  		}
   157  	default:
   158  		return nil
   159  	}
   160  
   161  	if w.idx > 0 {
   162  		// Flush pending data, disable w.data freeing as it is done later on.
   163  		if err = w.write(w.data[:w.idx], false); err != nil {
   164  			return err
   165  		}
   166  		w.idx = 0
   167  	}
   168  	return nil
   169  }
   170  
   171  // Close closes the Writer, flushing any unwritten data to the underlying writer
   172  // without closing it.
   173  func (w *Writer) Close() error {
   174  	if err := w.Flush(); err != nil {
   175  		return err
   176  	}
   177  	err := w.frame.CloseW(w.src, w.num)
   178  	// It is now safe to free the buffer.
   179  	if w.data != nil {
   180  		lz4block.Put(w.data)
   181  		w.data = nil
   182  	}
   183  	return err
   184  }
   185  
   186  // Reset clears the state of the Writer w such that it is equivalent to its
   187  // initial state from NewWriter, but instead writing to writer.
   188  // Reset keeps the previous options unless overwritten by the supplied ones.
   189  // No access to writer is performed.
   190  //
   191  // w.Close must be called before Reset or pending data may be dropped.
   192  func (w *Writer) Reset(writer io.Writer) {
   193  	w.frame.Reset(w.num)
   194  	w.state.reset()
   195  	w.src = writer
   196  }
   197  
   198  // ReadFrom efficiently reads from r and compressed into the Writer destination.
   199  func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
   200  	switch w.state.state {
   201  	case closedState, errorState:
   202  		return 0, w.state.err
   203  	case newState:
   204  		if err = w.init(); w.state.next(err) {
   205  			return
   206  		}
   207  	default:
   208  		return 0, w.state.fail()
   209  	}
   210  	defer w.state.check(&err)
   211  
   212  	size := w.frame.Descriptor.Flags.BlockSizeIndex()
   213  	var done bool
   214  	var rn int
   215  	data := size.Get()
   216  	if w.isNotConcurrent() {
   217  		// Keep the same buffer for the whole process.
   218  		defer lz4block.Put(data)
   219  	}
   220  	for !done {
   221  		rn, err = io.ReadFull(r, data)
   222  		switch err {
   223  		case nil:
   224  		case io.EOF, io.ErrUnexpectedEOF: // read may be partial
   225  			done = true
   226  		default:
   227  			return
   228  		}
   229  		n += int64(rn)
   230  		err = w.write(data[:rn], true)
   231  		if err != nil {
   232  			return
   233  		}
   234  		w.handler(rn)
   235  		if !done && !w.isNotConcurrent() {
   236  			// The buffer will be returned automatically by go routines (safe=true)
   237  			// so get a new one fo the next round.
   238  			data = size.Get()
   239  		}
   240  	}
   241  	return
   242  }
   243  

View as plain text