package displayctl

import (
	"context"
	"fmt"
	"path/filepath"
	"time"

	"github.com/fsnotify/fsnotify"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"edge-infra.dev/pkg/sds/display/constants"
	"edge-infra.dev/pkg/sds/display/displaymanager/manager"
	v2 "edge-infra.dev/pkg/sds/display/k8s/apis/v2"
)

// displayManagerWatcherMessage is the message passed to the NodeDisplayConfig
// annotator that Start creates.
const displayManagerWatcherMessage = "display manager socket refreshed, updating NodeDisplayConfig"

// DisplayManagerWatcherRunnable is a runnable which can be added to, and run
// by, a controller manager. It watches for the display manager's socket being
// created and updates the host's NodeDisplayConfig to trigger the display
// controller to apply its configuration to the display manager. This ensures X
// is always updated whenever it restarts.
type DisplayManagerWatcherRunnable struct {
	// Name is used as the logger name in Start.
	Name string
	// Client is the Kubernetes client handed to the helpers called from Start.
	Client client.Client
	// DisplayManager is embedded; Start calls Socket() and Hostname() on the
	// runnable (presumably promoted from this interface - confirm).
	manager.DisplayManager
}

// NewDisplayManagerWatcherRunnable builds a watcher runnable for the given
// display manager and client, named constants.DisplayManagerWatcherName.
func NewDisplayManagerWatcherRunnable(displayManager manager.DisplayManager, c client.Client) *DisplayManagerWatcherRunnable {
	runnable := new(DisplayManagerWatcherRunnable)
	runnable.Name = constants.DisplayManagerWatcherName
	runnable.Client = c
	runnable.DisplayManager = displayManager
	return runnable
}

// SetupWithManager registers the runnable with mgr and returns any error
// reported by mgr.Add.
func (r *DisplayManagerWatcherRunnable) SetupWithManager(mgr ctrl.Manager) error {
	return mgr.Add(r)
}

// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete

// Start runs the display manager watcher. It watches for the display manager's
// socket being created or deleted as a way to tell if it has started or stopped.
//
// When this happens, the node's display UI resource is set to unavailable. This
// tells UI pods that they cannot start yet. Currently running UI pods are
// descheduled. They will be scheduled once displayctl has configured the display
// manager and enabled the resource again.
//
// The host's NodeDisplayConfig is also updated on socket events to trigger
// a reconcile from the display controller.
//
// This thread runs indefinitely alongside the controller until the context is
// cancelled or an error occurs.
func (r *DisplayManagerWatcherRunnable) Start(ctx context.Context) error { log := ctrl.LoggerFrom(ctx).WithName(r.Name).WithValues("socket", r.Socket()) log.Info("starting display manager watcher") annotator := newNodeDisplayConfigAnnotator(r.Hostname(), v2.DisplayManagerRestartedAtAnnotation, r.Client, log, displayManagerWatcherMessage) watcher, err := createFSWatcher(r.Socket()) if err != nil { return fmt.Errorf("unable to create socket watcher for %s: %w", r.Socket(), err) } defer watcher.Close() for { select { case event, ok := <-watcher.Events: if !ok { log.Info("file watcher stopped, exiting display manager watcher") return nil } if isTargetSocketEvent(event, r.Socket()) { // disable the UI request node resource if err := updateUIRequestNodeResource(ctx, r.Hostname(), false, log, r.Client); err != nil { return err } // update NodeDisplayConfig annotation to trigger reconcile if err := annotator.Annotate(ctx, time.Now().Format(time.RFC3339)); err != nil { return err } } case err, ok := <-watcher.Errors: if !ok { log.Info("file watcher stopped, exiting display manager watcher") return nil } return err case <-ctx.Done(): log.Info("context cancelled, exiting display manager watcher") return nil } } } // Creates a new fsnotify watcher for the socket's directory. func createFSWatcher(socket string) (*fsnotify.Watcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, err } socketDir := filepath.Dir(socket) if err := watcher.Add(socketDir); err != nil { return nil, err } return watcher, nil } // Returns true for create and delete events for the socket file. func isTargetSocketEvent(event fsnotify.Event, socket string) bool { if event.Name != socket { return false } if event.Op&fsnotify.Create == fsnotify.Create { return true } return event.Op&fsnotify.Remove == fsnotify.Remove }