1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package view
17
18 import (
19 "math"
20 "time"
21
22 "go.opencensus.io/metric/metricdata"
23 )
24
25
26
27
28 type AggregationData interface {
29 isAggregationData() bool
30 addSample(v float64, attachments map[string]interface{}, t time.Time)
31 clone() AggregationData
32 equal(other AggregationData) bool
33 toPoint(t metricdata.Type, time time.Time) metricdata.Point
34 StartTime() time.Time
35 }
36
// Tolerance used by the equal methods in this file when comparing
// floating-point aggregates: two values are treated as equal when the square
// of their difference is below epsilon.
const (
	epsilon = 1e-9
)
38
39
40
41
42
// CountData is the aggregate that counts recorded samples.
type CountData struct {
	// Start is the time reported by StartTime.
	Start time.Time
	// Value is the number of samples added so far.
	Value int64
}

// isAggregationData marks *CountData as an AggregationData; it always
// returns true.
func (a *CountData) isAggregationData() bool {
	return true
}

// addSample registers one more sample. Only the fact that a sample arrived
// matters for a count, so the value, attachments and timestamp are ignored.
func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
	a.Value++
}
53
54 func (a *CountData) clone() AggregationData {
55 return &CountData{Value: a.Value, Start: a.Start}
56 }
57
58 func (a *CountData) equal(other AggregationData) bool {
59 a2, ok := other.(*CountData)
60 if !ok {
61 return false
62 }
63
64 return a.Start.Equal(a2.Start) && a.Value == a2.Value
65 }
66
67 func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
68 switch metricType {
69 case metricdata.TypeCumulativeInt64:
70 return metricdata.NewInt64Point(t, a.Value)
71 default:
72 panic("unsupported metricdata.Type")
73 }
74 }
75
76
77 func (a *CountData) StartTime() time.Time {
78 return a.Start
79 }
80
81
82
83
84
// SumData is the aggregate that accumulates the sum of recorded samples.
type SumData struct {
	// Start is the time reported by StartTime.
	Start time.Time
	// Value is the running total of all samples added so far.
	Value float64
}

// isAggregationData marks *SumData as an AggregationData; it always returns
// true.
func (a *SumData) isAggregationData() bool {
	return true
}

