...

Source file src/github.com/mxk/go-flowrate/flowrate/flowrate.go

Documentation: github.com/mxk/go-flowrate/flowrate

     1  //
     2  // Written by Maxim Khitrov (November 2012)
     3  //
     4  
     5  // Package flowrate provides the tools for monitoring and limiting the flow rate
     6  // of an arbitrary data stream.
     7  package flowrate
     8  
     9  import (
    10  	"math"
    11  	"sync"
    12  	"time"
    13  )
    14  
    15  // Monitor monitors and limits the transfer rate of a data stream.
    16  type Monitor struct {
    17  	mu      sync.Mutex    // Mutex guarding access to all internal fields
    18  	active  bool          // Flag indicating an active transfer
    19  	start   time.Duration // Transfer start time (clock() value)
    20  	bytes   int64         // Total number of bytes transferred
    21  	samples int64         // Total number of samples taken
    22  
    23  	rSample float64 // Most recent transfer rate sample (bytes per second)
    24  	rEMA    float64 // Exponential moving average of rSample
    25  	rPeak   float64 // Peak transfer rate (max of all rSamples)
    26  	rWindow float64 // rEMA window (seconds)
    27  
    28  	sBytes int64         // Number of bytes transferred since sLast
    29  	sLast  time.Duration // Most recent sample time (stop time when inactive)
    30  	sRate  time.Duration // Sampling rate
    31  
    32  	tBytes int64         // Number of bytes expected in the current transfer
    33  	tLast  time.Duration // Time of the most recent transfer of at least 1 byte
    34  }
    35  
    36  // New creates a new flow control monitor. Instantaneous transfer rate is
    37  // measured and updated for each sampleRate interval. windowSize determines the
    38  // weight of each sample in the exponential moving average (EMA) calculation.
    39  // The exact formulas are:
    40  //
    41  // 	sampleTime = currentTime - prevSampleTime
    42  // 	sampleRate = byteCount / sampleTime
    43  // 	weight     = 1 - exp(-sampleTime/windowSize)
    44  // 	newRate    = weight*sampleRate + (1-weight)*oldRate
    45  //
    46  // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
    47  // respectively.
    48  func New(sampleRate, windowSize time.Duration) *Monitor {
    49  	if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
    50  		sampleRate = 5 * clockRate
    51  	}
    52  	if windowSize <= 0 {
    53  		windowSize = 1 * time.Second
    54  	}
    55  	now := clock()
    56  	return &Monitor{
    57  		active:  true,
    58  		start:   now,
    59  		rWindow: windowSize.Seconds(),
    60  		sLast:   now,
    61  		sRate:   sampleRate,
    62  		tLast:   now,
    63  	}
    64  }
    65  
    66  // Update records the transfer of n bytes and returns n. It should be called
    67  // after each Read/Write operation, even if n is 0.
    68  func (m *Monitor) Update(n int) int {
    69  	m.mu.Lock()
    70  	m.update(n)
    71  	m.mu.Unlock()
    72  	return n
    73  }
    74  
    75  // IO is a convenience method intended to wrap io.Reader and io.Writer method
    76  // execution. It calls m.Update(n) and then returns (n, err) unmodified.
    77  func (m *Monitor) IO(n int, err error) (int, error) {
    78  	return m.Update(n), err
    79  }
    80  
    81  // Done marks the transfer as finished and prevents any further updates or
    82  // limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
    83  // Limit methods become NOOPs. It returns the total number of bytes transferred.
    84  func (m *Monitor) Done() int64 {
    85  	m.mu.Lock()
    86  	if now := m.update(0); m.sBytes > 0 {
    87  		m.reset(now)
    88  	}
    89  	m.active = false
    90  	m.tLast = 0
    91  	n := m.bytes
    92  	m.mu.Unlock()
    93  	return n
    94  }
    95  
    96  // timeRemLimit is the maximum Status.TimeRem value.
    97  const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
    98  
    99  // Status represents the current Monitor status. All transfer rates are in bytes
   100  // per second rounded to the nearest byte.
   101  type Status struct {
   102  	Active   bool          // Flag indicating an active transfer
   103  	Start    time.Time     // Transfer start time
   104  	Duration time.Duration // Time period covered by the statistics
   105  	Idle     time.Duration // Time since the last transfer of at least 1 byte
   106  	Bytes    int64         // Total number of bytes transferred
   107  	Samples  int64         // Total number of samples taken
   108  	InstRate int64         // Instantaneous transfer rate
   109  	CurRate  int64         // Current transfer rate (EMA of InstRate)
   110  	AvgRate  int64         // Average transfer rate (Bytes / Duration)
   111  	PeakRate int64         // Maximum instantaneous transfer rate
   112  	BytesRem int64         // Number of bytes remaining in the transfer
   113  	TimeRem  time.Duration // Estimated time to completion
   114  	Progress Percent       // Overall transfer progress
   115  }
   116  
   117  // Status returns current transfer status information. The returned value
   118  // becomes static after a call to Done.
   119  func (m *Monitor) Status() Status {
   120  	m.mu.Lock()
   121  	now := m.update(0)
   122  	s := Status{
   123  		Active:   m.active,
   124  		Start:    clockToTime(m.start),
   125  		Duration: m.sLast - m.start,
   126  		Idle:     now - m.tLast,
   127  		Bytes:    m.bytes,
   128  		Samples:  m.samples,
   129  		PeakRate: round(m.rPeak),
   130  		BytesRem: m.tBytes - m.bytes,
   131  		Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
   132  	}
   133  	if s.BytesRem < 0 {
   134  		s.BytesRem = 0
   135  	}
   136  	if s.Duration > 0 {
   137  		rAvg := float64(s.Bytes) / s.Duration.Seconds()
   138  		s.AvgRate = round(rAvg)
   139  		if s.Active {
   140  			s.InstRate = round(m.rSample)
   141  			s.CurRate = round(m.rEMA)
   142  			if s.BytesRem > 0 {
   143  				if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
   144  					ns := float64(s.BytesRem) / tRate * 1e9
   145  					if ns > float64(timeRemLimit) {
   146  						ns = float64(timeRemLimit)
   147  					}
   148  					s.TimeRem = clockRound(time.Duration(ns))
   149  				}
   150  			}
   151  		}
   152  	}
   153  	m.mu.Unlock()
   154  	return s
   155  }
   156  
   157  // Limit restricts the instantaneous (per-sample) data flow to rate bytes per
   158  // second. It returns the maximum number of bytes (0 <= n <= want) that may be
   159  // transferred immediately without exceeding the limit. If block == true, the
   160  // call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
   161  // or the transfer is inactive (after a call to Done).
   162  //
   163  // At least one byte is always allowed to be transferred in any given sampling
   164  // period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
   165  // is 10 bytes per second.
   166  //
   167  // For usage examples, see the implementation of Reader and Writer in io.go.
   168  func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
   169  	if want < 1 || rate < 1 {
   170  		return want
   171  	}
   172  	m.mu.Lock()
   173  
   174  	// Determine the maximum number of bytes that can be sent in one sample
   175  	limit := round(float64(rate) * m.sRate.Seconds())
   176  	if limit <= 0 {
   177  		limit = 1
   178  	}
   179  
   180  	// If block == true, wait until m.sBytes < limit
   181  	if now := m.update(0); block {
   182  		for m.sBytes >= limit && m.active {
   183  			now = m.waitNextSample(now)
   184  		}
   185  	}
   186  
   187  	// Make limit <= want (unlimited if the transfer is no longer active)
   188  	if limit -= m.sBytes; limit > int64(want) || !m.active {
   189  		limit = int64(want)
   190  	}
   191  	m.mu.Unlock()
   192  
   193  	if limit < 0 {
   194  		limit = 0
   195  	}
   196  	return int(limit)
   197  }
   198  
   199  // SetTransferSize specifies the total size of the data transfer, which allows
   200  // the Monitor to calculate the overall progress and time to completion.
   201  func (m *Monitor) SetTransferSize(bytes int64) {
   202  	if bytes < 0 {
   203  		bytes = 0
   204  	}
   205  	m.mu.Lock()
   206  	m.tBytes = bytes
   207  	m.mu.Unlock()
   208  }
   209  
   210  // update accumulates the transferred byte count for the current sample until
   211  // clock() - m.sLast >= m.sRate. The monitor status is updated once the current
   212  // sample is done.
   213  func (m *Monitor) update(n int) (now time.Duration) {
   214  	if !m.active {
   215  		return
   216  	}
   217  	if now = clock(); n > 0 {
   218  		m.tLast = now
   219  	}
   220  	m.sBytes += int64(n)
   221  	if sTime := now - m.sLast; sTime >= m.sRate {
   222  		t := sTime.Seconds()
   223  		if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
   224  			m.rPeak = m.rSample
   225  		}
   226  
   227  		// Exponential moving average using a method similar to *nix load
   228  		// average calculation. Longer sampling periods carry greater weight.
   229  		if m.samples > 0 {
   230  			w := math.Exp(-t / m.rWindow)
   231  			m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
   232  		} else {
   233  			m.rEMA = m.rSample
   234  		}
   235  		m.reset(now)
   236  	}
   237  	return
   238  }
   239  
   240  // reset clears the current sample state in preparation for the next sample.
   241  func (m *Monitor) reset(sampleTime time.Duration) {
   242  	m.bytes += m.sBytes
   243  	m.samples++
   244  	m.sBytes = 0
   245  	m.sLast = sampleTime
   246  }
   247  
   248  // waitNextSample sleeps for the remainder of the current sample. The lock is
   249  // released and reacquired during the actual sleep period, so it's possible for
   250  // the transfer to be inactive when this method returns.
   251  func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
   252  	const minWait = 5 * time.Millisecond
   253  	current := m.sLast
   254  
   255  	// sleep until the last sample time changes (ideally, just one iteration)
   256  	for m.sLast == current && m.active {
   257  		d := current + m.sRate - now
   258  		m.mu.Unlock()
   259  		if d < minWait {
   260  			d = minWait
   261  		}
   262  		time.Sleep(d)
   263  		m.mu.Lock()
   264  		now = m.update(0)
   265  	}
   266  	return now
   267  }
   268  

View as plain text