...

Source file src/github.com/dsoprea/go-utility/v2/filesystem/bounceback.go

Documentation: github.com/dsoprea/go-utility/v2/filesystem

     1  package rifs
     2  
     3  import (
     4  	"fmt"
     5  	"io"
     6  
     7  	"github.com/dsoprea/go-logging"
     8  )
     9  
    10  // BouncebackStats describes operation counts.
    11  type BouncebackStats struct {
    12  	reads  int
    13  	writes int
    14  	seeks  int
    15  	syncs  int
    16  }
    17  
    18  func (bbs BouncebackStats) String() string {
    19  	return fmt.Sprintf(
    20  		"BouncebackStats<READS=(%d) WRITES=(%d) SEEKS=(%d) SYNCS=(%d)>",
    21  		bbs.reads, bbs.writes, bbs.seeks, bbs.syncs)
    22  }
    23  
    24  type bouncebackBase struct {
    25  	currentPosition int64
    26  
    27  	stats BouncebackStats
    28  }
    29  
    30  // Position returns the position that we're supposed to be at.
    31  func (bb *bouncebackBase) Position() int64 {
    32  
    33  	// TODO(dustin): Add test
    34  
    35  	return bb.currentPosition
    36  }
    37  
    38  // StatsReads returns the number of reads that have been attempted.
    39  func (bb *bouncebackBase) StatsReads() int {
    40  
    41  	// TODO(dustin): Add test
    42  
    43  	return bb.stats.reads
    44  }
    45  
    46  // StatsWrites returns the number of write operations.
    47  func (bb *bouncebackBase) StatsWrites() int {
    48  
    49  	// TODO(dustin): Add test
    50  
    51  	return bb.stats.writes
    52  }
    53  
    54  // StatsSeeks returns the number of seeks.
    55  func (bb *bouncebackBase) StatsSeeks() int {
    56  
    57  	// TODO(dustin): Add test
    58  
    59  	return bb.stats.seeks
    60  }
    61  
    62  // StatsSyncs returns the number of corrective seeks ("bounce-backs").
    63  func (bb *bouncebackBase) StatsSyncs() int {
    64  
    65  	// TODO(dustin): Add test
    66  
    67  	return bb.stats.syncs
    68  }
    69  
    70  // Seek does a seek to an arbitrary place in the `io.ReadSeeker`.
    71  func (bb *bouncebackBase) seek(s io.Seeker, offset int64, whence int) (newPosition int64, err error) {
    72  	defer func() {
    73  		if state := recover(); state != nil {
    74  			err = log.Wrap(state.(error))
    75  		}
    76  	}()
    77  
    78  	// If the seek is relative, make sure we're where we're supposed to be *first*.
    79  	if whence != io.SeekStart {
    80  		err = bb.checkPosition(s)
    81  		log.PanicIf(err)
    82  	}
    83  
    84  	bb.stats.seeks++
    85  
    86  	newPosition, err = s.Seek(offset, whence)
    87  	log.PanicIf(err)
    88  
    89  	// Update our internal tracking.
    90  	bb.currentPosition = newPosition
    91  
    92  	return newPosition, nil
    93  }
    94  
    95  func (bb *bouncebackBase) checkPosition(s io.Seeker) (err error) {
    96  	defer func() {
    97  		if state := recover(); state != nil {
    98  			err = log.Wrap(state.(error))
    99  		}
   100  	}()
   101  
   102  	// Make sure we're where we're supposed to be.
   103  
   104  	// This should have no overhead, and enables us to collect stats.
   105  	realCurrentPosition, err := s.Seek(0, io.SeekCurrent)
   106  	log.PanicIf(err)
   107  
   108  	if realCurrentPosition != bb.currentPosition {
   109  		bb.stats.syncs++
   110  
   111  		_, err = s.Seek(bb.currentPosition, io.SeekStart)
   112  		log.PanicIf(err)
   113  	}
   114  
   115  	return nil
   116  }
   117  
   118  // BouncebackReader wraps a ReadSeeker, keeps track of our position, and
   119  // seeks back to it before writing. This allows an underlying ReadWriteSeeker
   120  // with an unstable position can still be used for a prolonged series of writes.
   121  type BouncebackReader struct {
   122  	rs io.ReadSeeker
   123  
   124  	bouncebackBase
   125  }
   126  
   127  // NewBouncebackReader returns a `*BouncebackReader` struct.
   128  func NewBouncebackReader(rs io.ReadSeeker) (br *BouncebackReader, err error) {
   129  	defer func() {
   130  		if state := recover(); state != nil {
   131  			err = log.Wrap(state.(error))
   132  		}
   133  	}()
   134  
   135  	initialPosition, err := rs.Seek(0, io.SeekCurrent)
   136  	log.PanicIf(err)
   137  
   138  	bb := bouncebackBase{
   139  		currentPosition: initialPosition,
   140  	}
   141  
   142  	br = &BouncebackReader{
   143  		rs:             rs,
   144  		bouncebackBase: bb,
   145  	}
   146  
   147  	return br, nil
   148  }
   149  
   150  // Seek does a seek to an arbitrary place in the `io.ReadSeeker`.
   151  func (br *BouncebackReader) Seek(offset int64, whence int) (newPosition int64, err error) {
   152  	defer func() {
   153  		if state := recover(); state != nil {
   154  			err = log.Wrap(state.(error))
   155  		}
   156  	}()
   157  
   158  	newPosition, err = br.bouncebackBase.seek(br.rs, offset, whence)
   159  	log.PanicIf(err)
   160  
   161  	return newPosition, nil
   162  }
   163  
   164  // Seek does a standard read.
   165  func (br *BouncebackReader) Read(p []byte) (n int, err error) {
   166  	defer func() {
   167  		if state := recover(); state != nil {
   168  			err = log.Wrap(state.(error))
   169  		}
   170  	}()
   171  
   172  	br.bouncebackBase.stats.reads++
   173  
   174  	err = br.bouncebackBase.checkPosition(br.rs)
   175  	log.PanicIf(err)
   176  
   177  	// Do read.
   178  
   179  	n, err = br.rs.Read(p)
   180  	if err != nil {
   181  		if err == io.EOF {
   182  			return 0, io.EOF
   183  		}
   184  
   185  		log.Panic(err)
   186  	}
   187  
   188  	// Update our internal tracking.
   189  	br.bouncebackBase.currentPosition += int64(n)
   190  
   191  	return n, nil
   192  }
   193  
   194  // BouncebackWriter wraps a WriteSeeker, keeps track of our position, and
   195  // seeks back to it before writing. This allows an underlying ReadWriteSeeker
   196  // with an unstable position can still be used for a prolonged series of writes.
   197  type BouncebackWriter struct {
   198  	ws io.WriteSeeker
   199  
   200  	bouncebackBase
   201  }
   202  
   203  // NewBouncebackWriter returns a new `BouncebackWriter` struct.
   204  func NewBouncebackWriter(ws io.WriteSeeker) (bw *BouncebackWriter, err error) {
   205  	defer func() {
   206  		if state := recover(); state != nil {
   207  			err = log.Wrap(state.(error))
   208  		}
   209  	}()
   210  
   211  	initialPosition, err := ws.Seek(0, io.SeekCurrent)
   212  	log.PanicIf(err)
   213  
   214  	bb := bouncebackBase{
   215  		currentPosition: initialPosition,
   216  	}
   217  
   218  	bw = &BouncebackWriter{
   219  		ws:             ws,
   220  		bouncebackBase: bb,
   221  	}
   222  
   223  	return bw, nil
   224  }
   225  
   226  // Seek puts us at a specific position in the internal writer for the next
   227  // write/seek.
   228  func (bw *BouncebackWriter) Seek(offset int64, whence int) (newPosition int64, err error) {
   229  	defer func() {
   230  		if state := recover(); state != nil {
   231  			err = log.Wrap(state.(error))
   232  		}
   233  	}()
   234  
   235  	newPosition, err = bw.bouncebackBase.seek(bw.ws, offset, whence)
   236  	log.PanicIf(err)
   237  
   238  	return newPosition, nil
   239  }
   240  
   241  // Write performs a write against the internal `WriteSeeker` starting at the
   242  // position that we're supposed to be at.
   243  func (bw *BouncebackWriter) Write(p []byte) (n int, err error) {
   244  	defer func() {
   245  		if state := recover(); state != nil {
   246  			err = log.Wrap(state.(error))
   247  		}
   248  	}()
   249  
   250  	bw.bouncebackBase.stats.writes++
   251  
   252  	// Make sure we're where we're supposed to be.
   253  
   254  	realCurrentPosition, err := bw.ws.Seek(0, io.SeekCurrent)
   255  	log.PanicIf(err)
   256  
   257  	if realCurrentPosition != bw.bouncebackBase.currentPosition {
   258  		bw.bouncebackBase.stats.seeks++
   259  
   260  		_, err = bw.ws.Seek(bw.bouncebackBase.currentPosition, io.SeekStart)
   261  		log.PanicIf(err)
   262  	}
   263  
   264  	// Do write.
   265  
   266  	n, err = bw.ws.Write(p)
   267  	log.PanicIf(err)
   268  
   269  	// Update our internal tracking.
   270  	bw.bouncebackBase.currentPosition += int64(n)
   271  
   272  	return n, nil
   273  }
   274  

View as plain text