...

Source file src/github.com/datawire/ambassador/v2/pkg/memory/memory.go

Documentation: github.com/datawire/ambassador/v2/pkg/memory

     1  package memory
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io/ioutil"
     8  	"math"
     9  	"os"
    10  	"sort"
    11  	"strconv"
    12  	"strings"
    13  	"sync"
    14  	"syscall"
    15  	"time"
    16  
    17  	"github.com/datawire/ambassador/v2/pkg/debug"
    18  	"github.com/datawire/dlib/dlog"
    19  )
    20  
    21  // The Watch method will check memory usage every 10 seconds and log it if it jumps more than 10Gi
    22  // up or down. Additionally if memory usage exceeds 50% of the cgroup limit, it will log usage every
    23  // minute. Usage is also unconditionally logged before returning. This function only returns if the
    24  // context is canceled.
    25  func (usage *MemoryUsage) Watch(ctx context.Context) {
    26  	dbg := debug.FromContext(ctx)
    27  	memory := dbg.Value("memory")
    28  	memory.Store(usage.ShortString())
    29  
    30  	ticker := time.NewTicker(10 * time.Second)
    31  	defer ticker.Stop()
    32  
    33  	for {
    34  		select {
    35  		case now := <-ticker.C:
    36  			usage.Refresh(ctx)
    37  			memory.Store(usage.ShortString())
    38  			usage.maybeDo(now, func() {
    39  				dlog.Infoln(ctx, usage.String())
    40  			})
    41  		case <-ctx.Done():
    42  			usage.Refresh(ctx)
    43  			dlog.Infoln(ctx, usage.String())
    44  			return
    45  		}
    46  	}
    47  }
    48  
    49  func (m *MemoryUsage) ShortString() string {
    50  	m.mutex.Lock()
    51  	defer m.mutex.Unlock()
    52  	return fmt.Sprintf("%s of %s (%d%%)", m.usage.String(), m.limit.String(), m.percentUsed())
    53  }
    54  
    55  // Return true if conditions for action are satisifed. We take action if memory has changed more
    56  // than 10Gi since our previous action. We also take action once per minute if usage is greather
    57  // than 50% of our limit.
    58  func (m *MemoryUsage) shouldDo(now time.Time) bool {
    59  	const jump = 10 * 1024 * 1024
    60  	delta := m.previous - m.usage
    61  	if delta >= jump || delta <= -jump {
    62  		return true
    63  	}
    64  
    65  	if m.percentUsed() > 50 && now.Sub(m.lastAction) >= 60*time.Second {
    66  		return true
    67  	}
    68  
    69  	return false
    70  }
    71  
    72  // Do something if warranted.
    73  func (m *MemoryUsage) maybeDo(now time.Time, f func()) {
    74  	m.mutex.Lock()
    75  	if m.shouldDo(now) {
    76  		m.previous = m.usage
    77  		m.lastAction = now
    78  		m.mutex.Unlock()
    79  		f()
    80  	} else {
    81  		m.mutex.Unlock()
    82  	}
    83  }
    84  
    85  // The GetMemoryUsage function returns MemoryUsage info for the entire cgroup.
    86  func GetMemoryUsage(ctx context.Context) *MemoryUsage {
    87  	usage, limit := readUsage(ctx)
    88  	return &MemoryUsage{
    89  		usage:      usage,
    90  		limit:      limit,
    91  		perProcess: readPerProcess(ctx),
    92  
    93  		readUsage:      readUsage,
    94  		readPerProcess: readPerProcess,
    95  	}
    96  }
    97  
    98  // The MemoryUsage struct to holds memory usage and memory limit information about a cgroup.
    99  type MemoryUsage struct {
   100  	usage      memory
   101  	limit      memory
   102  	perProcess map[int]*ProcessUsage
   103  	previous   memory
   104  	lastAction time.Time
   105  
   106  	// these allow mocking for tests
   107  	readUsage      func(context.Context) (memory, memory)
   108  	readPerProcess func(context.Context) map[int]*ProcessUsage
   109  
   110  	// Protects the whole structure
   111  	mutex sync.Mutex
   112  }
   113  
