...

Source file src/github.com/beorn7/perks/quantile/stream.go

Documentation: github.com/beorn7/perks/quantile

     1  // Package quantile computes approximate quantiles over an unbounded data
     2  // stream within low memory and CPU bounds.
     3  //
     4  // A small amount of accuracy is traded to achieve the above properties.
     5  //
     6  // Multiple streams can be merged before calling Query to generate a single set
     7  // of results. This is meaningful when the streams represent the same type of
     8  // data. See Merge and Samples.
     9  //
    10  // For more detailed information about the algorithm used, see:
    11  //
    12  // Effective Computation of Biased Quantiles over Data Streams
    13  //
    14  // http://www.cs.rutgers.edu/~muthu/bquant.pdf
    15  package quantile
    16  
    17  import (
    18  	"math"
    19  	"sort"
    20  )
    21  
    22  // Sample holds an observed value and meta information for compression. JSON
    23  // tags have been added for convenience.
    24  type Sample struct {
    25  	Value float64 `json:",string"`
    26  	Width float64 `json:",string"`
    27  	Delta float64 `json:",string"`
    28  }
    29  
    30  // Samples represents a slice of samples. It implements sort.Interface.
    31  type Samples []Sample
    32  
    33  func (a Samples) Len() int           { return len(a) }
    34  func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
    35  func (a Samples) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
    36  
// invariant is the error function ƒ of a stream, evaluated for the stream s
// at rank r. Its result is used by stream.merge to derive the Delta of a newly
// inserted sample, by stream.query to widen the target rank, and by
// stream.compress as the limit up to which neighbouring samples are folded
// together.
type invariant func(s *stream, r float64) float64
    38  
    39  // NewLowBiased returns an initialized Stream for low-biased quantiles
    40  // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
    41  // error guarantees can still be given even for the lower ranks of the data
    42  // distribution.
    43  //
    44  // The provided epsilon is a relative error, i.e. the true quantile of a value
    45  // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
    46  //
    47  // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
    48  // properties.
    49  func NewLowBiased(epsilon float64) *Stream {
    50  	ƒ := func(s *stream, r float64) float64 {
    51  		return 2 * epsilon * r
    52  	}
    53  	return newStream(ƒ)
    54  }
    55  
    56  // NewHighBiased returns an initialized Stream for high-biased quantiles
    57  // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
    58  // error guarantees can still be given even for the higher ranks of the data
    59  // distribution.
    60  //
    61  // The provided epsilon is a relative error, i.e. the true quantile of a value
    62  // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
    63  //
    64  // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
    65  // properties.
    66  func NewHighBiased(epsilon float64) *Stream {
    67  	ƒ := func(s *stream, r float64) float64 {
    68  		return 2 * epsilon * (s.n - r)
    69  	}
    70  	return newStream(ƒ)
    71  }
    72  
    73  // NewTargeted returns an initialized Stream concerned with a particular set of
    74  // quantile values that are supplied a priori. Knowing these a priori reduces
    75  // space and computation time. The targets map maps the desired quantiles to
    76  // their absolute errors, i.e. the true quantile of a value returned by a query
    77  // is guaranteed to be within (Quantile±Epsilon).
    78  //
    79  // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
    80  func NewTargeted(targetMap map[float64]float64) *Stream {
    81  	// Convert map to slice to avoid slow iterations on a map.
    82  	// ƒ is called on the hot path, so converting the map to a slice
    83  	// beforehand results in significant CPU savings.
    84  	targets := targetMapToSlice(targetMap)
    85  
    86  	ƒ := func(s *stream, r float64) float64 {
    87  		var m = math.MaxFloat64
    88  		var f float64
    89  		for _, t := range targets {
    90  			if t.quantile*s.n <= r {
    91  				f = (2 * t.epsilon * r) / t.quantile
    92  			} else {
    93  				f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
    94  			}
    95  			if f < m {
    96  				m = f
    97  			}
    98  		}
    99  		return m
   100  	}
   101  	return newStream(ƒ)
   102  }
   103  
   104  type target struct {
   105  	quantile float64
   106  	epsilon  float64
   107  }
   108  
   109  func targetMapToSlice(targetMap map[float64]float64) []target {
   110  	targets := make([]target, 0, len(targetMap))
   111  
   112  	for quantile, epsilon := range targetMap {
   113  		t := target{
   114  			quantile: quantile,
   115  			epsilon:  epsilon,
   116  		}
   117  		targets = append(targets, t)
   118  	}
   119  
   120  	return targets
   121  }
   122  
