...

Source file src/edge-infra.dev/pkg/sds/devices/agent/udevproxy/uevents.go

Documentation: edge-infra.dev/pkg/sds/devices/agent/udevproxy

     1  //go:build linux
     2  
     3  package udevproxy
     4  
     5  import (
     6  	"context"
     7  	"iter"
     8  	"log/slog"
     9  	"maps"
    10  	"slices"
    11  
    12  	"github.com/containerd/containerd/containers"
    13  	typeurl "github.com/containerd/typeurl/v2"
    14  	"github.com/opencontainers/runtime-spec/specs-go"
    15  
    16  	cc "edge-infra.dev/pkg/sds/devices/agent/common"
    17  
    18  	"edge-infra.dev/pkg/lib/kernel/udev"
    19  	"edge-infra.dev/pkg/sds/devices/logger"
    20  )
    21  
// UDevProxyFn is the constructor ReplayUEventsToContainers calls to obtain the
// udev proxy it sends uevents through. It defaults to NewUdevProxy.
// NOTE(review): being a package-level variable it can be reassigned,
// presumably so tests can substitute a fake proxy — confirm.
var UDevProxyFn = NewUdevProxy
    23  
// proxyRequest describes one network namespace that uevents should be
// replayed into, together with the logger used to report the outcome.
type proxyRequest struct {
	// path is the network namespace path taken from the container's OCI spec.
	path string
	// log is the logger used when reporting on the send for this path.
	log *slog.Logger
}
    28  
    29  // replayRemoveUEvents replays the remove uevents to the clients
    30  func ReplayRemoveUEvents(ctx context.Context, uevents []*udev.UEvent, containers map[string]*containers.Container) {
    31  	removeUEvents := []*udev.UEvent{}
    32  	for _, ue := range uevents {
    33  		if !isRemoveUEvent(ue) {
    34  			continue
    35  		}
    36  		removeUEvents = append(removeUEvents, ue)
    37  	}
    38  	ReplayUEventsToContainers(ctx, removeUEvents, containers)
    39  }
    40  
    41  // replayUEvents given a list of containers will attempt to replay the uevents to each containers network namespace
    42  func ReplayUEventsToContainers(ctx context.Context, uevents []*udev.UEvent, containers map[string]*containers.Container) {
    43  	log := logger.FromContext(ctx)
    44  	proxyRequests := fetchProxyRequests(ctx, maps.All(containers))
    45  
    46  	log.Log(ctx, logger.LevelTrace, "sending udev events to container network namespace", "uevents", uevents, "proxy requests", proxyRequests, "containers", slices.Collect(maps.Keys(containers)))
    47  	slices.SortFunc(uevents, timeComparison())
    48  
    49  	ueventLog := map[string]udev.ActionType{}
    50  	for _, uevent := range uevents {
    51  		ueventLog[uevent.SysPath] = uevent.Action
    52  	}
    53  
    54  	udevProxy := UDevProxyFn()
    55  	for _, req := range proxyRequests {
    56  		if err := udevProxy.Send(req.path, uevents); err != nil {
    57  			req.log.Error("error sending event to container network namespace", "error", err)
    58  			continue
    59  		}
    60  		req.log.Debug("successfully replayed udev events to container", "uevents", ueventLog)
    61  	}
    62  }
    63  
    64  // fetchProxyRequests will iterate a list of containers and fetch the paths to the network namespace file-descriptors.
    65  func fetchProxyRequests(ctx context.Context, containers iter.Seq2[string, *containers.Container]) []proxyRequest {
    66  	log := logger.FromContext(ctx)
    67  	proxyRequests := map[string]proxyRequest{}
    68  	for _, ctr := range containers {
    69  		spec := &specs.Spec{}
    70  		if err := typeurl.UnmarshalTo(ctr.Spec, spec); err != nil {
    71  			log.Error("error unmarshalling container spec", "error", err)
    72  			continue
    73  		}
    74  
    75  		log := logger.New()
    76  		logLevel, ok := ctr.Labels[cc.AnnDeviceLogLevel]
    77  		if ok {
    78  			log = logger.New(logger.WithLevel(logger.ToLevel(logLevel)))
    79  		}
    80  
    81  		pathFound := false
    82  		for _, namespace := range spec.Linux.Namespaces {
    83  			if namespace.Type != specs.NetworkNamespace || namespace.Path == "" {
    84  				continue
    85  			}
    86  			proxyRequests[namespace.Path] = proxyRequest{
    87  				log:  log.With("pod", ctr.Labels[cc.AnnPodName], "namespace", ctr.Labels[cc.AnnPodNamespace], "container", ctr.Labels[cc.AnnContainerName], "path", namespace.Path),
    88  				path: namespace.Path,
    89  			}
    90  			pathFound = true
    91  			break
    92  		}
    93  
    94  		if !pathFound {
    95  			log.Error("could not find container network namespace", "container", ctr.Labels[cc.AnnContainerName])
    96  		}
    97  	}
    98  	return slices.Collect(maps.Values(proxyRequests))
    99  }
   100  
   101  func isRemoveUEvent(udevEvent *udev.UEvent) bool {
   102  	return slices.Contains([]udev.ActionType{udev.RemoveAction, udev.UnbindAction}, udevEvent.Action)
   103  }
   104  
   105  // returns cmp function for sorting uevents by timestamp
   106  func timeComparison() func(a, b *udev.UEvent) int {
   107  	return func(a, b *udev.UEvent) int {
   108  		if a.Timestamp.Before(b.Timestamp) {
   109  			return -1
   110  		}
   111  		if a.Timestamp.Equal(b.Timestamp) {
   112  			return 0
   113  		}
   114  		return 1
   115  	}
   116  }
   117  

View as plain text