...

Source file src/edge-infra.dev/pkg/sds/display/k8s/controllers/displayctl/devicewatcher_runnable.go

Documentation: edge-infra.dev/pkg/sds/display/k8s/controllers/displayctl

     1  package displayctl
     2  
     3  import (
     4  	"context"
     5  	"io"
     6  	"slices"
     7  	"time"
     8  
     9  	ctrl "sigs.k8s.io/controller-runtime"
    10  	"sigs.k8s.io/controller-runtime/pkg/client"
    11  
    12  	"edge-infra.dev/pkg/lib/kernel/udev"
    13  	"edge-infra.dev/pkg/lib/kernel/udev/reader"
    14  	"edge-infra.dev/pkg/sds/display/constants"
    15  	v2 "edge-infra.dev/pkg/sds/display/k8s/apis/v2"
    16  )
    17  
    18  const (
    19  	timeout              = time.Second * 3
    20  	deviceWatcherMessage = "new device event observed, updating NodeDisplayConfig"
    21  )
    22  
    23  // Events actions that trigger reconciles for each device sub-system.
    24  var udevSubSystemTriggers = map[string][]udev.ActionType{
    25  	"input": {udev.AddAction, udev.RemoveAction, udev.ChangeAction},
    26  	"drm":   {udev.ChangeAction},
    27  }
    28  
    29  // DeviceWatcherRunnable is a RunnableFunc which can be ran by a controller
    30  // manager. It watches for new udev events and updates the host's NodeDisplayConfig
    31  // to trigger the display controller to apply its configuration to the display
    32  // manager. This ensures the correct configuration is applied whenever devices are
    33  // added or removed.
    34  type DeviceWatcherRunnable struct {
    35  	Name     string
    36  	Hostname string
    37  	Client   client.Client
    38  }
    39  
    40  func NewDeviceWatcherRunnable(hostname string, c client.Client) *DeviceWatcherRunnable {
    41  	return &DeviceWatcherRunnable{
    42  		Name:     constants.DeviceWatcherName,
    43  		Hostname: hostname,
    44  		Client:   c,
    45  	}
    46  }
    47  
    48  func (r *DeviceWatcherRunnable) SetupWithManager(mgr ctrl.Manager) error {
    49  	return mgr.Add(r)
    50  }
    51  
    52  func (r *DeviceWatcherRunnable) Start(ctx context.Context) error {
    53  	log := ctrl.LoggerFrom(ctx).WithName(r.Name)
    54  	log.Info("starting device watcher")
    55  
    56  	annotator := newNodeDisplayConfigAnnotator(r.Hostname, v2.DevicesUpdatedAtAnnotation, r.Client, log, deviceWatcherMessage)
    57  
    58  	decoder, uEventReader, err := createDecoder()
    59  	if err != nil {
    60  		return err
    61  	}
    62  	defer uEventReader.Close()
    63  
    64  	uEventChan, streamErrChan := reader.StreamUEvents(ctx, decoder)
    65  
    66  	// until context is cancelled, watch for udev events and update the
    67  	// annotation to trigger a reconcile
    68  	//
    69  	// rate-limit the annotation updates as many udev events appear each
    70  	// time a device is (dis)connected
    71  	for {
    72  		select {
    73  		case uEvent := <-uEventChan:
    74  			if isTargetUEvent(uEvent) {
    75  				annotator.AnnotateRateLimited(ctx, time.Now().Format(time.RFC3339), timeout)
    76  			}
    77  		case err := <-streamErrChan:
    78  			return err
    79  		case err := <-annotator.ErrChan:
    80  			return err
    81  		case <-ctx.Done():
    82  			log.Info("context cancelled, exiting device watcher")
    83  			return nil
    84  		}
    85  	}
    86  }
    87  
    88  func createDecoder() (reader.Decoder, io.ReadCloser, error) {
    89  	uEventReader, fd, err := udev.NewUEventReader(udev.Netlink)
    90  	if err != nil {
    91  		return nil, nil, err
    92  	}
    93  	decoder := reader.NewSocketReader(fd)
    94  	return decoder, uEventReader, nil
    95  }
    96  
    97  func isTargetUEvent(uEvent *udev.UEvent) bool {
    98  	if events, ok := udevSubSystemTriggers[uEvent.SubSystem]; ok {
    99  		return slices.Contains(events, uEvent.Action)
   100  	}
   101  	return false
   102  }
   103  

View as plain text