1 package containers
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strconv"
8 "strings"
9 "time"
10
11 "github.com/containerd/containerd"
12 "github.com/containerd/containerd/api/services/tasks/v1"
13 "github.com/containerd/containerd/containers"
14 "k8s.io/apimachinery/pkg/util/wait"
15
16 criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
17
18 "edge-infra.dev/pkg/sds/devices/class"
19
20 cc "edge-infra.dev/pkg/sds/devices/agent/common"
21 "edge-infra.dev/pkg/sds/devices/logger"
22 )
23
24
25 var notRunningStates = map[criruntime.ContainerState]string{
26 criruntime.ContainerState_CONTAINER_EXITED: "",
27 }
28
29
30 var defaultBackoff = wait.Backoff{
31 Steps: 9,
32 Duration: 10 * time.Millisecond,
33 Factor: 2,
34 Jitter: 0.1,
35 Cap: time.Millisecond * 2560,
36 }
37
var (
	// ErrPodSandboxDoesNotExist is returned by addContainerDeviceRequests when
	// the runtime reports no pod sandbox for a container's pod.
	ErrPodSandboxDoesNotExist = errors.New("pod sandbox does not exist")

	// ignoreNamespaces is the set of pod namespaces whose containers are never
	// considered for device requests. Only the keys are used.
	ignoreNamespaces = map[string]string{
		"kube-system":   "",
		"device-system": "",
	}

	// ignoreContainers is the set of container names that are never
	// considered for device requests. Only the keys are used.
	ignoreContainers = map[string]string{
		"etcd":          "",
		"linkerd-init":  "",
		"linkerd-proxy": "",
	}
)
47
// Label keys looked up on container labels in this package.
const (
	// annPodName is the label key holding the owning pod's name.
	annPodName = "io.kubernetes.pod.name"
	// annPodNamespace is the label key holding the owning pod's namespace.
	annPodNamespace = "io.kubernetes.pod.namespace"
	// annContainerName is the label key holding the container's name.
	annContainerName = "io.kubernetes.container.name"
)
53
54
55 func FetchAllContainers(ctx context.Context, ctrClient *containerd.Client, runtimeClient criruntime.RuntimeServiceClient) (map[string]*containers.Container, error) {
56 log := logger.FromContext(ctx)
57 containerService := ctrClient.ContainerService()
58 containersStore, err := containerService.List(ctx)
59 if err != nil {
60 return nil, err
61 }
62
63 containers := map[string]*containers.Container{}
64 for _, ctr := range containersStore {
65 running, err := containerIsRunning(ctx, runtimeClient, ctr.ID)
66 if err != nil {
67 log.Error("error checking container state", "ctrID", ctr.ID, "error", err)
68 continue
69 }
70 if !running {
71 continue
72 }
73
74 requestCtr, err := addContainerDeviceRequests(ctx, runtimeClient, &ctr)
75 if err != nil {
76 log.Error("error fetching device requests", "ctrID", ctr.ID, "error", err)
77 continue
78 }
79 if requestCtr == nil {
80 continue
81 }
82 containers[requestCtr.ID] = requestCtr
83 }
84 return containers, nil
85 }
86
87
88
89 func FetchContainer(ctx context.Context, ctrClient *containerd.Client, runtimeClient criruntime.RuntimeServiceClient, ctrID string) (*containers.Container, error) {
90 if running, err := containerIsRunning(ctx, runtimeClient, ctrID); err != nil || !running {
91 return nil, err
92 }
93
94 ctr, err := getContainer(ctx, ctrClient, ctrID)
95 if err != nil {
96 return nil, err
97 }
98
99 requestCtr, err := addContainerDeviceRequests(ctx, runtimeClient, &ctr)
100 if err != nil {
101 return nil, err
102 }
103 if requestCtr == nil {
104 return nil, nil
105 }
106 return requestCtr, nil
107 }
108
109
110 func FetchContainerParentProcessID(ctx context.Context, ctrClient *containerd.Client, ctrID string) (string, error) {
111 taskResult, err := ctrClient.TaskService().ListPids(ctx, &tasks.ListPidsRequest{
112 ContainerID: ctrID,
113 })
114 if err != nil {
115 return "", fmt.Errorf("could not find container process ids: %w", err)
116 }
117 if len(taskResult.Processes) == 0 {
118 return "", fmt.Errorf("could not find container process ids")
119 }
120 return strconv.Itoa(int(taskResult.Processes[0].Pid)), nil
121 }
122
123
124 func FetchContainerRootPath(ctx context.Context, ctrClient *containerd.Client, ctr *containers.Container) (string, error) {
125 pid, err := FetchContainerParentProcessID(ctx, ctrClient, ctr.ID)
126 if err != nil {
127 return "", fmt.Errorf("could not find container %s process id: %w", ctr.ID, err)
128 }
129 return fmt.Sprintf("/proc/%s/root", pid), nil
130 }
131
132
133 func WithContainerLogger(ctx context.Context, ctr *containers.Container) context.Context {
134 logLevel := ctr.Labels[cc.AnnDeviceLogLevel]
135 ctrName := ctr.Labels[cc.AnnContainerName]
136 podName := ctr.Labels[annPodName]
137 podNamespace := ctr.Labels[annPodNamespace]
138 opts := []logger.Option{
139 logger.WithLevel(logger.ToLevel(logLevel)),
140 }
141 return logger.IntoContext(ctx, logger.New(opts...).WithGroup(ctrName).With("container", ctrName, "containerId", ctr.ID, "pod", podName, "namespace", podNamespace))
142 }
143
144
145 func containerIsRunning(ctx context.Context, runtimeClient criruntime.RuntimeServiceClient, ctrID string) (bool, error) {
146 var state criruntime.ContainerState
147 var lastErr error
148
149
150 podSandbox, err := runtimeClient.PodSandboxStatus(ctx, &criruntime.PodSandboxStatusRequest{PodSandboxId: ctrID})
151 if err == nil && podSandbox != nil {
152 return false, nil
153 }
154
155 if err := wait.ExponentialBackoffWithContext(ctx, defaultBackoff, func(ctx context.Context) (done bool, err error) {
156 status, err := runtimeClient.ContainerStatus(ctx, &criruntime.ContainerStatusRequest{ContainerId: ctrID})
157 if err != nil {
158 lastErr = err
159 return false, nil
160 }
161 state = status.Status.State
162 return true, nil
163 }); err != nil {
164 return false, fmt.Errorf("error fetching container runtime state: %w, %w", err, lastErr)
165 }
166 _, ok := notRunningStates[state]
167 if ok {
168 return false, nil
169 }
170 return true, nil
171 }
172
173
174 func getContainer(ctx context.Context, ctrClient *containerd.Client, ctrID string) (containers.Container, error) {
175 var container containers.Container
176 var lastError error
177 containerService := ctrClient.ContainerService()
178 if err := wait.ExponentialBackoffWithContext(ctx, defaultBackoff, func(_ context.Context) (done bool, err error) {
179 container, err = containerService.Get(ctx, ctrID)
180 if err != nil {
181 lastError = err
182 return false, nil
183 }
184 return true, nil
185 }); err != nil {
186 return container, fmt.Errorf("could not fetch container %w: %s: %w", err, ctrID, lastError)
187 }
188 return container, nil
189 }
190
191
192 func addContainerDeviceRequests(ctx context.Context, runtimeClient criruntime.RuntimeServiceClient, ctr *containers.Container) (*containers.Container, error) {
193 ctrName := ctr.Labels[cc.AnnContainerName]
194 podName := ctr.Labels[cc.AnnPodName]
195 podNamespace := ctr.Labels[cc.AnnPodNamespace]
196
197 if _, ok := ignoreContainers[ctrName]; ok {
198 return nil, nil
199 }
200
201 if _, ok := ignoreNamespaces[podNamespace]; ok {
202 return nil, nil
203 }
204
205 if ctrName == "" || podName == "" {
206 return nil, fmt.Errorf("missing pod or container name, container: %s, pod: %s", ctrName, podName)
207 }
208
209 podSandboxList, err := runtimeClient.ListPodSandbox(ctx, &criruntime.ListPodSandboxRequest{
210 Filter: &criruntime.PodSandboxFilter{
211 LabelSelector: map[string]string{
212 cc.AnnPodName: podName,
213 },
214 }},
215 )
216 if err != nil {
217 return nil, err
218 }
219 if podSandboxList == nil || len(podSandboxList.Items) == 0 {
220 return nil, ErrPodSandboxDoesNotExist
221 }
222
223 pod := podSandboxList.Items[0]
224 requestsDevice := false
225 for key, value := range pod.Annotations {
226 if !class.IsDeviceClass(key) {
227 continue
228 }
229
230 classes := strings.Split(value, ",")
231 parsedCtrName, err := parseContainerName(key)
232 if err != nil {
233 return nil, err
234 }
235
236 if parsedCtrName != ctrName {
237 continue
238 }
239
240 for _, className := range classes {
241 if className == "" {
242 continue
243 }
244
245 requestsDevice = true
246 ctr.Labels[class.FmtClassLabel(className)] = "requested"
247 }
248 }
249
250
251 if !requestsDevice {
252 return nil, nil
253 }
254
255 ctr.Labels[cc.AnnDeviceLogLevel] = "info"
256 logLevel, ok := pod.Annotations[cc.AnnDeviceLogLevel]
257 if ok {
258 ctr.Labels[cc.AnnDeviceLogLevel] = logLevel
259 }
260 return ctr, nil
261 }
262
263
264 func parseContainerName(className string) (string, error) {
265 return class.BaseName(className)
266 }
267
View as plain text