...
1
2
3
4
5
6
7 package flowrate
8
9 import (
10 "math"
11 "sync"
12 "time"
13 )
14
15
16 type Monitor struct {
17 mu sync.Mutex
18 active bool
19 start time.Duration
20 bytes int64
21 samples int64
22
23 rSample float64
24 rEMA float64
25 rPeak float64
26 rWindow float64
27
28 sBytes int64
29 sLast time.Duration
30 sRate time.Duration
31
32 tBytes int64
33 tLast time.Duration
34 }
35
36
37
38
39
40
41
42
43
44
45
46
47
48 func New(sampleRate, windowSize time.Duration) *Monitor {
49 if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
50 sampleRate = 5 * clockRate
51 }
52 if windowSize <= 0 {
53 windowSize = 1 * time.Second
54 }
55 now := clock()
56 return &Monitor{
57 active: true,
58 start: now,
59 rWindow: windowSize.Seconds(),
60 sLast: now,
61 sRate: sampleRate,
62 tLast: now,
63 }
64 }
65
66
67
68 func (m *Monitor) Update(n int) int {
69 m.mu.Lock()
70 m.update(n)
71 m.mu.Unlock()
72 return n
73 }
74
75
76
77 func (m *Monitor) IO(n int, err error) (int, error) {
78 return m.Update(n), err
79 }
80
81
82
83
84 func (m *Monitor) Done() int64 {
85 m.mu.Lock()
86 if now := m.update(0); m.sBytes > 0 {
87 m.reset(now)
88 }
89 m.active = false
90 m.tLast = 0
91 n := m.bytes
92 m.mu.Unlock()
93 return n
94 }
95
96
97 const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
98
99
100
101 type Status struct {
102 Active bool
103 Start time.Time
104 Duration time.Duration
105 Idle time.Duration
106 Bytes int64
107 Samples int64
108 InstRate int64
109 CurRate int64
110 AvgRate int64
111 PeakRate int64
112 BytesRem int64
113 TimeRem time.Duration
114 Progress Percent
115 }
116
117
118
119 func (m *Monitor) Status() Status {
120 m.mu.Lock()
121 now := m.update(0)
122 s := Status{
123 Active: m.active,
124 Start: clockToTime(m.start),
125 Duration: m.sLast - m.start,
126 Idle: now - m.tLast,
127 Bytes: m.bytes,
128 Samples: m.samples,
129 PeakRate: round(m.rPeak),
130 BytesRem: m.tBytes - m.bytes,
131 Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
132 }
133 if s.BytesRem < 0 {
134 s.BytesRem = 0
135 }
136 if s.Duration > 0 {
137 rAvg := float64(s.Bytes) / s.Duration.Seconds()
138 s.AvgRate = round(rAvg)
139 if s.Active {
140 s.InstRate = round(m.rSample)
141 s.CurRate = round(m.rEMA)
142 if s.BytesRem > 0 {
143 if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
144 ns := float64(s.BytesRem) / tRate * 1e9
145 if ns > float64(timeRemLimit) {
146 ns = float64(timeRemLimit)
147 }
148 s.TimeRem = clockRound(time.Duration(ns))
149 }
150 }
151 }
152 }
153 m.mu.Unlock()
154 return s
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168 func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
169 if want < 1 || rate < 1 {
170 return want
171 }
172 m.mu.Lock()
173
174
175 limit := round(float64(rate) * m.sRate.Seconds())
176 if limit <= 0 {
177 limit = 1
178 }
179
180
181 if now := m.update(0); block {
182 for m.sBytes >= limit && m.active {
183 now = m.waitNextSample(now)
184 }
185 }
186
187
188 if limit -= m.sBytes; limit > int64(want) || !m.active {
189 limit = int64(want)
190 }
191 m.mu.Unlock()
192
193 if limit < 0 {
194 limit = 0
195 }
196 return int(limit)
197 }
198
199
200
201 func (m *Monitor) SetTransferSize(bytes int64) {
202 if bytes < 0 {
203 bytes = 0
204 }
205 m.mu.Lock()
206 m.tBytes = bytes
207 m.mu.Unlock()
208 }
209
210
211
212
213 func (m *Monitor) update(n int) (now time.Duration) {
214 if !m.active {
215 return
216 }
217 if now = clock(); n > 0 {
218 m.tLast = now
219 }
220 m.sBytes += int64(n)
221 if sTime := now - m.sLast; sTime >= m.sRate {
222 t := sTime.Seconds()
223 if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
224 m.rPeak = m.rSample
225 }
226
227
228
229 if m.samples > 0 {
230 w := math.Exp(-t / m.rWindow)
231 m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
232 } else {
233 m.rEMA = m.rSample
234 }
235 m.reset(now)
236 }
237 return
238 }
239
240
241 func (m *Monitor) reset(sampleTime time.Duration) {
242 m.bytes += m.sBytes
243 m.samples++
244 m.sBytes = 0
245 m.sLast = sampleTime
246 }
247
248
249
250
251 func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
252 const minWait = 5 * time.Millisecond
253 current := m.sLast
254
255
256 for m.sLast == current && m.active {
257 d := current + m.sRate - now
258 m.mu.Unlock()
259 if d < minWait {
260 d = minWait
261 }
262 time.Sleep(d)
263 m.mu.Lock()
264 now = m.update(0)
265 }
266 return now
267 }
268
View as plain text