...

Source file src/edge-infra.dev/pkg/sds/display/k8s/controllers/displayctl/displaymanagerwatcher_runnable.go

Documentation: edge-infra.dev/pkg/sds/display/k8s/controllers/displayctl

     1  package displayctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"path/filepath"
     7  	"time"
     8  
     9  	"github.com/fsnotify/fsnotify"
    10  	ctrl "sigs.k8s.io/controller-runtime"
    11  	"sigs.k8s.io/controller-runtime/pkg/client"
    12  
    13  	"edge-infra.dev/pkg/sds/display/constants"
    14  	"edge-infra.dev/pkg/sds/display/displaymanager/manager"
    15  	v2 "edge-infra.dev/pkg/sds/display/k8s/apis/v2"
    16  )
    17  
// displayManagerWatcherMessage is the message handed to the NodeDisplayConfig
// annotator created in Start.
// NOTE(review): presumably logged by the annotator each time it annotates -
// confirm against newNodeDisplayConfigAnnotator.
const displayManagerWatcherMessage = "display manager socket refreshed, updating NodeDisplayConfig"
    19  
// DisplayManagerWatcherRunnable is a Runnable which can be run by a controller
// manager. It watches for the display manager's socket being created or removed
// and updates the host's NodeDisplayConfig to trigger the display controller to
// apply its configuration to the display manager. This ensures X is always
// updated whenever it restarts.
type DisplayManagerWatcherRunnable struct {
	// Name is used as the logger name in Start.
	Name string
	// Client is passed to the NodeDisplayConfig annotator and to the UI
	// request node resource update performed in Start.
	Client client.Client

	// DisplayManager is embedded; Start relies on its Socket and Hostname
	// methods to know which path to watch and which node to update.
	manager.DisplayManager
}
    31  
    32  func NewDisplayManagerWatcherRunnable(displayManager manager.DisplayManager, c client.Client) *DisplayManagerWatcherRunnable {
    33  	return &DisplayManagerWatcherRunnable{
    34  		Name:           constants.DisplayManagerWatcherName,
    35  		Client:         c,
    36  		DisplayManager: displayManager,
    37  	}
    38  }
    39  
    40  func (r *DisplayManagerWatcherRunnable) SetupWithManager(mgr ctrl.Manager) error {
    41  	return mgr.Add(r)
    42  }
    43  
    44  // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
    45  
    46  // Start runs the display manager watcher. It watches for the display manager's
    47  // socket being created or deleted as a way to tell if it has started or stopped.
    48  //
    49  // When this happens, the node's display UI resource is set to unavailable. This
    50  // tells UI pods that they cannot start yet. Currently running UI pods are
    51  // descheduled. They will be scheduled once displayctl has configured the display
    52  // manager and enabled the resource again.
    53  //
    54  // The host's NodeDisplayConfig is also updated on socket events to trigger
    55  // a reconcile from the display controller.
    56  //
    57  // This thread runs indefinitely alongside the controller until the context is
    58  // cancelled or an error occurs.
    59  func (r *DisplayManagerWatcherRunnable) Start(ctx context.Context) error {
    60  	log := ctrl.LoggerFrom(ctx).WithName(r.Name).WithValues("socket", r.Socket())
    61  	log.Info("starting display manager watcher")
    62  
    63  	annotator := newNodeDisplayConfigAnnotator(r.Hostname(), v2.DisplayManagerRestartedAtAnnotation, r.Client, log, displayManagerWatcherMessage)
    64  
    65  	watcher, err := createFSWatcher(r.Socket())
    66  	if err != nil {
    67  		return fmt.Errorf("unable to create socket watcher for %s: %w", r.Socket(), err)
    68  	}
    69  	defer watcher.Close()
    70  
    71  	for {
    72  		select {
    73  		case event, ok := <-watcher.Events:
    74  			if !ok {
    75  				log.Info("file watcher stopped, exiting display manager watcher")
    76  				return nil
    77  			}
    78  			if isTargetSocketEvent(event, r.Socket()) {
    79  				// disable the UI request node resource
    80  				if err := updateUIRequestNodeResource(ctx, r.Hostname(), false, log, r.Client); err != nil {
    81  					return err
    82  				}
    83  				// update NodeDisplayConfig annotation to trigger reconcile
    84  				if err := annotator.Annotate(ctx, time.Now().Format(time.RFC3339)); err != nil {
    85  					return err
    86  				}
    87  			}
    88  		case err, ok := <-watcher.Errors:
    89  			if !ok {
    90  				log.Info("file watcher stopped, exiting display manager watcher")
    91  				return nil
    92  			}
    93  			return err
    94  		case <-ctx.Done():
    95  			log.Info("context cancelled, exiting display manager watcher")
    96  			return nil
    97  		}
    98  	}
    99  }
   100  
   101  // Creates a new fsnotify watcher for the socket's directory.
   102  func createFSWatcher(socket string) (*fsnotify.Watcher, error) {
   103  	watcher, err := fsnotify.NewWatcher()
   104  	if err != nil {
   105  		return nil, err
   106  	}
   107  
   108  	socketDir := filepath.Dir(socket)
   109  	if err := watcher.Add(socketDir); err != nil {
   110  		return nil, err
   111  	}
   112  
   113  	return watcher, nil
   114  }
   115  
   116  // Returns true for create and delete events for the socket file.
   117  func isTargetSocketEvent(event fsnotify.Event, socket string) bool {
   118  	if event.Name != socket {
   119  		return false
   120  	}
   121  	if event.Op&fsnotify.Create == fsnotify.Create {
   122  		return true
   123  	}
   124  	return event.Op&fsnotify.Remove == fsnotify.Remove
   125  }
   126  

View as plain text