...

Source file src/k8s.io/kubernetes/pkg/kubelet/eviction/threshold_notifier_linux.go

Documentation: k8s.io/kubernetes/pkg/kubelet/eviction

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package eviction
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    23  
    24  	libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
    25  	"golang.org/x/sys/unix"
    26  	"k8s.io/klog/v2"
    27  )
    28  
    29  const (
    30  	// eventSize is the number of bytes returned by a successful read from an eventfd
    31  	// see http://man7.org/linux/man-pages/man2/eventfd.2.html for more information
    32  	eventSize = 8
    33  	// numFdEvents is the number of events we can record at once.
    34  	// If EpollWait finds more than this, they will be missed.
    35  	numFdEvents = 6
    36  )
    37  
    38  type linuxCgroupNotifier struct {
    39  	eventfd  int
    40  	epfd     int
    41  	stop     chan struct{}
    42  	stopLock sync.Mutex
    43  }
    44  
    45  var _ CgroupNotifier = &linuxCgroupNotifier{}
    46  
    47  // NewCgroupNotifier returns a linuxCgroupNotifier, which performs cgroup control operations required
    48  // to receive notifications from the cgroup when the threshold is crossed in either direction.
    49  func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
    50  	// cgroupv2 does not support monitoring cgroup memory thresholds using cgroup.event_control.
    51  	// Instead long term, on cgroupv2 kubelet should rely on combining usage of memory.low on root pods cgroup with inotify notifications on memory.events and or PSI pressure.
    52  	// For now, let's return a fake "disabled" cgroup notifier on cgroupv2.
    53  	// https://github.com/kubernetes/kubernetes/issues/106331
    54  	if libcontainercgroups.IsCgroup2UnifiedMode() {
    55  		return &disabledThresholdNotifier{}, nil
    56  	}
    57  
    58  	var watchfd, eventfd, epfd, controlfd int
    59  	var err error
    60  	watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY|unix.O_CLOEXEC, 0)
    61  	if err != nil {
    62  		return nil, err
    63  	}
    64  	defer unix.Close(watchfd)
    65  	controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY|unix.O_CLOEXEC, 0)
    66  	if err != nil {
    67  		return nil, err
    68  	}
    69  	defer unix.Close(controlfd)
    70  	eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC)
    71  	if err != nil {
    72  		return nil, err
    73  	}
    74  	if eventfd < 0 {
    75  		err = fmt.Errorf("eventfd call failed")
    76  		return nil, err
    77  	}
    78  	defer func() {
    79  		// Close eventfd if we get an error later in initialization
    80  		if err != nil {
    81  			unix.Close(eventfd)
    82  		}
    83  	}()
    84  	epfd, err = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
    85  	if err != nil {
    86  		return nil, err
    87  	}
    88  	if epfd < 0 {
    89  		err = fmt.Errorf("EpollCreate1 call failed")
    90  		return nil, err
    91  	}
    92  	defer func() {
    93  		// Close epfd if we get an error later in initialization
    94  		if err != nil {
    95  			unix.Close(epfd)
    96  		}
    97  	}()
    98  	config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold)
    99  	_, err = unix.Write(controlfd, []byte(config))
   100  	if err != nil {
   101  		return nil, err
   102  	}
   103  	return &linuxCgroupNotifier{
   104  		eventfd: eventfd,
   105  		epfd:    epfd,
   106  		stop:    make(chan struct{}),
   107  	}, nil
   108  }
   109  
   110  func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) {
   111  	err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{
   112  		Fd:     int32(n.eventfd),
   113  		Events: unix.EPOLLIN,
   114  	})
   115  	if err != nil {
   116  		klog.InfoS("Eviction manager: error adding epoll eventfd", "err", err)
   117  		return
   118  	}
   119  	buf := make([]byte, eventSize)
   120  	for {
   121  		select {
   122  		case <-n.stop:
   123  			return
   124  		default:
   125  		}
   126  		event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval)
   127  		if err != nil {
   128  			klog.InfoS("Eviction manager: error while waiting for memcg events", "err", err)
   129  			return
   130  		} else if !event {
   131  			// Timeout on wait.  This is expected if the threshold was not crossed
   132  			continue
   133  		}
   134  		// Consume the event from the eventfd
   135  		_, err = unix.Read(n.eventfd, buf)
   136  		if err != nil {
   137  			klog.InfoS("Eviction manager: error reading memcg events", "err", err)
   138  			return
   139  		}
   140  		eventCh <- struct{}{}
   141  	}
   142  }
   143  
   144  // wait waits up to notifierRefreshInterval for an event on the Epoll FD for the
   145  // eventfd we are concerned about.  It returns an error if one occurs, and true
   146  // if the consumer should read from the eventfd.
   147  func wait(epfd, eventfd int, timeout time.Duration) (bool, error) {
   148  	events := make([]unix.EpollEvent, numFdEvents+1)
   149  	timeoutMS := int(timeout / time.Millisecond)
   150  	n, err := unix.EpollWait(epfd, events, timeoutMS)
   151  	if n == -1 {
   152  		if err == unix.EINTR {
   153  			// Interrupt, ignore the error
   154  			return false, nil
   155  		}
   156  		return false, err
   157  	}
   158  	if n == 0 {
   159  		// Timeout
   160  		return false, nil
   161  	}
   162  	if n > numFdEvents {
   163  		return false, fmt.Errorf("epoll_wait returned more events than we know what to do with")
   164  	}
   165  	for _, event := range events[:n] {
   166  		if event.Fd == int32(eventfd) {
   167  			if event.Events&unix.EPOLLHUP != 0 || event.Events&unix.EPOLLERR != 0 || event.Events&unix.EPOLLIN != 0 {
   168  				// EPOLLHUP: should not happen, but if it does, treat it as a wakeup.
   169  
   170  				// EPOLLERR: If an error is waiting on the file descriptor, we should pretend
   171  				// something is ready to read, and let unix.Read pick up the error.
   172  
   173  				// EPOLLIN: There is data to read.
   174  				return true, nil
   175  			}
   176  		}
   177  	}
   178  	// An event occurred that we don't care about.
   179  	return false, nil
   180  }
   181  
   182  func (n *linuxCgroupNotifier) Stop() {
   183  	n.stopLock.Lock()
   184  	defer n.stopLock.Unlock()
   185  	select {
   186  	case <-n.stop:
   187  		// the linuxCgroupNotifier is already stopped
   188  		return
   189  	default:
   190  	}
   191  	unix.Close(n.eventfd)
   192  	unix.Close(n.epfd)
   193  	close(n.stop)
   194  }
   195  
   196  // disabledThresholdNotifier is a fake diasbled threshold notifier that performs no-ops.
   197  type disabledThresholdNotifier struct{}
   198  
   199  func (*disabledThresholdNotifier) Start(_ chan<- struct{}) {}
   200  func (*disabledThresholdNotifier) Stop()                   {}
   201  

View as plain text