1
2
3 package udevproxy
4
5 import (
6 "context"
7 "iter"
8 "log/slog"
9 "maps"
10 "slices"
11
12 "github.com/containerd/containerd/containers"
13 typeurl "github.com/containerd/typeurl/v2"
14 "github.com/opencontainers/runtime-spec/specs-go"
15
16 cc "edge-infra.dev/pkg/sds/devices/agent/common"
17
18 "edge-infra.dev/pkg/lib/kernel/udev"
19 "edge-infra.dev/pkg/sds/devices/logger"
20 )
21
22 var UDevProxyFn = NewUdevProxy
23
24 type proxyRequest struct {
25 path string
26 log *slog.Logger
27 }
28
29
30 func ReplayRemoveUEvents(ctx context.Context, uevents []*udev.UEvent, containers map[string]*containers.Container) {
31 removeUEvents := []*udev.UEvent{}
32 for _, ue := range uevents {
33 if !isRemoveUEvent(ue) {
34 continue
35 }
36 removeUEvents = append(removeUEvents, ue)
37 }
38 ReplayUEventsToContainers(ctx, removeUEvents, containers)
39 }
40
41
42 func ReplayUEventsToContainers(ctx context.Context, uevents []*udev.UEvent, containers map[string]*containers.Container) {
43 log := logger.FromContext(ctx)
44 proxyRequests := fetchProxyRequests(ctx, maps.All(containers))
45
46 log.Log(ctx, logger.LevelTrace, "sending udev events to container network namespace", "uevents", uevents, "proxy requests", proxyRequests, "containers", slices.Collect(maps.Keys(containers)))
47 slices.SortFunc(uevents, timeComparison())
48
49 ueventLog := map[string]udev.ActionType{}
50 for _, uevent := range uevents {
51 ueventLog[uevent.SysPath] = uevent.Action
52 }
53
54 udevProxy := UDevProxyFn()
55 for _, req := range proxyRequests {
56 if err := udevProxy.Send(req.path, uevents); err != nil {
57 req.log.Error("error sending event to container network namespace", "error", err)
58 continue
59 }
60 req.log.Debug("successfully replayed udev events to container", "uevents", ueventLog)
61 }
62 }
63
64
65 func fetchProxyRequests(ctx context.Context, containers iter.Seq2[string, *containers.Container]) []proxyRequest {
66 log := logger.FromContext(ctx)
67 proxyRequests := map[string]proxyRequest{}
68 for _, ctr := range containers {
69 spec := &specs.Spec{}
70 if err := typeurl.UnmarshalTo(ctr.Spec, spec); err != nil {
71 log.Error("error unmarshalling container spec", "error", err)
72 continue
73 }
74
75 log := logger.New()
76 logLevel, ok := ctr.Labels[cc.AnnDeviceLogLevel]
77 if ok {
78 log = logger.New(logger.WithLevel(logger.ToLevel(logLevel)))
79 }
80
81 pathFound := false
82 for _, namespace := range spec.Linux.Namespaces {
83 if namespace.Type != specs.NetworkNamespace || namespace.Path == "" {
84 continue
85 }
86 proxyRequests[namespace.Path] = proxyRequest{
87 log: log.With("pod", ctr.Labels[cc.AnnPodName], "namespace", ctr.Labels[cc.AnnPodNamespace], "container", ctr.Labels[cc.AnnContainerName], "path", namespace.Path),
88 path: namespace.Path,
89 }
90 pathFound = true
91 break
92 }
93
94 if !pathFound {
95 log.Error("could not find container network namespace", "container", ctr.Labels[cc.AnnContainerName])
96 }
97 }
98 return slices.Collect(maps.Values(proxyRequests))
99 }
100
101 func isRemoveUEvent(udevEvent *udev.UEvent) bool {
102 return slices.Contains([]udev.ActionType{udev.RemoveAction, udev.UnbindAction}, udevEvent.Action)
103 }
104
105
106 func timeComparison() func(a, b *udev.UEvent) int {
107 return func(a, b *udev.UEvent) int {
108 if a.Timestamp.Before(b.Timestamp) {
109 return -1
110 }
111 if a.Timestamp.Equal(b.Timestamp) {
112 return 0
113 }
114 return 1
115 }
116 }
117
View as plain text