1 package memory
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io/ioutil"
8 "math"
9 "os"
10 "sort"
11 "strconv"
12 "strings"
13 "sync"
14 "syscall"
15 "time"
16
17 "github.com/datawire/ambassador/v2/pkg/debug"
18 "github.com/datawire/dlib/dlog"
19 )
20
21
22
23
24
25 func (usage *MemoryUsage) Watch(ctx context.Context) {
26 dbg := debug.FromContext(ctx)
27 memory := dbg.Value("memory")
28 memory.Store(usage.ShortString())
29
30 ticker := time.NewTicker(10 * time.Second)
31 defer ticker.Stop()
32
33 for {
34 select {
35 case now := <-ticker.C:
36 usage.Refresh(ctx)
37 memory.Store(usage.ShortString())
38 usage.maybeDo(now, func() {
39 dlog.Infoln(ctx, usage.String())
40 })
41 case <-ctx.Done():
42 usage.Refresh(ctx)
43 dlog.Infoln(ctx, usage.String())
44 return
45 }
46 }
47 }
48
49 func (m *MemoryUsage) ShortString() string {
50 m.mutex.Lock()
51 defer m.mutex.Unlock()
52 return fmt.Sprintf("%s of %s (%d%%)", m.usage.String(), m.limit.String(), m.percentUsed())
53 }
54
55
56
57
58 func (m *MemoryUsage) shouldDo(now time.Time) bool {
59 const jump = 10 * 1024 * 1024
60 delta := m.previous - m.usage
61 if delta >= jump || delta <= -jump {
62 return true
63 }
64
65 if m.percentUsed() > 50 && now.Sub(m.lastAction) >= 60*time.Second {
66 return true
67 }
68
69 return false
70 }
71
72
73 func (m *MemoryUsage) maybeDo(now time.Time, f func()) {
74 m.mutex.Lock()
75 if m.shouldDo(now) {
76 m.previous = m.usage
77 m.lastAction = now
78 m.mutex.Unlock()
79 f()
80 } else {
81 m.mutex.Unlock()
82 }
83 }
84
85
86 func GetMemoryUsage(ctx context.Context) *MemoryUsage {
87 usage, limit := readUsage(ctx)
88 return &MemoryUsage{
89 usage: usage,
90 limit: limit,
91 perProcess: readPerProcess(ctx),
92
93 readUsage: readUsage,
94 readPerProcess: readPerProcess,
95 }
96 }
97
98
99 type MemoryUsage struct {
100 usage memory
101 limit memory
102 perProcess map[int]*ProcessUsage
103 previous memory
104 lastAction time.Time
105
106
107 readUsage func(context.Context) (memory, memory)
108 readPerProcess func(context.Context) map[int]*ProcessUsage
109
110
111 mutex sync.Mutex
112 }
113
114
// ProcessUsage is the memory usage recorded for a single process.
//
// The field order matters: readPerProcess builds values with a positional
// composite literal.
type ProcessUsage struct {
	// Pid is the process ID.
	Pid int
	// Cmdline is the process's command line as returned by GetCmdline.
	Cmdline []string
	// Usage is the process's resident set size in bytes.
	Usage memory

	// RefreshesSinceExit counts the refreshes since the process was last
	// seen. MemoryUsage.Refresh increments it on every tracked entry and then
	// replaces the entries of processes that are still present with fresh
	// ones (count 0), so a non-zero value means the process is gone. Entries
	// whose count exceeds 10 are dropped on the next refresh.
	RefreshesSinceExit int
}
124
125 type memory int64
126
127
128 func (m memory) String() string {
129 if m == unlimited {
130 return "Unlimited"
131 } else {
132 const GiB = 1024 * 1024 * 1024
133 return fmt.Sprintf("%.2fGi", float64(m)/GiB)
134 }
135 }
136
137
138 func (m *MemoryUsage) Refresh(ctx context.Context) {
139 m.mutex.Lock()
140 defer m.mutex.Unlock()
141
142 usage, limit := m.readUsage(ctx)
143 m.usage = usage
144 m.limit = limit
145
146
147 for pid, usage := range m.perProcess {
148 if usage.RefreshesSinceExit > 10 {
149
150 delete(m.perProcess, pid)
151 } else {
152
153
154
155
156 usage.RefreshesSinceExit += 1
157 }
158 }
159
160 for pid, usage := range m.readPerProcess(ctx) {
161
162 m.perProcess[pid] = usage
163 }
164 }
165
166
167
168
169 var unlimited memory = (memory(math.MaxInt64) / memory(os.Getpagesize())) * memory(os.Getpagesize())
170
171
172 func (m *MemoryUsage) String() string {
173 m.mutex.Lock()
174 defer m.mutex.Unlock()
175
176 var msg strings.Builder
177 if m.limit == unlimited {
178 msg.WriteString(fmt.Sprintf("Memory Usage %s", m.usage.String()))
179 } else {
180 msg.WriteString(fmt.Sprintf("Memory Usage %s (%d%%)", m.usage.String(), m.percentUsed()))
181 }
182
183 pids := make([]int, 0, len(m.perProcess))
184 for pid := range m.perProcess {
185 pids = append(pids, pid)
186 }
187
188 sort.Ints(pids)
189
190 for _, pid := range pids {
191 usage := m.perProcess[pid]
192 msg.WriteString("\n ")
193 msg.WriteString(usage.String())
194 }
195
196 return msg.String()
197 }
198
199
200 func (pu ProcessUsage) String() string {
201 status := ""
202 if pu.RefreshesSinceExit > 0 {
203 status = " (exited)"
204 }
205 return fmt.Sprintf(" PID %d, %s%s: %s", pu.Pid, pu.Usage.String(), status, strings.Join(pu.Cmdline, " "))
206 }
207
208
209 func (m *MemoryUsage) PercentUsed() int {
210 m.mutex.Lock()
211 defer m.mutex.Unlock()
212 return m.percentUsed()
213 }
214
215
216
217 func (m *MemoryUsage) percentUsed() int {
218 return int(float64(m.usage) / float64(m.limit) * 100)
219 }
220
221
222
223 func GetCmdline(ctx context.Context, pid int) []string {
224 bytes, err := ioutil.ReadFile(fmt.Sprintf("/proc/%d/cmdline", pid))
225 if err != nil {
226 if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) {
227
228 return nil
229 }
230 dlog.Errorf(ctx, "couldn't access cmdline for %d: %v", pid, err)
231 return nil
232 }
233 return strings.Split(strings.TrimSuffix(string(bytes), "\n"), "\x00")
234 }
235
236
237 func readUsage(ctx context.Context) (memory, memory) {
238 limit, err := readMemory("/sys/fs/cgroup/memory/memory.limit_in_bytes")
239 if err != nil {
240 if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) {
241
242 return 0, unlimited
243 }
244 dlog.Errorf(ctx, "couldn't access memory limit: %v", err)
245 return 0, unlimited
246 }
247
248 stats, err := readMemoryStat("/sys/fs/cgroup/memory/memory.stat")
249 if err != nil {
250 if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) {
251
252 return 0, limit
253 }
254 dlog.Errorf(ctx, "couldn't access memory usage: %v", err)
255 return 0, limit
256 }
257
258
259
260
261
262
263
264
265
266
267
268 totalUsage := stats.Rss + stats.Cache + stats.Swap
269 OOMUsage := totalUsage - stats.InactiveFile
270 return memory(OOMUsage), limit
271 }
272
273
274 func readMemory(fpath string) (memory, error) {
275 contentAsB, err := ioutil.ReadFile(fpath)
276 if err != nil {
277 return 0, err
278 }
279 contentAsStr := strings.TrimSuffix(string(contentAsB), "\n")
280 m, err := strconv.ParseInt(contentAsStr, 10, 64)
281 return memory(m), err
282 }
283
284
285 func readPerProcess(ctx context.Context) map[int]*ProcessUsage {
286 result := map[int]*ProcessUsage{}
287
288 files, err := ioutil.ReadDir("/proc")
289 if err != nil {
290 dlog.Errorf(ctx, "could not access memory info: %v", err)
291 return nil
292 }
293
294 for _, file := range files {
295 if !file.IsDir() {
296 continue
297 }
298
299 pid, err := strconv.Atoi(file.Name())
300 if err != nil {
301 continue
302 }
303
304 bytes, err := ioutil.ReadFile(fmt.Sprintf("/proc/%d/smaps_rollup", pid))
305 if err != nil {
306 if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) || errors.Is(err, syscall.ESRCH) {
307
308 continue
309 }
310 dlog.Errorf(ctx, "couldn't access usage for %d: %v", pid, err)
311 continue
312 }
313
314 parts := strings.Fields(string(bytes))
315 rssStr := ""
316 for idx, field := range parts {
317 if field == "Rss:" {
318 rssStr = parts[idx+1]
319 }
320 }
321 if rssStr == "" {
322 continue
323 }
324 rss, err := strconv.ParseUint(rssStr, 10, 64)
325 if err != nil {
326 dlog.Errorf(ctx, "couldn't parse %s: %v", rssStr, err)
327 continue
328 }
329 rss = rss * 1024
330 result[pid] = &ProcessUsage{pid, GetCmdline(ctx, pid), memory(rss), 0}
331 }
332
333 return result
334 }
335
// memoryStat holds the counters that parseMemoryStat extracts from a
// memory.stat file: the values of the "rss", "cache", "swap" and
// "inactive_file" lines respectively. readUsage treats them as byte counts.
type memoryStat struct {
	Rss, Cache, Swap, InactiveFile uint64
}
342
343 func readMemoryStat(fpath string) (memoryStat, error) {
344 bytes, err := ioutil.ReadFile(fpath)
345 if err != nil {
346 return memoryStat{}, err
347 }
348
349 return parseMemoryStat(string(bytes))
350 }
351
352 func parseMemoryStat(content string) (memoryStat, error) {
353 result := memoryStat{}
354 lines := strings.Split(content, "\n")
355 for _, line := range lines {
356 line = strings.TrimSuffix(line, "\n")
357 parts := strings.Fields(line)
358 if len(parts) != 2 {
359 continue
360 }
361
362 n, err := strconv.ParseUint(parts[1], 10, 64)
363 if err != nil {
364 return result, err
365 }
366
367 switch parts[0] {
368 case "rss":
369 result.Rss = n
370 case "swap":
371 result.Swap = n
372 case "cache":
373 result.Cache = n
374 case "inactive_file":
375 result.InactiveFile = n
376 }
377 }
378 return result, nil
379 }
380
View as plain text