// addSample adds v to the running total. Attachments and the timestamp are
// ignored.
func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
	a.Value = a.Value + v
}
95
96 func (a *SumData) clone() AggregationData {
97 return &SumData{Value: a.Value, Start: a.Start}
98 }
99
100 func (a *SumData) equal(other AggregationData) bool {
101 a2, ok := other.(*SumData)
102 if !ok {
103 return false
104 }
105 return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon
106 }
107
108 func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
109 switch metricType {
110 case metricdata.TypeCumulativeInt64:
111 return metricdata.NewInt64Point(t, int64(a.Value))
112 case metricdata.TypeCumulativeFloat64:
113 return metricdata.NewFloat64Point(t, a.Value)
114 default:
115 panic("unsupported metricdata.Type")
116 }
117 }
118
119
120 func (a *SumData) StartTime() time.Time {
121 return a.Start
122 }
123
124
125
126
127
128
129
130
131 type DistributionData struct {
132 Count int64
133 Min float64
134 Max float64
135 Mean float64
136 SumOfSquaredDev float64
137 CountPerBucket []int64
138
139
140 ExemplarsPerBucket []*metricdata.Exemplar
141 bounds []float64
142 Start time.Time
143 }
144
145 func newDistributionData(agg *Aggregation, t time.Time) *DistributionData {
146 bucketCount := len(agg.Buckets) + 1
147 return &DistributionData{
148 CountPerBucket: make([]int64, bucketCount),
149 ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
150 bounds: agg.Buckets,
151 Min: math.MaxFloat64,
152 Max: math.SmallestNonzeroFloat64,
153 Start: t,
154 }
155 }
156
157
158 func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) }
159
160 func (a *DistributionData) variance() float64 {
161 if a.Count <= 1 {
162 return 0
163 }
164 return a.SumOfSquaredDev / float64(a.Count-1)
165 }
166
167 func (a *DistributionData) isAggregationData() bool { return true }
168
169
170 func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
171 if v < a.Min {
172 a.Min = v
173 }
174 if v > a.Max {
175 a.Max = v
176 }
177 a.Count++
178 a.addToBucket(v, attachments, t)
179
180 if a.Count == 1 {
181 a.Mean = v
182 return
183 }
184
185 oldMean := a.Mean
186 a.Mean = a.Mean + (v-a.Mean)/float64(a.Count)
187 a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
188 }
189
190 func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
191 var count *int64
192 var i int
193 var b float64
194 for i, b = range a.bounds {
195 if v < b {
196 count = &a.CountPerBucket[i]
197 break
198 }
199 }
200 if count == nil {
201 i = len(a.bounds)
202 count = &a.CountPerBucket[i]
203 }
204 *count++
205 if exemplar := getExemplar(v, attachments, t); exemplar != nil {
206 a.ExemplarsPerBucket[i] = exemplar
207 }
208 }
209
210 func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
211 if len(attachments) == 0 {
212 return nil
213 }
214 return &metricdata.Exemplar{
215 Value: v,
216 Timestamp: t,
217 Attachments: attachments,
218 }
219 }
220
221 func (a *DistributionData) clone() AggregationData {
222 c := *a
223 c.CountPerBucket = append([]int64(nil), a.CountPerBucket...)
224 c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...)
225 return &c
226 }
227
228 func (a *DistributionData) equal(other AggregationData) bool {
229 a2, ok := other.(*DistributionData)
230 if !ok {
231 return false
232 }
233 if a2 == nil {
234 return false
235 }
236 if len(a.CountPerBucket) != len(a2.CountPerBucket) {
237 return false
238 }
239 for i := range a.CountPerBucket {
240 if a.CountPerBucket[i] != a2.CountPerBucket[i] {
241 return false
242 }
243 }
244 return a.Start.Equal(a2.Start) &&
245 a.Count == a2.Count &&
246 a.Min == a2.Min &&
247 a.Max == a2.Max &&
248 math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
249 }
250
251 func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
252 switch metricType {
253 case metricdata.TypeCumulativeDistribution:
254 buckets := []metricdata.Bucket{}
255 for i := 0; i < len(a.CountPerBucket); i++ {
256 buckets = append(buckets, metricdata.Bucket{
257 Count: a.CountPerBucket[i],
258 Exemplar: a.ExemplarsPerBucket[i],
259 })
260 }
261 bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}
262
263 val := &metricdata.Distribution{
264 Count: a.Count,
265 Sum: a.Sum(),
266 SumOfSquaredDeviation: a.SumOfSquaredDev,
267 BucketOptions: bucketOptions,
268 Buckets: buckets,
269 }
270 return metricdata.NewDistributionPoint(t, val)
271
272 default:
273
274 panic("unsupported metricdata.Type")
275 }
276 }
277
278
279 func (a *DistributionData) StartTime() time.Time {
280 return a.Start
281 }
282
283
284 type LastValueData struct {
285 Value float64
286 }
287
288 func (l *LastValueData) isAggregationData() bool {
289 return true
290 }
291
292 func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
293 l.Value = v
294 }
295
296 func (l *LastValueData) clone() AggregationData {
297 return &LastValueData{l.Value}
298 }
299
300 func (l *LastValueData) equal(other AggregationData) bool {
301 a2, ok := other.(*LastValueData)
302 if !ok {
303 return false
304 }
305 return l.Value == a2.Value
306 }
307
308 func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
309 switch metricType {
310 case metricdata.TypeGaugeInt64:
311 return metricdata.NewInt64Point(t, int64(l.Value))
312 case metricdata.TypeGaugeFloat64:
313 return metricdata.NewFloat64Point(t, l.Value)
314 default:
315 panic("unsupported metricdata.Type")
316 }
317 }
318
319
320
321 func (l *LastValueData) StartTime() time.Time {
322 return time.Time{}
323 }
324
325
326
327 func ClearStart(data AggregationData) {
328 switch data := data.(type) {
329 case *CountData:
330 data.Start = time.Time{}
331 case *SumData:
332 data.Start = time.Time{}
333 case *DistributionData:
334 data.Start = time.Time{}
335 }
336 }
337
View as plain text