package displayctl

import (
	"context"
	"io"
	"slices"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"edge-infra.dev/pkg/lib/kernel/udev"
	"edge-infra.dev/pkg/lib/kernel/udev/reader"
	"edge-infra.dev/pkg/sds/display/constants"
	v2 "edge-infra.dev/pkg/sds/display/k8s/apis/v2"
)

const (
	// timeout is the duration passed to AnnotateRateLimited for every
	// matching udev event.
	// NOTE(review): presumably the rate-limit window; confirm against the
	// annotator implementation.
	timeout = time.Second * 3

	// deviceWatcherMessage is the message handed to the annotator when it is
	// constructed in Start.
	deviceWatcherMessage = "new device event observed, updating NodeDisplayConfig"
)

// udevSubSystemTriggers maps a udev sub-system name to the event actions
// that trigger a reconcile for devices in that sub-system. Events from
// sub-systems not listed here are ignored.
var udevSubSystemTriggers = map[string][]udev.ActionType{
	"input": {udev.AddAction, udev.RemoveAction, udev.ChangeAction},
	"drm":   {udev.ChangeAction},
}

// DeviceWatcherRunnable is a RunnableFunc which can be run by a controller
// manager. It watches for new udev events and updates the host's
// NodeDisplayConfig to trigger the display controller to apply its
// configuration to the display manager. This ensures the correct
// configuration is applied whenever devices are added or removed.
type DeviceWatcherRunnable struct {
	// Name is used as the logger name in Start.
	Name string
	// Hostname is passed to the NodeDisplayConfig annotator.
	Hostname string
	// Client is the Kubernetes client passed to the annotator.
	Client client.Client
}

// NewDeviceWatcherRunnable returns a DeviceWatcherRunnable for the given
// hostname and client, named constants.DeviceWatcherName.
func NewDeviceWatcherRunnable(hostname string, c client.Client) *DeviceWatcherRunnable {
	return &DeviceWatcherRunnable{
		Name:     constants.DeviceWatcherName,
		Hostname: hostname,
		Client:   c,
	}
}

// SetupWithManager registers the runnable with the controller manager.
func (r *DeviceWatcherRunnable) SetupWithManager(mgr ctrl.Manager) error {
	return mgr.Add(r)
}

// Start opens a udev event stream and, for every event accepted by
// isTargetUEvent, asks the annotator to update the
// v2.DevicesUpdatedAtAnnotation annotation with the current time (RFC 3339).
//
// It returns the first error received from the event stream or from the
// annotator, the error from creating the udev reader, or nil once ctx is
// cancelled.
func (r *DeviceWatcherRunnable) Start(ctx context.Context) error {
	log := ctrl.LoggerFrom(ctx).WithName(r.Name)
	log.Info("starting device watcher")

	annotator := newNodeDisplayConfigAnnotator(r.Hostname, v2.DevicesUpdatedAtAnnotation, r.Client, log, deviceWatcherMessage)

	decoder, uEventReader, err := createDecoder()
	if err != nil {
		return err
	}
	defer uEventReader.Close()

	uEventChan, streamErrChan := reader.StreamUEvents(ctx, decoder)

	// until context is cancelled, watch for udev events and update the
	// annotation to trigger a reconcile
	//
	// rate-limit the annotation updates as many udev events appear each
	// time a device is (dis)connected
	for {
		select {
		case uEvent, ok := <-uEventChan:
			if !ok {
				// A closed channel is always ready and yields nil events;
				// a nil channel is never selected, so this case is disabled
				// while the remaining cases keep being served.
				log.Info("udev event channel closed, no further device events will be observed")
				uEventChan = nil
				continue
			}
			if isTargetUEvent(uEvent) {
				annotator.AnnotateRateLimited(ctx, time.Now().Format(time.RFC3339), timeout)
			}
		case err := <-streamErrChan:
			return err
		case err := <-annotator.ErrChan:
			return err
		case <-ctx.Done():
			log.Info("context cancelled, exiting device watcher")
			return nil
		}
	}
}

// createDecoder opens a netlink udev event reader and wraps its file
// descriptor in a socket decoder. The returned io.ReadCloser is the
// underlying reader; the caller is responsible for closing it.
func createDecoder() (reader.Decoder, io.ReadCloser, error) {
	uEventReader, fd, err := udev.NewUEventReader(udev.Netlink)
	if err != nil {
		return nil, nil, err
	}
	return reader.NewSocketReader(fd), uEventReader, nil
}

// isTargetUEvent reports whether uEvent's action is one of the triggers
// configured in udevSubSystemTriggers for its sub-system. A nil event is
// never a target.
func isTargetUEvent(uEvent *udev.UEvent) bool {
	if uEvent == nil {
		return false
	}
	actions, ok := udevSubSystemTriggers[uEvent.SubSystem]
	if !ok {
		return false
	}
	return slices.Contains(actions, uEvent.Action)
}