// Stream computes quantiles for a stream of float64s. It is not thread-safe by
// design. Take care when using across multiple goroutines.
//
// The embedded stream holds the compressed summary. b buffers inserted samples
// until flush merges them into that summary, and sorted records whether b is
// known to be ordered by value (set by maybeSort, cleared by insert).
type Stream struct {
	*stream
	b      Samples
	sorted bool
}
   130  
   131  func newStream(ƒ invariant) *Stream {
   132  	x := &stream{ƒ: ƒ}
   133  	return &Stream{x, make(Samples, 0, 500), true}
   134  }
   135  
   136  // Insert inserts v into the stream.
   137  func (s *Stream) Insert(v float64) {
   138  	s.insert(Sample{Value: v, Width: 1})
   139  }
   140  
   141  func (s *Stream) insert(sample Sample) {
   142  	s.b = append(s.b, sample)
   143  	s.sorted = false
   144  	if len(s.b) == cap(s.b) {
   145  		s.flush()
   146  	}
   147  }
   148  
   149  // Query returns the computed qth percentiles value. If s was created with
   150  // NewTargeted, and q is not in the set of quantiles provided a priori, Query
   151  // will return an unspecified result.
   152  func (s *Stream) Query(q float64) float64 {
   153  	if !s.flushed() {
   154  		// Fast path when there hasn't been enough data for a flush;
   155  		// this also yields better accuracy for small sets of data.
   156  		l := len(s.b)
   157  		if l == 0 {
   158  			return 0
   159  		}
   160  		i := int(math.Ceil(float64(l) * q))
   161  		if i > 0 {
   162  			i -= 1
   163  		}
   164  		s.maybeSort()
   165  		return s.b[i].Value
   166  	}
   167  	s.flush()
   168  	return s.stream.query(q)
   169  }
   170  
// Merge merges samples into the underlying stream's samples. This is handy
// when merging multiple streams from separate threads, database shards, etc.
//
// The passed samples are sorted by value in place before they are merged, so
// the caller's slice is reordered. Samples still waiting in the insertion
// buffer of s are not touched by Merge.
//
// ATTENTION: This method is broken and does not yield correct results. The
// underlying algorithm is not capable of merging streams correctly.
func (s *Stream) Merge(samples Samples) {
	sort.Sort(samples)
	s.stream.merge(samples)
}
   180  
   181  // Reset reinitializes and clears the list reusing the samples buffer memory.
   182  func (s *Stream) Reset() {
   183  	s.stream.reset()
   184  	s.b = s.b[:0]
   185  }
   186  
   187  // Samples returns stream samples held by s.
   188  func (s *Stream) Samples() Samples {
   189  	if !s.flushed() {
   190  		return s.b
   191  	}
   192  	s.flush()
   193  	return s.stream.samples()
   194  }
   195  
   196  // Count returns the total number of samples observed in the stream
   197  // since initialization.
   198  func (s *Stream) Count() int {
   199  	return len(s.b) + s.stream.count()
   200  }
   201  
// flush sorts the buffered samples if necessary, merges them into the
// summary, and then empties the buffer while keeping its backing array for
// reuse.
func (s *Stream) flush() {
	s.maybeSort()
	s.stream.merge(s.b)
	s.b = s.b[:0]
}
   207  
   208  func (s *Stream) maybeSort() {
   209  	if !s.sorted {
   210  		s.sorted = true
   211  		sort.Sort(s.b)
   212  	}
   213  }
   214  
   215  func (s *Stream) flushed() bool {
   216  	return len(s.stream.l) > 0
   217  }
   218  
