1 package agent
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "path/filepath"
8 "strings"
9
10 "edge-infra.dev/pkg/lib/kernel/devices"
11 cc "edge-infra.dev/pkg/sds/devices/agent/common"
12 "edge-infra.dev/pkg/sds/devices/class"
13 dsv1 "edge-infra.dev/pkg/sds/devices/k8s/apis/v1"
14
15 "github.com/containerd/containerd/containers"
16 typeurl "github.com/containerd/typeurl/v2"
17 specs "github.com/opencontainers/runtime-spec/specs-go"
18
19 "edge-infra.dev/pkg/sds/devices/agent/cgroups"
20 "edge-infra.dev/pkg/sds/devices/agent/common"
21 "edge-infra.dev/pkg/sds/devices/logger"
22 )
23
// ErrMissingContainerID is returned by containerCGroupPath when a cgroup path
// does not split into exactly three colon-separated components.
var ErrMissingContainerID = errors.New("missing container id")
28
// Keys looked up in a container's Labels map to identify the workload that
// owns the container.
const (
	annContainerName = "io.kubernetes.container.name"
	annPodName       = "io.kubernetes.pod.name"
	annPodNamespace  = "io.kubernetes.pod.namespace"
)

// QoS name stems. containerCGroupPath searches the cgroup path for them and
// appends ".slice" to build the QoS-level directory name.
const (
	burstableQos  = "kubepods-burstable"
	bestEffortQos = "kubepods-besteffort"
)
36
37 var cgroupRequestFn = cgroups.NewCgroupRequest
38
39
40 func ApplyCgroupsToContainer(ctx context.Context, requestID string, ctr *containers.Container, classes map[string]*dsv1.DeviceClass) {
41 log := logger.FromContext(ctx)
42
43 ctrName := ctr.Labels[annContainerName]
44 podName := ctr.Labels[annPodName]
45 podNamespace := ctr.Labels[annPodNamespace]
46
47 ctrLog := log.With("requestId", requestID, "pod", podName, "namespace", podNamespace, "containerId", ctr.ID, "container", ctrName)
48
49 spec := &specs.Spec{}
50 if err := typeurl.UnmarshalTo(ctr.Spec, spec); err != nil {
51 ctrLog.Error("error unmarshalling container spec", "error", err)
52 return
53 }
54
55 cgroupPath, err := containerCGroupPath(spec.Linux.CgroupsPath)
56 if err != nil {
57 ctrLog.Error("invalid cgroup path", "error", err, "path", spec.Linux.CgroupsPath)
58 return
59 }
60
61
62 requestedDevices := map[string]devices.Device{}
63 requestedClasses := []string{}
64 for className := range ctr.Labels {
65 if _, ok := classes[className]; !ok {
66 continue
67 }
68 requestedClasses = append(requestedClasses, className)
69 for _, dev := range classes[className].DeviceIter() {
70 requestedDevices[dev.Path()] = dev
71 }
72 }
73
74 isContainerVM := isContainerVirtualMachine(ctr)
75 ctrCtx := withContainerLogger(ctx, requestID, ctr, requestedClasses, fullCgroupPath(cgroupPath))
76 cgroupRequestFn(ctrName, ctr.ID, podNamespace, cgroupPath, requestedDevices, isContainerVM).Apply(ctrCtx)
77 }
78
79
80 func isContainerVirtualMachine(ctr *containers.Container) bool {
81 isComputeContainer := ctr.Labels[cc.AnnContainerName] == "compute"
82 requestsKVMResource := false
83 for k := range ctr.Labels {
84 if !class.IsDeviceClass(k) {
85 continue
86 }
87 if k == class.FmtClassLabel("kvm") {
88 requestsKVMResource = true
89 }
90 }
91 return isComputeContainer && requestsKVMResource
92 }
93
94
95
96 func withContainerLogger(ctx context.Context, requestID string, ctr *containers.Container, requestedClasses []string, cgroupPath string) context.Context {
97 logLevel := ctr.Labels[common.AnnDeviceLogLevel]
98 ctrName := ctr.Labels[annContainerName]
99 podName := ctr.Labels[annPodName]
100 podNamespace := ctr.Labels[annPodNamespace]
101 opts := []logger.Option{
102 logger.WithLevel(logger.ToLevel(logLevel)),
103 }
104 return logger.IntoContext(ctx, logger.New(opts...).WithGroup(ctrName).With("requestId", requestID, "container", ctrName, "containerId", ctr.ID, "pod", podName, "namespace", podNamespace, "requestedClasses", requestedClasses, "cgroupPath", cgroupPath))
105 }
106
107
108 func containerCGroupPath(cgroupPath string) (string, error) {
109 splitPath := strings.Split(cgroupPath, ":")
110 if len(splitPath) != 3 {
111 return "", ErrMissingContainerID
112 }
113 qos := burstableQos
114 if strings.Contains(cgroupPath, bestEffortQos) {
115 qos = bestEffortQos
116 }
117 cgroupPathFmt := "%s.slice/%s/%s-%s.scope"
118 return fmt.Sprintf(cgroupPathFmt, qos, splitPath[0], splitPath[1], splitPath[2]), nil
119 }
120
121
// fullCgroupPath returns the absolute filesystem path of a cgroup given its
// path relative to the kubepods root slice. The result is cleaned by
// filepath.Join.
func fullCgroupPath(cgroupPath string) string {
	const kubepodsRoot = "/sys/fs/cgroup/kubepods.slice/"
	return filepath.Join(kubepodsRoot, cgroupPath)
}
125
View as plain text