...

Source file src/edge-infra.dev/pkg/sds/lanoutage/detector/lanoutage.go

Documentation: edge-infra.dev/pkg/sds/lanoutage/detector

     1  package detector
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/spf13/afero"
    10  	ctrl "sigs.k8s.io/controller-runtime"
    11  
    12  	"edge-infra.dev/pkg/lib/fog"
    13  	etcdconstants "edge-infra.dev/pkg/sds/etcd/operator/constants"
    14  	"edge-infra.dev/pkg/sds/lanoutage/detector/internal/config"
    15  	"edge-infra.dev/pkg/sds/lanoutage/detector/internal/constants"
    16  	"edge-infra.dev/pkg/sds/lanoutage/detector/internal/interlock"
    17  	"edge-infra.dev/pkg/sds/lib/dbus/systemd"
    18  )
    19  
    20  const (
    21  	shortEnterTime time.Duration = 2 * time.Minute
    22  	shortLeaveTime time.Duration = 0 * time.Second
    23  	enterTime      time.Duration = 30 * time.Minute
    24  	leaveTime      time.Duration = 10 * time.Minute
    25  )
    26  
    27  // Action encodes the operations that the reconciler can effect on the node
    28  type Action int
    29  
    30  const (
    31  	None Action = iota
    32  	Enter
    33  	Leave
    34  	Stay
    35  )
    36  
    37  func Run() error {
    38  	ctx := context.Background()
    39  
    40  	cfg, err := config.New()
    41  	if err != nil {
    42  		return err
    43  	}
    44  
    45  	log := fog.New(fog.WithLevel(cfg.LOMConfig.LogLevel())).WithName("lan-outage-detector")
    46  	ctx = fog.IntoContext(ctx, log)
    47  	ctrl.SetLogger(log)
    48  
    49  	if !cfg.ThickPos {
    50  		log.Info("Thick POS mode is disabled. LAN Outage Detector will not run.")
    51  		return nil
    52  	}
    53  
    54  	if err := removeOldLANOutageDetector(ctx); err != nil {
    55  		return fmt.Errorf("error removing old LAN Outage Detector: %w", err)
    56  	}
    57  
    58  	isLOM, err := afero.Exists(cfg.Fs, constants.LOMFlagFilepath)
    59  	if err != nil {
    60  		return fmt.Errorf("failed to check if node is in LAN Outage Mode: %w", err)
    61  	}
    62  
    63  	firstBoot, err := isFirstBoot(cfg.Fs)
    64  	if err != nil {
    65  		return fmt.Errorf("failed to check if LOD has restarted: %w", err)
    66  	}
    67  
    68  	if firstBoot {
    69  		if err := cfg.Fs.RemoveAll(etcdconstants.OperatorFilewallFilepath); err != nil {
    70  			return fmt.Errorf("failed to remove etcd operator firewall: %w", err)
    71  		}
    72  	}
    73  
    74  	log.Info("LAN Outage Detector running..")
    75  
    76  	lomReconciler := NewLOMReconciler(cfg, isLOM)
    77  	lanDetector, err := newLANDetector(lomReconciler)
    78  	if err != nil {
    79  		return fmt.Errorf("failed to initialize LAN outage detector: %w", err)
    80  	}
    81  
    82  	if firstBoot {
    83  		lomReconciler.enterDeadline = time.Now().Add(shortEnterTime)
    84  		lomReconciler.leaveDeadline = time.Now().Add(shortLeaveTime)
    85  	}
    86  
    87  	if isLOM && firstBoot {
    88  		err := lomReconciler.Enter(ctx)
    89  		if err != nil {
    90  			log.Error(err, "failed to completely re-enter LAN Outage Mode")
    91  		}
    92  	}
    93  
    94  	// make the SocketHandler before the goroutine so we can handle filesystem errors
    95  	sh, err := NewSocketHandler(constants.SocketFilepath)
    96  	if err != nil {
    97  		return fmt.Errorf("failed to create the socket handler: %w", err)
    98  	}
    99  
   100  	// accept structs
   101  	go forceClientHandler(ctx, lomReconciler, sh)
   102  	return detectLoop(ctx, &lanDetector)
   103  }
   104  
   105  // Main control loop for the program.
   106  // Detects state of control plane network connection every 10 seconds.
   107  // Resets a timer every time there is a connection change.
   108  // Once the timer times out, the LANDetector will either enter
   109  // or leave LAN Outage Mode.
   110  func detectLoop(ctx context.Context, detector *LANDetector) error {
   111  	log := fog.FromContext(ctx)
   112  
   113  	if err := detector.Detect(); err != nil {
   114  		log.Error(err, "failed to run initial cluster connectivity check")
   115  	}
   116  
   117  	log.Info("initial connection status", "connected", detector.canConnect)
   118  	wasConnected := detector.canConnect
   119  
   120  	for {
   121  		if err := interlock.UpdateInterlock(detector.canConnect, detector.isLOM); err != nil {
   122  			log.Error(err, "failed to update the Interlock API state")
   123  		}
   124  
   125  		time.Sleep(10 * time.Second)
   126  		if err := detector.Detect(); err != nil {
   127  			log.Error(err, "failed to run cluster connectivity check")
   128  			continue
   129  		}
   130  
   131  		// reset deadline on connection state change
   132  		if detector.canConnect != wasConnected {
   133  			log.Info("connection status changed", "connected", detector.canConnect)
   134  			wasConnected = detector.canConnect
   135  			detector.LOMReconciler.resetDeadline()
   136  		}
   137  
   138  		var actionTaken Action
   139  		var err error
   140  		if detector.canConnect {
   141  			actionTaken, err = onConnection(ctx, detector)
   142  			if err != nil {
   143  				log.Error(err, "failed to handle connected state")
   144  			}
   145  		} else {
   146  			actionTaken, err = noConnection(ctx, detector)
   147  			if err != nil {
   148  				log.Error(err, "failed to handle disconnected state")
   149  			}
   150  		}
   151  
   152  		// set longer deadlines and first boot flag file
   153  		// after very first action taken in LOD pod lifespan
   154  		if actionTaken != None {
   155  			detector.LOMReconciler.resetDeadline()
   156  			setFirstBoot(ctx, detector.GetFs())
   157  		}
   158  	}
   159  }
   160  
   161  // onConnection determines if the node needs to leave LAN Outage Mode
   162  // and executes the Leave action.
   163  // This depends on internal LANDetector state and a set of preconditions.
   164  func onConnection(ctx context.Context, detector *LANDetector) (Action, error) {
   165  	log := fog.FromContext(ctx)
   166  	// guard against leaving if node is not in LOM
   167  	if !detector.isLOM {
   168  		return Stay, nil
   169  	}
   170  
   171  	if detector.isPastLeaveDeadline() {
   172  		acquired, leaveErr := detector.WithLock(func() error {
   173  			return tryLeave(ctx, detector)
   174  		})
   175  
   176  		if !acquired {
   177  			log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
   178  			return None, nil
   179  		}
   180  
   181  		// check if any of the preconditions for leaving LOM were not met
   182  		if errors.Is(leaveErr, errPrecondition) {
   183  			return None, leaveErr
   184  		}
   185  
   186  		// apply cleanup for Leave action and exit
   187  		if leaveErr != nil {
   188  			log.Error(leaveErr, "there was an error in the leave LAN Outage Mode process")
   189  			log.Info("applying cleanup for leave, now entering...")
   190  
   191  			return Enter, leaveCleanup(ctx, detector)
   192  		}
   193  
   194  		return Leave, nil
   195  	}
   196  	return None, nil
   197  }
   198  
   199  // tryLeave is a wrapper function for the leaving LOM procedure.
   200  func tryLeave(ctx context.Context, detector *LANDetector) error {
   201  	required, err := detector.LeavePreconditionCheck(ctx)
   202  	if err != nil {
   203  		return fmt.Errorf("%w - failed to leave: %w", errPrecondition, err)
   204  	}
   205  
   206  	if required {
   207  		return detector.Leave(ctx)
   208  	}
   209  
   210  	return nil
   211  }
   212  
   213  // setFirstBoot creates the first boot flag file to indicate that the LOD has
   214  // started up for the first time.
   215  // TO REVIEW: currently doesn't return error as control flow doesn't depend on this
   216  func setFirstBoot(ctx context.Context, fs afero.Fs) {
   217  	log := fog.FromContext(ctx)
   218  	_, err := fs.Create(constants.FirstRunFlagFilepath)
   219  	if err != nil {
   220  		log.Error(err, "failed to create the first run flag file")
   221  	}
   222  }
   223  
   224  func leaveCleanup(ctx context.Context, detector *LANDetector) error {
   225  	log := fog.FromContext(ctx)
   226  	acquired, err := detector.WithLock(func() error {
   227  		return detector.Enter(ctx)
   228  	})
   229  
   230  	if !acquired {
   231  		log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
   232  	}
   233  
   234  	if err != nil {
   235  		return fmt.Errorf("failed to enter LAN Outage Mode in the leave fallback process: %w", err)
   236  	}
   237  	return nil
   238  }
   239  
   240  // noConnection determines if the node needs to Enter LAN Outage Mode
   241  // and executes the Enter action.
   242  // This depends on internal LANDetector state and first boot state.
   243  func noConnection(ctx context.Context, detector *LANDetector) (Action, error) {
   244  	log := fog.FromContext(ctx)
   245  	// guard against entering is in LOM
   246  	if detector.isLOM {
   247  		return Stay, nil
   248  	}
   249  
   250  	if detector.isPastEnterDeadline() {
   251  		acquired, enterErr := detector.WithLock(func() error {
   252  			return tryEnter(ctx, detector)
   253  		})
   254  
   255  		if !acquired {
   256  			log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
   257  			return None, nil
   258  		}
   259  
   260  		// check if any of the preconditions for entering LOM were not met
   261  		if errors.Is(enterErr, errPrecondition) {
   262  			return None, enterErr
   263  		}
   264  
   265  		// apply cleanup for Enter action
   266  		if enterErr != nil {
   267  			log.Error(enterErr, "there was an error in the enter LAN Outage Mode process")
   268  			log.Info("applying cleanup for enter, now leaving...")
   269  
   270  			return Leave, enterCleanup(ctx, detector)
   271  		}
   272  
   273  		return Enter, nil
   274  	}
   275  	return None, nil
   276  }
   277  
   278  // tryEnter is a wrapper function for the Enter LOM procedure.
   279  // calls EnterPreconditionCheck and returns if this fails
   280  func tryEnter(ctx context.Context, detector *LANDetector) error {
   281  	required, err := detector.EnterPreconditionCheck(ctx)
   282  	if err != nil {
   283  		return fmt.Errorf("%w - failed to enter: %w", errPrecondition, err)
   284  	}
   285  	if !required {
   286  		return nil
   287  	}
   288  	return detector.Enter(ctx)
   289  }
   290  
   291  // removeOldLANOutageDetector() removes the systemd version of the LOD if present
   292  func removeOldLANOutageDetector(ctx context.Context) error {
   293  	log := fog.FromContext(ctx)
   294  	conn, err := systemd.NewConnection(ctx)
   295  	if err != nil {
   296  		return fmt.Errorf("failed to establish a connection to systemd: %w", err)
   297  	}
   298  	defer conn.Close()
   299  
   300  	// if the service isn't active we don't need to do anything
   301  	active, err := serviceActive(ctx, conn, constants.LANOutageServiceName)
   302  	if err != nil || !active {
   303  		return err
   304  	}
   305  
   306  	log.Info("attempting to stop prexisting service", "service", constants.LANOutageServiceName)
   307  	// service exists - try to stop and remove it
   308  	if err := conn.Stop(ctx, constants.LANOutageServiceName, systemd.Replace, false); err != nil {
   309  		return err
   310  	}
   311  
   312  	// disable the service
   313  	log.Info("attempting to disable prexisting service", "service", constants.LANOutageServiceName)
   314  	return conn.Disable(ctx, []string{constants.LANOutageServiceName})
   315  }
   316  
   317  // func serviceActive() checks that the service exists and needs removing
   318  func serviceActive(ctx context.Context, conn systemd.StatusChecker, service string) (bool, error) {
   319  	log := fog.FromContext(ctx)
   320  	// check systemd for the service
   321  	exists, err := conn.Exists(ctx, service)
   322  	if err != nil {
   323  		return false, fmt.Errorf("failed to check for prexisting service: %w", err)
   324  	}
   325  	// systemd has no record of the service - just return
   326  	if !exists {
   327  		return false, nil
   328  	}
   329  	//  find ActiveState
   330  	state, err := conn.ActiveState(ctx, constants.LANOutageServiceName)
   331  	if err != nil {
   332  		return false, err
   333  	}
   334  	// service is found but inactive - return
   335  	if state == "inactive" {
   336  		return false, nil
   337  	}
   338  	log.Info("found an active service", "service", service)
   339  
   340  	return true, nil
   341  }
   342  
   343  // wrapper function for reversing an attempted Enter procedure
   344  // that has failed.
   345  func enterCleanup(ctx context.Context, detector *LANDetector) error {
   346  	log := fog.FromContext(ctx)
   347  	acquired, err := detector.WithLock(func() error {
   348  		return detector.Leave(ctx)
   349  	})
   350  
   351  	if !acquired {
   352  		log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
   353  	}
   354  	if err != nil {
   355  		return fmt.Errorf("failed to leave LAN Outage Mode in the enter fallback process: %w", err)
   356  	}
   357  
   358  	return nil
   359  }
   360  
   361  func isFirstBoot(fs afero.Fs) (bool, error) {
   362  	exists, err := afero.Exists(fs, constants.FirstRunFlagFilepath)
   363  	if err != nil {
   364  		return false, err
   365  	}
   366  	return !exists, nil
   367  }
   368  
   369  // forceClientHandler listens for connections on the unix socket and
   370  // dispatches them to the appropriate handler
   371  func forceClientHandler(ctx context.Context, reconciler *LOMReconciler, sh *SocketHandler) {
   372  	log := fog.FromContext(ctx)
   373  	log.Info("starting force client handler routine...")
   374  
   375  	for {
   376  		conn, err := sh.ReceiveConnection()
   377  		if err != nil {
   378  			log.Error(err, "failed to read unix socket")
   379  			continue
   380  		}
   381  
   382  		go handleSocketConnection(ctx, conn, reconciler)
   383  	}
   384  }
   385  
   386  // handleSocketConnection reads the message from the socket connection
   387  // and dispatches it to the appropriate handler
   388  func handleSocketConnection(ctx context.Context, conn *Conn, reconciler *LOMReconciler) {
   389  	log := fog.FromContext(ctx)
   390  
   391  	message, err := conn.Receive()
   392  	if err != nil {
   393  		log.Error(err, "failed to read message from socket connection")
   394  		if err := conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToReceiveMessage); err != nil {
   395  			log.Error(err, "failed to reply to socket request")
   396  		}
   397  		return
   398  	}
   399  
   400  	switch message {
   401  	case constants.EnterCommand:
   402  		if err := enterHandler(ctx, reconciler, conn); err != nil {
   403  			log.Error(err, "failed to process enter LAN Outage Mode request")
   404  		}
   405  	case constants.LeaveCommand:
   406  		if err := leaveHandler(ctx, reconciler, conn); err != nil {
   407  			log.Error(err, "failed to process leave LAN Outage Mode request")
   408  		}
   409  	default:
   410  		log.Info(constants.UnexpectedSocketCommandMessage)
   411  
   412  		if err := conn.Respond(constants.FailedToProcessRequestMessage, constants.UnexpectedSocketCommandMessage); err != nil {
   413  			log.Error(err, "failed to reply to socket request")
   414  		}
   415  	}
   416  }
   417  
   418  // enterHandler handles the helper force enter script by invoking
   419  // the LAN Outage Mode enter procedure and reporting the result back
   420  // to the script
   421  func enterHandler(ctx context.Context, reconciler *LOMReconciler, conn *Conn) error {
   422  	acquired, err := reconciler.WithLock(func() error {
   423  		if err := conn.Respond(constants.EnteringMessage, ""); err != nil {
   424  			return err
   425  		}
   426  
   427  		return reconciler.Enter(ctx)
   428  	})
   429  
   430  	if !acquired {
   431  		return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToAcquireLockMessage)
   432  	}
   433  	if err != nil {
   434  		fog.FromContext(ctx).Error(err, constants.FailedToEnterMessage)
   435  		return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToEnterMessage)
   436  	}
   437  
   438  	return conn.Respond(constants.EnterSuccessMessage, "")
   439  }
   440  
   441  // leaveHandler handles the helper force leave script by invoking
   442  // the LAN Outage Mode leave procedure and reporting the result back
   443  // to the script
   444  func leaveHandler(ctx context.Context, reconciler *LOMReconciler, conn *Conn) error {
   445  	acquired, err := reconciler.WithLock(func() error {
   446  		if err := conn.Respond(constants.LeavingMessage, ""); err != nil {
   447  			return err
   448  		}
   449  
   450  		return reconciler.Leave(ctx)
   451  	})
   452  
   453  	if !acquired {
   454  		return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToAcquireLockMessage)
   455  	}
   456  
   457  	if err != nil {
   458  		fog.FromContext(ctx).Error(err, constants.FailedToLeaveMessage)
   459  		return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToLeaveMessage)
   460  	}
   461  
   462  	return conn.Respond(constants.LeaveSuccessMessage, "")
   463  }
   464  

View as plain text