...

Source file src/edge-infra.dev/pkg/sds/devices/agent/containers/containers.go

Documentation: edge-infra.dev/pkg/sds/devices/agent/containers

     1  package containers
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strconv"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/containerd/containerd"
    12  	"github.com/containerd/containerd/api/services/tasks/v1"
    13  	"github.com/containerd/containerd/containers"
    14  	"k8s.io/apimachinery/pkg/util/wait"
    15  
    16  	criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
    17  
    18  	"edge-infra.dev/pkg/sds/devices/class"
    19  
    20  	cc "edge-infra.dev/pkg/sds/devices/agent/common"
    21  	"edge-infra.dev/pkg/sds/devices/logger"
    22  )
    23  
    24  // runtime states for containers that are not running
    25  var notRunningStates = map[criruntime.ContainerState]string{
    26  	criruntime.ContainerState_CONTAINER_EXITED: "",
    27  }
    28  
    29  // exponential wait 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1280ms
    30  var defaultBackoff = wait.Backoff{
    31  	Steps:    9,
    32  	Duration: 10 * time.Millisecond,
    33  	Factor:   2,
    34  	Jitter:   0.1,
    35  	Cap:      time.Millisecond * 2560,
    36  }
    37  
    38  var (
    39  	// ErrPodSandboxDoesNotExist is thrown if the pod sandbox expected
    40  	// for a container does not exist
    41  	ErrPodSandboxDoesNotExist = errors.New("pod sandbox does not exist")
    42  	// namespaces to ignore from cgroup application
    43  	ignoreNamespaces = map[string]string{"kube-system": "", "device-system": ""}
    44  	// containers to ignore from cgroup application
    45  	ignoreContainers = map[string]string{"etcd": "", "linkerd-init": "", "linkerd-proxy": ""}
    46  )
    47  
    48  const (
    49  	annContainerName = "io.kubernetes.container.name"
    50  	annPodName       = "io.kubernetes.pod.name"
    51  	annPodNamespace  = "io.kubernetes.pod.namespace"
    52  )
    53  
    54  // FetchAllContainers devices will return a list of containers that request a device class
    55  func FetchAllContainers(ctx context.Context, ctrClient *containerd.Client, runtimeClient criruntime.RuntimeServiceClient) (map[string]*containers.Container, error) {
    56  	log := logger.FromContext(ctx)
    57  	containerService := ctrClient.ContainerService()
    58  	containersStore, err := containerService.List(ctx)
    59  	if err != nil {
    60  		return nil, err
    61  	}
    62  
    63  	containers := map[string]*containers.Container{}
    64  	for _, ctr := range containersStore {
    65  		running, err := containerIsRunning(ctx, runtimeClient, ctr.ID)
    66  		if err != nil {
    67  			log.Error("error checking container state", "ctrID", ctr.ID, "error", err)
    68  			continue
    69  		}
    70  		if !running {
    71  			continue
    72  		}
    73  
    74  		requestCtr, err := addContainerDeviceRequests(ctx, runtimeClient, &ctr)
    75  		if err != nil {
    76  			log.Error("error fetching device requests", "ctrID", ctr.ID, "error", err)
    77  			continue
    78  		}
    79  		if requestCtr == nil {
    80  			continue
    81  		}
    82  		containers[requestCtr.ID] = requestCtr
    83  	}
    84  	return containers, nil
    85  }
    86  
    87  // FetchContainer will attempt to fetch a container given the container ID and populate it
    88  // with the device class request labels from the pod sandbox
    89  func FetchContainer(ctx context.Context, ctrClient *containerd.Client, runtimeClient criruntime.RuntimeServiceClient, ctrID string) (*containers.Container, error) {
    90  	if running, err := containerIsRunning(ctx, runtimeClient, ctrID); err != nil || !running {
    91  		return nil, err
    92  	}
    93  
    94  	ctr, err := getContainer(ctx, ctrClient, ctrID)
    95  	if err != nil {
    96  		return nil, err
    97  	}
    98  
    99  	requestCtr, err := addContainerDeviceRequests(ctx, runtimeClient, &ctr)
   100  	if err != nil {
   101  		return nil, err
   102  	}
   103  	if requestCtr == nil {
   104  		return nil, nil
   105  	}
   106  	return requestCtr, nil
   107  }
   108  
   109  // FetchContainerParentProcessID will fetch the parent process id for a given container
   110  func FetchContainerParentProcessID(ctx context.Context, ctrClient *containerd.Client, ctrID string) (string, error) {
   111  	taskResult, err := ctrClient.TaskService().ListPids(ctx, &tasks.ListPidsRequest{
   112  		ContainerID: ctrID,
   113  	})
   114  	if err != nil {
   115  		return "", fmt.Errorf("could not find container process ids: %w", err)
   116  	}
   117  	if len(taskResult.Processes) == 0 {
   118  		return "", fmt.Errorf("could not find container process ids")
   119  	}
   120  	return strconv.Itoa(int(taskResult.Processes[0].Pid)), nil
   121  }
   122  
   123  // FetchContainerRootPath fetches the containers root path
   124  func FetchContainerRootPath(ctx context.Context, ctrClient *containerd.Client, ctr *containers.Container) (string, error) {
   125  	pid, err := FetchContainerParentProcessID(ctx, ctrClient, ctr.ID)
   126  	if err != nil {
   127  		return "", fmt.Errorf("could not find container %s process id: %w", ctr.ID, err)
   128  	}
   129  	return fmt.Sprintf("/proc/%s/root", pid), nil
   130  }
   131  
   132  // WithContainerLogger returns a device logger instantiatd with container information in the log
   133  func WithContainerLogger(ctx context.Context, ctr *containers.Container) context.Context {
   134  	logLevel := ctr.Labels[cc.AnnDeviceLogLevel]
   135  	ctrName := ctr.Labels[cc.AnnContainerName]
   136  	podName := ctr.Labels[annPodName]
   137  	podNamespace := ctr.Labels[annPodNamespace]
   138  	opts := []logger.Option{
   139  		logger.WithLevel(logger.ToLevel(logLevel)),
   140  	}
   141  	return logger.IntoContext(ctx, logger.New(opts...).WithGroup(ctrName).With("container", ctrName, "containerId", ctr.ID, "pod", podName, "namespace", podNamespace))
   142  }
   143  
   144  // containerIsRunning will check that the container is running. If container is not found, the client will retry exponentially.
   145  func containerIsRunning(ctx context.Context, runtimeClient criruntime.RuntimeServiceClient, ctrID string) (bool, error) {
   146  	var state criruntime.ContainerState
   147  	var lastErr error
   148  
   149  	// ignore container ids that are pod sandbox
   150  	podSandbox, err := runtimeClient.PodSandboxStatus(ctx, &criruntime.PodSandboxStatusRequest{PodSandboxId: ctrID})
   151  	if err == nil && podSandbox != nil {
   152  		return false, nil
   153  	}
   154  
   155  	if err := wait.ExponentialBackoffWithContext(ctx, defaultBackoff, func(ctx context.Context) (done bool, err error) {
   156  		status, err := runtimeClient.ContainerStatus(ctx, &criruntime.ContainerStatusRequest{ContainerId: ctrID})
   157  		if err != nil {
   158  			lastErr = err
   159  			return false, nil
   160  		}
   161  		state = status.Status.State
   162  		return true, nil
   163  	}); err != nil {
   164  		return false, fmt.Errorf("error fetching container runtime state: %w, %w", err, lastErr)
   165  	}
   166  	_, ok := notRunningStates[state]
   167  	if ok {
   168  		return false, nil
   169  	}
   170  	return true, nil
   171  }
   172  
   173  // getContainer does a get request for a container and attempts exponential backoff if the container is not found.
   174  func getContainer(ctx context.Context, ctrClient *containerd.Client, ctrID string) (containers.Container, error) {
   175  	var container containers.Container
   176  	var lastError error
   177  	containerService := ctrClient.ContainerService()
   178  	if err := wait.ExponentialBackoffWithContext(ctx, defaultBackoff, func(_ context.Context) (done bool, err error) {
   179  		container, err = containerService.Get(ctx, ctrID)
   180  		if err != nil {
   181  			lastError = err
   182  			return false, nil
   183  		}
   184  		return true, nil
   185  	}); err != nil {
   186  		return container, fmt.Errorf("could not fetch container %w: %s: %w", err, ctrID, lastError)
   187  	}
   188  	return container, nil
   189  }
   190  
   191  // addContainerDeviceRequests will add the containers device request by doing a lookup on the pod sandbox from container cri
   192  func addContainerDeviceRequests(ctx context.Context, runtimeClient criruntime.RuntimeServiceClient, ctr *containers.Container) (*containers.Container, error) {
   193  	ctrName := ctr.Labels[cc.AnnContainerName]
   194  	podName := ctr.Labels[cc.AnnPodName]
   195  	podNamespace := ctr.Labels[cc.AnnPodNamespace]
   196  
   197  	if _, ok := ignoreContainers[ctrName]; ok {
   198  		return nil, nil
   199  	}
   200  
   201  	if _, ok := ignoreNamespaces[podNamespace]; ok {
   202  		return nil, nil
   203  	}
   204  
   205  	if ctrName == "" || podName == "" {
   206  		return nil, fmt.Errorf("missing pod or container name, container: %s, pod: %s", ctrName, podName)
   207  	}
   208  
   209  	podSandboxList, err := runtimeClient.ListPodSandbox(ctx, &criruntime.ListPodSandboxRequest{
   210  		Filter: &criruntime.PodSandboxFilter{
   211  			LabelSelector: map[string]string{
   212  				cc.AnnPodName: podName,
   213  			},
   214  		}},
   215  	)
   216  	if err != nil {
   217  		return nil, err
   218  	}
   219  	if podSandboxList == nil || len(podSandboxList.Items) == 0 {
   220  		return nil, ErrPodSandboxDoesNotExist
   221  	}
   222  
   223  	pod := podSandboxList.Items[0]
   224  	requestsDevice := false
   225  	for key, value := range pod.Annotations {
   226  		if !class.IsDeviceClass(key) {
   227  			continue
   228  		}
   229  
   230  		classes := strings.Split(value, ",")
   231  		parsedCtrName, err := parseContainerName(key)
   232  		if err != nil {
   233  			return nil, err
   234  		}
   235  
   236  		if parsedCtrName != ctrName {
   237  			continue
   238  		}
   239  
   240  		for _, className := range classes {
   241  			if className == "" {
   242  				continue
   243  			}
   244  
   245  			requestsDevice = true
   246  			ctr.Labels[class.FmtClassLabel(className)] = "requested"
   247  		}
   248  	}
   249  
   250  	// ignore containers that do no request devices
   251  	if !requestsDevice {
   252  		return nil, nil
   253  	}
   254  
   255  	ctr.Labels[cc.AnnDeviceLogLevel] = "info"
   256  	logLevel, ok := pod.Annotations[cc.AnnDeviceLogLevel]
   257  	if ok {
   258  		ctr.Labels[cc.AnnDeviceLogLevel] = logLevel
   259  	}
   260  	return ctr, nil
   261  }
   262  
   263  // parseContainerName will return the container name from the device class annotation
   264  func parseContainerName(className string) (string, error) {
   265  	return class.BaseName(className)
   266  }
   267  

View as plain text