//go:build linux package udevproxy import ( "context" "iter" "log/slog" "maps" "slices" "github.com/containerd/containerd/containers" typeurl "github.com/containerd/typeurl/v2" "github.com/opencontainers/runtime-spec/specs-go" cc "edge-infra.dev/pkg/sds/devices/agent/common" "edge-infra.dev/pkg/lib/kernel/udev" "edge-infra.dev/pkg/sds/devices/logger" ) var UDevProxyFn = NewUdevProxy type proxyRequest struct { path string log *slog.Logger } // replayRemoveUEvents replays the remove uevents to the clients func ReplayRemoveUEvents(ctx context.Context, uevents []*udev.UEvent, containers map[string]*containers.Container) { removeUEvents := []*udev.UEvent{} for _, ue := range uevents { if !isRemoveUEvent(ue) { continue } removeUEvents = append(removeUEvents, ue) } ReplayUEventsToContainers(ctx, removeUEvents, containers) } // replayUEvents given a list of containers will attempt to replay the uevents to each containers network namespace func ReplayUEventsToContainers(ctx context.Context, uevents []*udev.UEvent, containers map[string]*containers.Container) { log := logger.FromContext(ctx) proxyRequests := fetchProxyRequests(ctx, maps.All(containers)) log.Log(ctx, logger.LevelTrace, "sending udev events to container network namespace", "uevents", uevents, "proxy requests", proxyRequests, "containers", slices.Collect(maps.Keys(containers))) slices.SortFunc(uevents, timeComparison()) ueventLog := map[string]udev.ActionType{} for _, uevent := range uevents { ueventLog[uevent.SysPath] = uevent.Action } udevProxy := UDevProxyFn() for _, req := range proxyRequests { if err := udevProxy.Send(req.path, uevents); err != nil { req.log.Error("error sending event to container network namespace", "error", err) continue } req.log.Debug("successfully replayed udev events to container", "uevents", ueventLog) } } // fetchProxyRequests will iterate a list of containers and fetch the paths to the network namespace file-descriptors. func fetchProxyRequests(ctx context.Context, containers iter.Seq2[string, *containers.Container]) []proxyRequest { log := logger.FromContext(ctx) proxyRequests := map[string]proxyRequest{} for _, ctr := range containers { spec := &specs.Spec{} if err := typeurl.UnmarshalTo(ctr.Spec, spec); err != nil { log.Error("error unmarshalling container spec", "error", err) continue } log := logger.New() logLevel, ok := ctr.Labels[cc.AnnDeviceLogLevel] if ok { log = logger.New(logger.WithLevel(logger.ToLevel(logLevel))) } pathFound := false for _, namespace := range spec.Linux.Namespaces { if namespace.Type != specs.NetworkNamespace || namespace.Path == "" { continue } proxyRequests[namespace.Path] = proxyRequest{ log: log.With("pod", ctr.Labels[cc.AnnPodName], "namespace", ctr.Labels[cc.AnnPodNamespace], "container", ctr.Labels[cc.AnnContainerName], "path", namespace.Path), path: namespace.Path, } pathFound = true break } if !pathFound { log.Error("could not find container network namespace", "container", ctr.Labels[cc.AnnContainerName]) } } return slices.Collect(maps.Values(proxyRequests)) } func isRemoveUEvent(udevEvent *udev.UEvent) bool { return slices.Contains([]udev.ActionType{udev.RemoveAction, udev.UnbindAction}, udevEvent.Action) } // returns cmp function for sorting uevents by timestamp func timeComparison() func(a, b *udev.UEvent) int { return func(a, b *udev.UEvent) int { if a.Timestamp.Before(b.Timestamp) { return -1 } if a.Timestamp.Equal(b.Timestamp) { return 0 } return 1 } }