// stream is the compressed summary behind a Stream.
//
// n is the sum of the widths of all samples merged so far, l holds the
// summary entries in the positions merge placed them, and ƒ is the invariant
// consulted by merge, query and compress.
type stream struct {
	n float64
	l []Sample
	ƒ invariant
}
   224  
   225  func (s *stream) reset() {
   226  	s.l = s.l[:0]
   227  	s.n = 0
   228  }
   229  
   230  func (s *stream) insert(v float64) {
   231  	s.merge(Samples{{v, 1, 0}})
   232  }
   233  
// merge folds samples into the summary s.l and then compresses the summary.
//
// The scan position in s.l is never rewound between samples, so samples is
// expected to be ordered by ascending Value; the callers in this file
// (Stream.Merge and Stream.flush) sort before calling. Each sample is inserted
// in front of the first remaining entry with a larger Value, or appended when
// there is none, and its Width is added to s.n.
func (s *stream) merge(samples Samples) {
	// TODO(beorn7): This tries to merge not only individual samples, but
	// whole summaries. The paper doesn't mention merging summaries at
	// all. Unittests show that the merging is inaccurate. Find out how to
	// do merges properly.

	// r accumulates the widths of all entries in front of scan position i,
	// including samples inserted earlier in this call.
	var r float64
	i := 0
	for _, sample := range samples {
		for ; i < len(s.l); i++ {
			c := s.l[i]
			if c.Value > sample.Value {
				// Insert at position i.
				// Grow by one element, shift the tail right, then
				// overwrite slot i with the new sample.
				s.l = append(s.l, Sample{})
				copy(s.l[i+1:], s.l[i:])
				s.l[i] = Sample{
					sample.Value,
					sample.Width,
					math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
					// TODO(beorn7): How to calculate delta correctly?
				}
				// Step past the inserted sample so that i points at c
				// again for the next sample.
				i++
				goto inserted
			}
			r += c.Width
		}
		// No remaining entry has a larger value: append at the end. The
		// appended entry gets Delta 0; sample.Delta is not carried over.
		s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
		i++
	inserted:
		s.n += sample.Width
		r += sample.Width
	}
	s.compress()
}
   267  
// count returns s.n, the accumulated width of all merged samples, converted
// to an int (any fractional part is truncated).
func (s *stream) count() int {
	return int(s.n)
}
   271  
   272  func (s *stream) query(q float64) float64 {
   273  	t := math.Ceil(q * s.n)
   274  	t += math.Ceil(s.ƒ(s, t) / 2)
   275  	p := s.l[0]
   276  	var r float64
   277  	for _, c := range s.l[1:] {
   278  		r += p.Width
   279  		if r+c.Width+c.Delta > t {
   280  			return p.Value
   281  		}
   282  		p = c
   283  	}
   284  	return p.Value
   285  }
   286  
// compress walks the summary from its tail towards its head and folds an
// entry into its right-hand neighbour whenever their combined width plus the
// neighbour's Delta does not exceed the invariant ƒ evaluated at the rank
// tracked in r. Summaries with fewer than two entries are left untouched.
func (s *stream) compress() {
	if len(s.l) < 2 {
		return
	}
	// x is the entry that currently absorbs its left neighbours (initially
	// the last entry) and xi is its index in s.l.
	x := s.l[len(s.l)-1]
	xi := len(s.l) - 1
	// r is the rank passed to the invariant; it is lowered by the width of
	// every entry visited below.
	r := s.n - 1 - x.Width

	for i := len(s.l) - 2; i >= 0; i-- {
		c := s.l[i]
		if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
			// Fold c into x: x takes over c's width and keeps its own
			// Value and Delta.
			x.Width += c.Width
			s.l[xi] = x
			// Remove element at i.
			copy(s.l[i:], s.l[i+1:])
			s.l = s.l[:len(s.l)-1]
			// The removal shifted x one slot to the left.
			xi -= 1
		} else {
			// c cannot be folded; it becomes the entry that absorbs the
			// ones to its left.
			x = c
			xi = i
		}
		r -= c.Width
	}
}
   311  
   312  func (s *stream) samples() Samples {
   313  	samples := make(Samples, len(s.l))
   314  	copy(samples, s.l)
   315  	return samples
   316  }
   317  

View as plain text