...

Source file src/go.etcd.io/etcd/pkg/v3/ioutil/pagewriter.go

Documentation: go.etcd.io/etcd/pkg/v3/ioutil

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package ioutil
    16  
    17  import (
    18  	"fmt"
    19  	"io"
    20  )
    21  
    22  var defaultBufferBytes = 128 * 1024
    23  
    24  // PageWriter implements the io.Writer interface so that writes will
    25  // either be in page chunks or from flushing.
    26  type PageWriter struct {
    27  	w io.Writer
    28  	// pageOffset tracks the page offset of the base of the buffer
    29  	pageOffset int
    30  	// pageBytes is the number of bytes per page
    31  	pageBytes int
    32  	// bufferedBytes counts the number of bytes pending for write in the buffer
    33  	bufferedBytes int
    34  	// buf holds the write buffer
    35  	buf []byte
    36  	// bufWatermarkBytes is the number of bytes the buffer can hold before it needs
    37  	// to be flushed. It is less than len(buf) so there is space for slack writes
    38  	// to bring the writer to page alignment.
    39  	bufWatermarkBytes int
    40  }
    41  
    42  // NewPageWriter creates a new PageWriter. pageBytes is the number of bytes
    43  // to write per page. pageOffset is the starting offset of io.Writer.
    44  func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter {
    45  	if pageBytes <= 0 {
    46  		panic(fmt.Sprintf("assertion failed: invalid pageBytes (%d) value, it must be greater than 0", pageBytes))
    47  	}
    48  	return &PageWriter{
    49  		w:                 w,
    50  		pageOffset:        pageOffset,
    51  		pageBytes:         pageBytes,
    52  		buf:               make([]byte, defaultBufferBytes+pageBytes),
    53  		bufWatermarkBytes: defaultBufferBytes,
    54  	}
    55  }
    56  
    57  func (pw *PageWriter) Write(p []byte) (n int, err error) {
    58  	if len(p)+pw.bufferedBytes <= pw.bufWatermarkBytes {
    59  		// no overflow
    60  		copy(pw.buf[pw.bufferedBytes:], p)
    61  		pw.bufferedBytes += len(p)
    62  		return len(p), nil
    63  	}
    64  	// complete the slack page in the buffer if unaligned
    65  	slack := pw.pageBytes - ((pw.pageOffset + pw.bufferedBytes) % pw.pageBytes)
    66  	if slack != pw.pageBytes {
    67  		partial := slack > len(p)
    68  		if partial {
    69  			// not enough data to complete the slack page
    70  			slack = len(p)
    71  		}
    72  		// special case: writing to slack page in buffer
    73  		copy(pw.buf[pw.bufferedBytes:], p[:slack])
    74  		pw.bufferedBytes += slack
    75  		n = slack
    76  		p = p[slack:]
    77  		if partial {
    78  			// avoid forcing an unaligned flush
    79  			return n, nil
    80  		}
    81  	}
    82  	// buffer contents are now page-aligned; clear out
    83  	if err = pw.Flush(); err != nil {
    84  		return n, err
    85  	}
    86  	// directly write all complete pages without copying
    87  	if len(p) > pw.pageBytes {
    88  		pages := len(p) / pw.pageBytes
    89  		c, werr := pw.w.Write(p[:pages*pw.pageBytes])
    90  		n += c
    91  		if werr != nil {
    92  			return n, werr
    93  		}
    94  		p = p[pages*pw.pageBytes:]
    95  	}
    96  	// write remaining tail to buffer
    97  	c, werr := pw.Write(p)
    98  	n += c
    99  	return n, werr
   100  }
   101  
   102  // Flush flushes buffered data.
   103  func (pw *PageWriter) Flush() error {
   104  	_, err := pw.flush()
   105  	return err
   106  }
   107  
   108  // FlushN flushes buffered data and returns the number of written bytes.
   109  func (pw *PageWriter) FlushN() (int, error) {
   110  	return pw.flush()
   111  }
   112  
   113  func (pw *PageWriter) flush() (int, error) {
   114  	if pw.bufferedBytes == 0 {
   115  		return 0, nil
   116  	}
   117  	n, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
   118  	pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
   119  	pw.bufferedBytes = 0
   120  	return n, err
   121  }
   122  

View as plain text