// The ProcessUsage struct holds per process memory usage information.
type ProcessUsage struct {
	Pid     int      // Process ID; readPerProcess takes it from the numeric /proc entry name.
	Cmdline []string // Command line of the process, as returned by GetCmdline.
	Usage   memory   // readPerProcess sets this to the Rss value from /proc/<pid>/smaps_rollup times 1024.

	// This is zero if the process is still running. If the process has exited, this counts how many
	// refreshes have happened since then: each Refresh increments it for entries that are not
	// replaced by a fresh reading, and Refresh deletes the entry once the count exceeds 10.
	RefreshesSinceExit int
}
   124  
   125  type memory int64
   126  
   127  // Pretty print memory in gigabytes.
   128  func (m memory) String() string {
   129  	if m == unlimited {
   130  		return "Unlimited"
   131  	} else {
   132  		const GiB = 1024 * 1024 * 1024
   133  		return fmt.Sprintf("%.2fGi", float64(m)/GiB)
   134  	}
   135  }
   136  
   137  // The MemoryUsage.Refresh method updates memory usage information.
   138  func (m *MemoryUsage) Refresh(ctx context.Context) {
   139  	m.mutex.Lock()
   140  	defer m.mutex.Unlock()
   141  
   142  	usage, limit := m.readUsage(ctx)
   143  	m.usage = usage
   144  	m.limit = limit
   145  
   146  	// GC process memory info that has been around for more than 10 refreshes.
   147  	for pid, usage := range m.perProcess {
   148  		if usage.RefreshesSinceExit > 10 {
   149  			// It's old, let's delete it.
   150  			delete(m.perProcess, pid)
   151  		} else {
   152  
   153  			// Increment the count in case the process has exited. If the process is still running,
   154  			// this whole entry will get overwritted with a new one in the loop that follows this
   155  			// one.
   156  			usage.RefreshesSinceExit += 1
   157  		}
   158  	}
   159  
   160  	for pid, usage := range m.readPerProcess(ctx) {
   161  		// Overwrite any old process info with new/updated process info.
   162  		m.perProcess[pid] = usage
   163  	}
   164  }
   165  
   166  // If there is no cgroups memory limit then the value in
   167  // /sys/fs/cgroup/memory/memory.limit_in_bytes will be math.MaxInt64 rounded down to
   168  // the nearest pagesize. We calculate this number so we can detect if there is no memory limit.
   169  var unlimited memory = (memory(math.MaxInt64) / memory(os.Getpagesize())) * memory(os.Getpagesize())
   170  
   171  // Pretty print a summary of memory usage suitable for logging.
   172  func (m *MemoryUsage) String() string {
   173  	m.mutex.Lock()
   174  	defer m.mutex.Unlock()
   175  
   176  	var msg strings.Builder
   177  	if m.limit == unlimited {
   178  		msg.WriteString(fmt.Sprintf("Memory Usage %s", m.usage.String()))
   179  	} else {
   180  		msg.WriteString(fmt.Sprintf("Memory Usage %s (%d%%)", m.usage.String(), m.percentUsed()))
   181  	}
   182  
   183  	pids := make([]int, 0, len(m.perProcess))
   184  	for pid := range m.perProcess {
   185  		pids = append(pids, pid)
   186  	}
   187  
   188  	sort.Ints(pids)
   189  
   190  	for _, pid := range pids {
   191  		usage := m.perProcess[pid]
   192  		msg.WriteString("\n  ")
   193  		msg.WriteString(usage.String())
   194  	}
   195  
   196  	return msg.String()
   197  }
   198  
   199  // Pretty print a summary of process memory usage.
   200  func (pu ProcessUsage) String() string {
   201  	status := ""
   202  	if pu.RefreshesSinceExit > 0 {
   203  		status = " (exited)"
   204  	}
   205  	return fmt.Sprintf("  PID %d, %s%s: %s", pu.Pid, pu.Usage.String(), status, strings.Join(pu.Cmdline, " "))
   206  }
   207  
   208  // The MemoryUsage.PercentUsed method returns memory usage as a percentage of memory limit.
   209  func (m *MemoryUsage) PercentUsed() int {
   210  	m.mutex.Lock()
   211  	defer m.mutex.Unlock()
   212  	return m.percentUsed()
   213  }
   214  
   215  // This the same as PercentUsed() but not protected by a lock so we can use it form places where we
   216  // already have the lock.
   217  func (m *MemoryUsage) percentUsed() int {
   218  	return int(float64(m.usage) / float64(m.limit) * 100)
   219  }
   220  
   221  // The GetCmdline helper returns the command line for a pid. If the pid does not exist or we don't
   222  // have access to read /proc/<pid>/cmdline, then it returns the empty string.
   223  func GetCmdline(ctx context.Context, pid int) []string {
   224  	bytes, err := ioutil.ReadFile(fmt.Sprintf("/proc/%d/cmdline", pid))
   225  	if err != nil {
   226  		if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) {
   227  			// Don't complain if we don't have permission or the info doesn't exist.
   228  			return nil
   229  		}
   230  		dlog.Errorf(ctx, "couldn't access cmdline for %d: %v", pid, err)
   231  		return nil
   232  	}
   233  	return strings.Split(strings.TrimSuffix(string(bytes), "\n"), "\x00")
   234  }
   235  
   236  // Helper to read the usage and limit for the cgroup.
   237  func readUsage(ctx context.Context) (memory, memory) {
   238  	limit, err := readMemory("/sys/fs/cgroup/memory/memory.limit_in_bytes")
   239  	if err != nil {
   240  		if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) {
   241  			// Don't complain if we don't have permission or the info doesn't exist.
   242  			return 0, unlimited
   243  		}
   244  		dlog.Errorf(ctx, "couldn't access memory limit: %v", err)
   245  		return 0, unlimited
   246  	}
   247  
   248  	stats, err := readMemoryStat("/sys/fs/cgroup/memory/memory.stat")
   249  	if err != nil {
   250  		if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) {
   251  			// Don't complain if we don't have permission or the info doesn't exist.
   252  			return 0, limit
   253  		}
   254  		dlog.Errorf(ctx, "couldn't access memory usage: %v", err)
   255  		return 0, limit
   256  	}
   257  
   258  	// We calculate memory usage according to the OOMKiller as (rss + cache + swap) - inactive_file.
   259  	// This is substantiated by this article[1] which claims we need to track container_memory_working_set_bytes.
   260  	// According to this stack overflow[2], container_memory_working_set_bytes is "total usage" - "inactive file".
   261  	// Best as I can tell from the cgroup docs[3], "total usage" is computed from memory.stat by
   262  	// adding (rss + cache + swap), and "inactive file" is just the inactive_file field.
   263  	//
   264  	// [1]: https://faun.pub/how-much-is-too-much-the-linux-oomkiller-and-used-memory-d32186f29c9d
   265  	// [2]: https://stackoverflow.com/questions/65428558/what-is-the-difference-between-container-memory-working-set-bytes-and-contain
   266  	// [3]: https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
   267  
   268  	totalUsage := stats.Rss + stats.Cache + stats.Swap
   269  	OOMUsage := totalUsage - stats.InactiveFile
   270  	return memory(OOMUsage), limit
   271  }
   272  
   273  // Read an int64 from a file and convert it to memory.
   274  func readMemory(fpath string) (memory, error) {
   275  	contentAsB, err := ioutil.ReadFile(fpath)
   276  	if err != nil {
   277  		return 0, err
   278  	}
   279  	contentAsStr := strings.TrimSuffix(string(contentAsB), "\n")
   280  	m, err := strconv.ParseInt(contentAsStr, 10, 64)
   281  	return memory(m), err
   282  }
   283  
   284  // The readPerProcess helper returns a map containing memory usage used for each process in the cgroup.
   285  func readPerProcess(ctx context.Context) map[int]*ProcessUsage {
   286  	result := map[int]*ProcessUsage{}
   287  
   288  	files, err := ioutil.ReadDir("/proc")
   289  	if err != nil {
   290  		dlog.Errorf(ctx, "could not access memory info: %v", err)
   291  		return nil
   292  	}
   293  
   294  	for _, file := range files {
   295  		if !file.IsDir() {
   296  			continue
   297  		}
   298  
   299  		pid, err := strconv.Atoi(file.Name())
   300  		if err != nil {
   301  			continue
   302  		}
   303  
   304  		bytes, err := ioutil.ReadFile(fmt.Sprintf("/proc/%d/smaps_rollup", pid))
   305  		if err != nil {
   306  			if errors.Is(err, os.ErrPermission) || errors.Is(err, os.ErrNotExist) || errors.Is(err, syscall.ESRCH) {
   307  				// Don't complain if we don't have permission or the info doesn't exist.
   308  				continue
   309  			}
   310  			dlog.Errorf(ctx, "couldn't access usage for %d: %v", pid, err)
   311  			continue
   312  		}
   313  
   314  		parts := strings.Fields(string(bytes))
   315  		rssStr := ""
   316  		for idx, field := range parts {
   317  			if field == "Rss:" {
   318  				rssStr = parts[idx+1]
   319  			}
   320  		}
   321  		if rssStr == "" {
   322  			continue
   323  		}
   324  		rss, err := strconv.ParseUint(rssStr, 10, 64)
   325  		if err != nil {
   326  			dlog.Errorf(ctx, "couldn't parse %s: %v", rssStr, err)
   327  			continue
   328  		}
   329  		rss = rss * 1024
   330  		result[pid] = &ProcessUsage{pid, GetCmdline(ctx, pid), memory(rss), 0}
   331  	}
   332  
   333  	return result
   334  }
   335  
