...

Source file src/go.opencensus.io/stats/view/aggregation_data.go

Documentation: go.opencensus.io/stats/view

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package view
    17  
    18  import (
    19  	"math"
    20  	"time"
    21  
    22  	"go.opencensus.io/metric/metricdata"
    23  )
    24  
// AggregationData represents an aggregated value from a collection.
// They are reported on the view data during exporting.
// Most users won't directly access aggregation data.
type AggregationData interface {
	// isAggregationData is a marker method restricting implementations to this package.
	isAggregationData() bool
	// addSample folds one recorded measurement into the aggregate.
	// Attachments and timestamp are only used by implementations that keep exemplars.
	addSample(v float64, attachments map[string]interface{}, t time.Time)
	// clone returns a deep-enough copy that further addSample calls on the
	// original do not affect the copy.
	clone() AggregationData
	// equal reports whether other holds the same aggregate state
	// (floating-point fields are compared with a tolerance).
	equal(other AggregationData) bool
	// toPoint converts the aggregate into a metricdata.Point of the given
	// type, stamped with the given time. Implementations panic on types
	// they do not support.
	toPoint(t metricdata.Type, time time.Time) metricdata.Point
	// StartTime returns the time aggregation began, or the zero time if
	// the implementation does not track it.
	StartTime() time.Time
}
    36  
// epsilon is the tolerance used when comparing float64 aggregate values in
// equal methods. Comparisons are done on the squared difference, so this
// bounds (a-b)^2, not |a-b|.
const epsilon = 1e-9
    38  
// CountData is the aggregated data for the Count aggregation.
// A count aggregation processes data and counts the recordings.
//
// Most users won't directly access count data.
type CountData struct {
	Start time.Time // when aggregation began
	Value int64     // number of recordings seen so far
}
    47  
// isAggregationData marks CountData as an AggregationData implementation.
func (a *CountData) isAggregationData() bool { return true }
    49  
    50  func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
    51  	a.Value = a.Value + 1
    52  }
    53  
    54  func (a *CountData) clone() AggregationData {
    55  	return &CountData{Value: a.Value, Start: a.Start}
    56  }
    57  
    58  func (a *CountData) equal(other AggregationData) bool {
    59  	a2, ok := other.(*CountData)
    60  	if !ok {
    61  		return false
    62  	}
    63  
    64  	return a.Start.Equal(a2.Start) && a.Value == a2.Value
    65  }
    66  
    67  func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
    68  	switch metricType {
    69  	case metricdata.TypeCumulativeInt64:
    70  		return metricdata.NewInt64Point(t, a.Value)
    71  	default:
    72  		panic("unsupported metricdata.Type")
    73  	}
    74  }
    75  
// StartTime returns the start time of the data being aggregated by CountData.
func (a *CountData) StartTime() time.Time {
	return a.Start
}
    80  
// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
// Most users won't directly access sum data.
type SumData struct {
	Start time.Time // when aggregation began
	Value float64   // running sum of all recorded values
}
    89  
// isAggregationData marks SumData as an AggregationData implementation.
func (a *SumData) isAggregationData() bool { return true }
    91  
    92  func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
    93  	a.Value += v
    94  }
    95  
    96  func (a *SumData) clone() AggregationData {
    97  	return &SumData{Value: a.Value, Start: a.Start}
    98  }
    99  
   100  func (a *SumData) equal(other AggregationData) bool {
   101  	a2, ok := other.(*SumData)
   102  	if !ok {
   103  		return false
   104  	}
   105  	return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon
   106  }
   107  
   108  func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
   109  	switch metricType {
   110  	case metricdata.TypeCumulativeInt64:
   111  		return metricdata.NewInt64Point(t, int64(a.Value))
   112  	case metricdata.TypeCumulativeFloat64:
   113  		return metricdata.NewFloat64Point(t, a.Value)
   114  	default:
   115  		panic("unsupported metricdata.Type")
   116  	}
   117  }
   118  
