...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package quantile
16
17 import (
18 "math"
19 "sort"
20 )
21
22
23
// Sample is one entry of a quantile summary. All three fields are encoded to
// and decoded from JSON as quoted strings.
type Sample struct {
	// Value is the observed value. Width is the weight the sample carries
	// (Insert uses 1; compress adds up the widths of samples it folds
	// together). Delta is the extra rank allowance that compress and query
	// add to Width when testing a sample against the invariant.
	Value, Width, Delta float64 `json:",string"`
}
29
30
31 type Samples []Sample
32
33 func (a Samples) Len() int { return len(a) }
34 func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
35 func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
36
// invariant is the bound function of a summary: given the stream and a rank r
// it returns the allowance that merge, query and compress consult when
// inserting, looking up and folding samples.
type invariant func(s *stream, r float64) float64
38
39
40
41
42
43
44
45
46
47
48
49 func NewLowBiased(epsilon float64) *Stream {
50 ƒ := func(s *stream, r float64) float64 {
51 return 2 * epsilon * r
52 }
53 return newStream(ƒ)
54 }
55
56
57
58
59
60
61
62
63
64
65
66 func NewHighBiased(epsilon float64) *Stream {
67 ƒ := func(s *stream, r float64) float64 {
68 return 2 * epsilon * (s.n - r)
69 }
70 return newStream(ƒ)
71 }
72
73
74
75
76
77
78
79
80 func NewTargeted(targetMap map[float64]float64) *Stream {
81
82
83
84 targets := targetMapToSlice(targetMap)
85
86 ƒ := func(s *stream, r float64) float64 {
87 var m = math.MaxFloat64
88 var f float64
89 for _, t := range targets {
90 if t.quantile*s.n <= r {
91 f = (2 * t.epsilon * r) / t.quantile
92 } else {
93 f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
94 }
95 if f < m {
96 m = f
97 }
98 }
99 return m
100 }
101 return newStream(ƒ)
102 }
103
// target pairs a quantile with the epsilon given for it in the map passed to
// NewTargeted.
type target struct {
	quantile, epsilon float64
}
108
109 func targetMapToSlice(targetMap map[float64]float64) []target {
110 targets := make([]target, 0, len(targetMap))
111
112 for quantile, epsilon := range targetMap {
113 t := target{
114 quantile: quantile,
115 epsilon: epsilon,
116 }
117 targets = append(targets, t)
118 }
119
120 return targets
121 }
122
123
124
// Stream is the exported entry point of the package. It embeds the internal
// summary (*stream) and keeps an insertion buffer in front of it. No
// synchronization is visible in this file, so presumably callers must
// serialize access themselves — TODO confirm.
type Stream struct {
	*stream
	b      Samples // samples inserted since the last flush, not yet merged into the summary
	sorted bool    // cleared whenever b is appended to; set again by maybeSort
}
130
131 func newStream(ƒ invariant) *Stream {
132 x := &stream{ƒ: ƒ}
133 return &Stream{x, make(Samples, 0, 500), true}
134 }
135
136
137 func (s *Stream) Insert(v float64) {
138 s.insert(Sample{Value: v, Width: 1})
139 }
140
141 func (s *Stream) insert(sample Sample) {
142 s.b = append(s.b, sample)
143 s.sorted = false
144 if len(s.b) == cap(s.b) {
145 s.flush()
146 }
147 }
148
149
150
151
152 func (s *Stream) Query(q float64) float64 {
153 if !s.flushed() {
154
155
156 l := len(s.b)
157 if l == 0 {
158 return 0
159 }
160 i := int(math.Ceil(float64(l) * q))
161 if i > 0 {
162 i -= 1
163 }
164 s.maybeSort()
165 return s.b[i].Value
166 }
167 s.flush()
168 return s.stream.query(q)
169 }
170
171
172
173
174
175
176 func (s *Stream) Merge(samples Samples) {
177 sort.Sort(samples)
178 s.stream.merge(samples)
179 }
180
181
182 func (s *Stream) Reset() {
183 s.stream.reset()
184 s.b = s.b[:0]
185 }
186
187
188 func (s *Stream) Samples() Samples {
189 if !s.flushed() {
190 return s.b
191 }
192 s.flush()
193 return s.stream.samples()
194 }
195
196
197
198 func (s *Stream) Count() int {
199 return len(s.b) + s.stream.count()
200 }
201
202 func (s *Stream) flush() {
203 s.maybeSort()
204 s.stream.merge(s.b)
205 s.b = s.b[:0]
206 }
207
208 func (s *Stream) maybeSort() {
209 if !s.sorted {
210 s.sorted = true
211 sort.Sort(s.b)
212 }
213 }
214
215 func (s *Stream) flushed() bool {
216 return len(s.stream.l) > 0
217 }
218
// stream is the internal summary that Stream embeds and buffers in front of.
type stream struct {
	n float64   // running sum of the Width of every sample merged so far
	l []Sample  // summary samples; merge places each new sample before the first larger Value
	ƒ invariant // bound function consulted by merge, query and compress
}
224
225 func (s *stream) reset() {
226 s.l = s.l[:0]
227 s.n = 0
228 }
229
230 func (s *stream) insert(v float64) {
231 s.merge(Samples{{v, 1, 0}})
232 }
233
234 func (s *stream) merge(samples Samples) {
235
236
237
238
239 var r float64
240 i := 0
241 for _, sample := range samples {
242 for ; i < len(s.l); i++ {
243 c := s.l[i]
244 if c.Value > sample.Value {
245
246 s.l = append(s.l, Sample{})
247 copy(s.l[i+1:], s.l[i:])
248 s.l[i] = Sample{
249 sample.Value,
250 sample.Width,
251 math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
252
253 }
254 i++
255 goto inserted
256 }
257 r += c.Width
258 }
259 s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
260 i++
261 inserted:
262 s.n += sample.Width
263 r += sample.Width
264 }
265 s.compress()
266 }
267
268 func (s *stream) count() int {
269 return int(s.n)
270 }
271
272 func (s *stream) query(q float64) float64 {
273 t := math.Ceil(q * s.n)
274 t += math.Ceil(s.ƒ(s, t) / 2)
275 p := s.l[0]
276 var r float64
277 for _, c := range s.l[1:] {
278 r += p.Width
279 if r+c.Width+c.Delta > t {
280 return p.Value
281 }
282 p = c
283 }
284 return p.Value
285 }
286
287 func (s *stream) compress() {
288 if len(s.l) < 2 {
289 return
290 }
291 x := s.l[len(s.l)-1]
292 xi := len(s.l) - 1
293 r := s.n - 1 - x.Width
294
295 for i := len(s.l) - 2; i >= 0; i-- {
296 c := s.l[i]
297 if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
298 x.Width += c.Width
299 s.l[xi] = x
300
301 copy(s.l[i:], s.l[i+1:])
302 s.l = s.l[:len(s.l)-1]
303 xi -= 1
304 } else {
305 x = c
306 xi = i
307 }
308 r -= c.Width
309 }
310 }
311
312 func (s *stream) samples() Samples {
313 samples := make(Samples, len(s.l))
314 copy(samples, s.l)
315 return samples
316 }
317
View as plain text