// memoryStat holds the subset of fields from a cgroup memory.stat file that parseMemoryStat
// extracts and readUsage combines into a usage figure. A field stays zero if its key is not
// present in the parsed content.
type memoryStat struct {
	Rss          uint64 // rss field
	Cache        uint64 // cache field
	Swap         uint64 // swap field
	InactiveFile uint64 // inactive_file field
}
   342  
   343  func readMemoryStat(fpath string) (memoryStat, error) {
   344  	bytes, err := ioutil.ReadFile(fpath)
   345  	if err != nil {
   346  		return memoryStat{}, err
   347  	}
   348  
   349  	return parseMemoryStat(string(bytes))
   350  }
   351  
   352  func parseMemoryStat(content string) (memoryStat, error) {
   353  	result := memoryStat{}
   354  	lines := strings.Split(content, "\n")
   355  	for _, line := range lines {
   356  		line = strings.TrimSuffix(line, "\n")
   357  		parts := strings.Fields(line)
   358  		if len(parts) != 2 {
   359  			continue
   360  		}
   361  
   362  		n, err := strconv.ParseUint(parts[1], 10, 64)
   363  		if err != nil {
   364  			return result, err
   365  		}
   366  
   367  		switch parts[0] {
   368  		case "rss":
   369  			result.Rss = n
   370  		case "swap":
   371  			result.Swap = n
   372  		case "cache":
   373  			result.Cache = n
   374  		case "inactive_file":
   375  			result.InactiveFile = n
   376  		}
   377  	}
   378  	return result, nil
   379  }
   380  

View as plain text