// StartTime returns the start time of the data being aggregated by SumData.
func (a *SumData) StartTime() time.Time {
	return a.Start
}
   123  
// DistributionData is the aggregated data for the
// Distribution aggregation.
//
// Most users won't directly access distribution data.
//
// For a distribution with N bounds, the associated DistributionData will have
// N+1 buckets.
type DistributionData struct {
	Count           int64   // number of data points aggregated
	Min             float64 // minimum value in the distribution
	Max             float64 // max value in the distribution
	Mean            float64 // mean of the distribution
	SumOfSquaredDev float64 // sum of the squared deviation from the mean
	CountPerBucket  []int64 // number of occurrences per bucket
	// ExemplarsPerBucket is a slice the same length as CountPerBucket containing
	// an exemplar for the associated bucket, or nil.
	ExemplarsPerBucket []*metricdata.Exemplar
	bounds             []float64 // histogram distribution of the values
	Start              time.Time // when aggregation began
}
   144  
   145  func newDistributionData(agg *Aggregation, t time.Time) *DistributionData {
   146  	bucketCount := len(agg.Buckets) + 1
   147  	return &DistributionData{
   148  		CountPerBucket:     make([]int64, bucketCount),
   149  		ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
   150  		bounds:             agg.Buckets,
   151  		Min:                math.MaxFloat64,
   152  		Max:                math.SmallestNonzeroFloat64,
   153  		Start:              t,
   154  	}
   155  }
   156  
// Sum returns the sum of all samples collected.
// It is derived from the running mean rather than stored directly.
func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) }
   159  
   160  func (a *DistributionData) variance() float64 {
   161  	if a.Count <= 1 {
   162  		return 0
   163  	}
   164  	return a.SumOfSquaredDev / float64(a.Count-1)
   165  }
   166  
// isAggregationData marks DistributionData as an AggregationData implementation.
func (a *DistributionData) isAggregationData() bool { return true }
   168  
// TODO(songy23): support exemplar attachments.
// addSample folds one measurement into the distribution: it updates min/max,
// the per-bucket counts, and the running mean/sum-of-squared-deviation using
// Welford's online algorithm, which avoids storing individual samples.
func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
	if v < a.Min {
		a.Min = v
	}
	if v > a.Max {
		a.Max = v
	}
	a.Count++
	a.addToBucket(v, attachments, t)

	// First sample: the mean is the sample itself and the squared deviation
	// is zero, so skip the incremental update below.
	if a.Count == 1 {
		a.Mean = v
		return
	}

	// Welford update: oldMean must be captured BEFORE Mean changes, because
	// SumOfSquaredDev uses both the pre- and post-update means.
	oldMean := a.Mean
	a.Mean = a.Mean + (v-a.Mean)/float64(a.Count)
	a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
}
   189  
   190  func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
   191  	var count *int64
   192  	var i int
   193  	var b float64
   194  	for i, b = range a.bounds {
   195  		if v < b {
   196  			count = &a.CountPerBucket[i]
   197  			break
   198  		}
   199  	}
   200  	if count == nil { // Last bucket.
   201  		i = len(a.bounds)
   202  		count = &a.CountPerBucket[i]
   203  	}
   204  	*count++
   205  	if exemplar := getExemplar(v, attachments, t); exemplar != nil {
   206  		a.ExemplarsPerBucket[i] = exemplar
   207  	}
   208  }
   209  
   210  func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
   211  	if len(attachments) == 0 {
   212  		return nil
   213  	}
   214  	return &metricdata.Exemplar{
   215  		Value:       v,
   216  		Timestamp:   t,
   217  		Attachments: attachments,
   218  	}
   219  }
   220  
   221  func (a *DistributionData) clone() AggregationData {
   222  	c := *a
   223  	c.CountPerBucket = append([]int64(nil), a.CountPerBucket...)
   224  	c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...)
   225  	return &c
   226  }
   227  
   228  func (a *DistributionData) equal(other AggregationData) bool {
   229  	a2, ok := other.(*DistributionData)
   230  	if !ok {
   231  		return false
   232  	}
   233  	if a2 == nil {
   234  		return false
   235  	}
   236  	if len(a.CountPerBucket) != len(a2.CountPerBucket) {
   237  		return false
   238  	}
   239  	for i := range a.CountPerBucket {
   240  		if a.CountPerBucket[i] != a2.CountPerBucket[i] {
   241  			return false
   242  		}
   243  	}
   244  	return a.Start.Equal(a2.Start) &&
   245  		a.Count == a2.Count &&
   246  		a.Min == a2.Min &&
   247  		a.Max == a2.Max &&
   248  		math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
   249  }
   250  
   251  func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
   252  	switch metricType {
   253  	case metricdata.TypeCumulativeDistribution:
   254  		buckets := []metricdata.Bucket{}
   255  		for i := 0; i < len(a.CountPerBucket); i++ {
   256  			buckets = append(buckets, metricdata.Bucket{
   257  				Count:    a.CountPerBucket[i],
   258  				Exemplar: a.ExemplarsPerBucket[i],
   259  			})
   260  		}
   261  		bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}
   262  
   263  		val := &metricdata.Distribution{
   264  			Count:                 a.Count,
   265  			Sum:                   a.Sum(),
   266  			SumOfSquaredDeviation: a.SumOfSquaredDev,
   267  			BucketOptions:         bucketOptions,
   268  			Buckets:               buckets,
   269  		}
   270  		return metricdata.NewDistributionPoint(t, val)
   271  
   272  	default:
   273  		// TODO: [rghetia] when we have a use case for TypeGaugeDistribution.
   274  		panic("unsupported metricdata.Type")
   275  	}
   276  }
   277  
// StartTime returns the start time of the data being aggregated by DistributionData.
func (a *DistributionData) StartTime() time.Time {
	return a.Start
}
   282  
// LastValueData returns the last value recorded for LastValue aggregation.
// Unlike the other aggregations it keeps no start time.
type LastValueData struct {
	Value float64 // most recently recorded value
}
   287  
// isAggregationData marks LastValueData as an AggregationData implementation.
func (l *LastValueData) isAggregationData() bool {
	return true
}
   291  
// addSample overwrites the stored value with the latest recording;
// attachments and timestamp are ignored.
func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
	l.Value = v
}
   295  
   296  func (l *LastValueData) clone() AggregationData {
   297  	return &LastValueData{l.Value}
   298  }
   299  
   300  func (l *LastValueData) equal(other AggregationData) bool {
   301  	a2, ok := other.(*LastValueData)
   302  	if !ok {
   303  		return false
   304  	}
   305  	return l.Value == a2.Value
   306  }
   307  
   308  func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
   309  	switch metricType {
   310  	case metricdata.TypeGaugeInt64:
   311  		return metricdata.NewInt64Point(t, int64(l.Value))
   312  	case metricdata.TypeGaugeFloat64:
   313  		return metricdata.NewFloat64Point(t, l.Value)
   314  	default:
   315  		panic("unsupported metricdata.Type")
   316  	}
   317  }
   318  
// StartTime returns an empty time value as start time is not recorded when using last value
// aggregation.
func (l *LastValueData) StartTime() time.Time {
	return time.Time{}
}
   324  
// ClearStart clears the Start field from data if present. Useful for testing in cases where the
// start time will be nondeterministic.
// LastValueData is intentionally absent: it has no Start field to clear.
func ClearStart(data AggregationData) {
	switch data := data.(type) {
	case *CountData:
		data.Start = time.Time{}
	case *SumData:
		data.Start = time.Time{}
	case *DistributionData:
		data.Start = time.Time{}
	}
}
   337  

View as plain text