...

Source file src/edge-infra.dev/pkg/sds/patching/k8s/controller/controller.go

Documentation: edge-infra.dev/pkg/sds/patching/k8s/controller

     1  package controller
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"slices"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/go-logr/logr"
    12  	"github.com/hashicorp/go-version"
    13  	"github.com/spf13/afero"
    14  	ctrl "sigs.k8s.io/controller-runtime"
    15  	"sigs.k8s.io/controller-runtime/pkg/builder"
    16  	"sigs.k8s.io/controller-runtime/pkg/client"
    17  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    18  
    19  	"edge-infra.dev/pkg/k8s/meta/status"
    20  
    21  	corev1 "k8s.io/api/core/v1"
    22  	"k8s.io/apimachinery/pkg/types"
    23  
    24  	kuberecorder "k8s.io/client-go/tools/record"
    25  
    26  	"edge-infra.dev/pkg/k8s/runtime/events"
    27  
    28  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    29  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    30  	controllerReconciler "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    31  	patch "edge-infra.dev/pkg/k8s/runtime/patch"
    32  	"edge-infra.dev/pkg/sds/patching/common"
    33  	v1ienpatch "edge-infra.dev/pkg/sds/patching/k8s/apis/ienpatch/v1"
    34  	patchMetrics "edge-infra.dev/pkg/sds/patching/k8s/controller/internal/metrics"
    35  	patchManager "edge-infra.dev/pkg/sds/patching/patchmanager"
    36  )
    37  
// predicates restricts the events that reach the controller to those accepted
// by controller-runtime's GenerationChangedPredicate. It is applied to the
// IENPatch watch in SetUpWithManager.
var predicates = builder.WithPredicates(predicate.GenerationChangedPredicate{})
    39  
// PatchController reconciles IENPatch objects on behalf of a single node and
// reports that node's patch progress as a condition on the object.
//
// Fields, as populated by NewPatchController:
//   - Client (embedded): the manager's client; used to get and patch IENPatch objects.
//   - EventRecorder (embedded): the recorder created for "patchctl".
//   - K8sClient: a client built directly from the manager's config rather than
//     taken from the manager; used to list nodes and handed to the PatchManager.
//   - Log: logger for controller messages.
//   - Name: value of the NODE_NAME environment variable; also the type of the
//     condition this controller owns.
//   - RequeueTime: fixed delay applied to requeues.
//   - Conditions: condition configuration passed to the summarizer; its
//     Summarize list holds the node names read from the cluster.
//   - Metrics: value returned by patchMetrics.PatchingMetrics.
//
// NOTE: NewPatchController fills this struct with an unkeyed literal, so the
// field order below must not change without updating that constructor.
type PatchController struct {
	client.Client
	kuberecorder.EventRecorder
	K8sClient   client.Client
	Log         logr.Logger
	Name        string
	RequeueTime time.Duration
	Conditions  controllerReconciler.Conditions
	Metrics     patchMetrics.Metrics
}
    50  
    51  // Create new controller. Summarize conditions will be a list of all node hostnames.
    52  // NoCacheClient is for single requests to node list without watcher instead of default for controllers.
    53  func NewPatchController(mgr ctrl.Manager, log logr.Logger) *PatchController {
    54  	noCacheClient, _ := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
    55  	k8sClient := mgr.GetClient()
    56  	mgr.GetClient()
    57  	hostname := os.Getenv("NODE_NAME")
    58  	patchConditions := reconcile.Conditions{
    59  		Target: status.ReadyCondition,
    60  		Owned: []string{
    61  			hostname,
    62  		},
    63  		Summarize:        []string{},
    64  		NegativePolarity: []string{},
    65  	}
    66  	metrics := patchMetrics.PatchingMetrics(mgr)
    67  
    68  	eventRecorder := events.NewRecorder(mgr, ctrl.Log, "patchctl")
    69  
    70  	return &PatchController{k8sClient, eventRecorder, noCacheClient, log, hostname, requeueTime, patchConditions, metrics}
    71  }
    72  
    73  func (c *PatchController) SetUpWithManager(mgr ctrl.Manager) error {
    74  	return ctrl.NewControllerManagedBy(mgr).For(&v1ienpatch.IENPatch{}, predicates).Complete(c)
    75  }
    76  
    77  // Creates a new Summarizer and summarizes and patches the IENPatches object
    78  func (c *PatchController) Summarizer(ctx context.Context, ns types.NamespacedName, ienpatch *v1ienpatch.IENPatch, result controllerReconciler.Result, patchErr error) (res ctrl.Result, recErr error) {
    79  	condition := conditions.Get(ienpatch, c.Name)
    80  	if err := c.Client.Get(ctx, ns, ienpatch); err != nil {
    81  		c.Log.Error(err, "error getting ienpatch object")
    82  	}
    83  	patcher := patch.NewSerialPatcher(ienpatch, c.Client)
    84  
    85  	c.clearDeletedNodes(ienpatch)
    86  
    87  	if condition != nil {
    88  		conditions.Set(ienpatch, condition)
    89  	}
    90  
    91  	c.patchOtherNodes(ienpatch)
    92  	s := controllerReconciler.NewSummarizer(patcher)
    93  	res, recErr = s.SummarizeAndPatch(ctx, ienpatch,
    94  		controllerReconciler.WithConditions(c.Conditions),
    95  		controllerReconciler.WithResult(result),
    96  		//controllerReconciler.WithEventRecorder(c.EventRecorder), using our own event recorder
    97  		controllerReconciler.WithError(patchErr),
    98  		controllerReconciler.WithIgnoreNotFound(),
    99  		controllerReconciler.WithProcessors(
   100  			controllerReconciler.RecordReconcileReq,
   101  			controllerReconciler.RecordResult,
   102  		),
   103  		controllerReconciler.WithFieldOwner(c.Name),
   104  	)
   105  	if recErr != nil && strings.Contains(fmt.Sprint(recErr), "modified by a different process") {
   106  		c.Log.Info("Conflict when patching Ready status, retrying")
   107  		return c.Summarizer(ctx, ns, ienpatch, result, patchErr)
   108  	}
   109  	// custom event recorder
   110  	// summarizeandpatch doesn't emit events if no error and not ready,
   111  	// we want each node to report independently
   112  	// Add this block in when edge-infra version reaches BWC
   113  	//condition = conditions.Get(ienpatch, c.Name)
   114  	//if condition.Reason != "Progressing" {
   115  	//	if recErr != nil {
   116  	//		c.EventRecorder.Eventf(ienpatch, corev1.EventTypeWarning, condition.Reason, condition.Message)
   117  	//	} else {
   118  	//		c.EventRecorder.Eventf(ienpatch, corev1.EventTypeNormal, condition.Reason, condition.Message)
   119  	//	}
   120  	//}
   121  
   122  	// Lock requeue time instead of exponential backoff
   123  	// TODO Re-examine this
   124  	if res.Requeue {
   125  		res.RequeueAfter = c.RequeueTime
   126  	}
   127  	return res, nil
   128  }
   129  
   130  func (c *PatchController) clearDeletedNodes(ienpatch *v1ienpatch.IENPatch) {
   131  	var oldNodes []string
   132  	for _, node := range ienpatch.GetConditions() {
   133  		// skip the summary condition
   134  		if node.Type == "Ready" {
   135  			continue
   136  		}
   137  		oldNodes = append(oldNodes, node.Type)
   138  	}
   139  
   140  	var deletedNodes []string
   141  	for _, oldNode := range oldNodes {
   142  		if slices.Contains(c.Conditions.Summarize, oldNode) {
   143  			continue
   144  		}
   145  		deletedNodes = append(deletedNodes, oldNode)
   146  	}
   147  
   148  	for _, node := range deletedNodes {
   149  		c.Log.Info(fmt.Sprintf("Removing Node %s from ienpatch", node))
   150  		conditions.Delete(ienpatch, node)
   151  	}
   152  }
   153  
   154  func (c *PatchController) setPendingStatus(ctx context.Context, namespacedName types.NamespacedName, patchmanager patchManager.PatchManager, result reconcile.Result) (res ctrl.Result, recErr error) {
   155  	// Patch CR to indicate patch has started
   156  	conditions.MarkFalse(patchmanager.Ienpatch, c.Name, "Progressing", "Reconciling upgrade for %s to %s", c.Name, patchmanager.TargetVer)
   157  
   158  	if err := c.overrideOlderStatus(ctx, patchmanager.Ienpatch, patchmanager.TargetVer); err != nil {
   159  		c.Log.Error(err, "Unable to to override status")
   160  	}
   161  
   162  	return c.Summarizer(ctx, namespacedName, patchmanager.Ienpatch, result, nil)
   163  }
   164  
// Reconcile handles a request for an IENPatch object: it fetches the object,
// refreshes the list of node names, builds a PatchManager for this node and
// runs the patch flow.
//
// The named results are assigned by the deferred call to Summarizer, which
// runs after Patch and also records the reconcile duration. The final
// `return res, recErr` therefore yields whatever the summarizer produced.
// Every early return requeues the request after c.RequeueTime.
func (c *PatchController) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
	reconcileStart := time.Now()

	// get the reconciled object
	// A NotFound error is not reported as an error, but the request is still
	// requeued after c.RequeueTime.
	ienpatch := v1ienpatch.IENPatch{}
	if err := c.Client.Get(ctx, req.NamespacedName, &ienpatch); err != nil {
		return ctrl.Result{RequeueAfter: c.RequeueTime}, client.IgnoreNotFound(err)
	}

	// Refresh the node names stored in c.Conditions.Summarize.
	if err := c.readNodeList(ctx); err != nil {
		return ctrl.Result{RequeueAfter: c.RequeueTime}, err
	}

	patchmanager, err := c.NewPatchManager(ctx, &ienpatch)
	if err != nil {
		return ctrl.Result{RequeueAfter: c.RequeueTime}, err
	}

	result := reconcile.ResultEmpty
	metrics := patchMetrics.CreateNewControllerMetrics(&ienpatch, c.Name, patchmanager.CurrentVer, patchmanager.TargetVer)
	// The closure reads result and err when it runs, i.e. with the values
	// assigned by the Patch call below, and overwrites the named results.
	defer func() {
		res, recErr = c.Summarizer(ctx, req.NamespacedName, &ienpatch, result, err)
		metrics.RecordMetrics(time.Since(reconcileStart).Seconds())
	}()

	result, err = c.Patch(ctx, req, patchmanager, metrics)

	// Placeholder values; the deferred function above sets the real results.
	return res, recErr
}
   195  
   196  func (c *PatchController) Patch(ctx context.Context, req ctrl.Request, patchmanager patchManager.PatchManager, metrics *patchMetrics.ControllerMetrics) (result controllerReconciler.Result, patchErr error) {
   197  	status, err := patchmanager.PreChecks()
   198  	if status == v1ienpatch.Pending {
   199  		_, err := c.setPendingStatus(ctx, req.NamespacedName, patchmanager, result)
   200  		if err != nil {
   201  			c.Log.Error(err, "Failed to set pending status")
   202  		}
   203  		status, err = patchmanager.PatchIEN() //nolint:all
   204  	}
   205  
   206  	metrics.RecordReadiness()
   207  
   208  	result = c.updateConditionsByStatus(status, patchmanager.Ienpatch, patchmanager.TargetVer)
   209  	return result, err
   210  }
   211  
   212  func (c *PatchController) NewPatchManager(ctx context.Context, ienpatch *v1ienpatch.IENPatch) (patchmanager patchManager.PatchManager, err error) {
   213  	osFs, _ := afero.NewOsFs().(afero.OsFs)
   214  
   215  	//Temp version source instead of from CRD due to unpredictable deployment order
   216  	targetVersion := os.Getenv("TARGET_VERSION")
   217  	currentVersion, err := common.ReadCurrentVer()
   218  	if err != nil {
   219  		c.Log.Error(err, "error creating config struct")
   220  		return
   221  	}
   222  
   223  	cfg, err := common.NewConfig()
   224  	if err != nil {
   225  		c.Log.Error(err, "error creating config struct")
   226  		return
   227  	}
   228  
   229  	// Patch the current Node
   230  	patchmanager = patchManager.PatchManager{
   231  		Ctx:        ctx,
   232  		K8sClient:  c.K8sClient,
   233  		Log:        ctrl.LoggerFrom(ctx),
   234  		HostName:   c.Name,
   235  		Ienpatch:   ienpatch,
   236  		CurrentVer: currentVersion,
   237  		TargetVer:  targetVersion,
   238  		Fs:         osFs,
   239  		Cfg:        cfg,
   240  	}
   241  	return
   242  }
   243  
   244  // Mark conditions for other nodes if their patchctl has not done it yet
   245  func (c *PatchController) patchOtherNodes(ienpatch *v1ienpatch.IENPatch) {
   246  	for _, node := range c.Conditions.Summarize {
   247  		if conditions.Get(ienpatch, node) == nil {
   248  			conditions.MarkFalse(ienpatch, node, "Pending", "Pending patchctl pod to start on this node")
   249  		}
   250  	}
   251  }
   252  
   253  // update the state of conditions that are that have messages including outdated versions
   254  func (c *PatchController) overrideOlderStatus(ctx context.Context, ienpatch *v1ienpatch.IENPatch, targetVersion string) error {
   255  	var changesMade = false
   256  	patcher := patch.NewSerialPatcher(ienpatch, c.Client)
   257  
   258  	for _, condition := range ienpatch.Status.Conditions {
   259  		messageVersion := strings.Split(condition.Message, " ")[len(strings.Split(condition.Message, " "))-1]
   260  
   261  		conditionVersionObj, err := version.NewVersion(messageVersion)
   262  		if err != nil {
   263  			c.Log.Error(err, fmt.Sprintf("Error parsing message version %s", messageVersion))
   264  			return err
   265  		}
   266  		targetVersionObj, err := version.NewVersion(targetVersion)
   267  		if err != nil {
   268  			c.Log.Error(err, fmt.Sprintf("Error parsing target version %s", targetVersion))
   269  			return err
   270  		}
   271  
   272  		if len(targetVersionObj.Prerelease()) != 0 {
   273  			return nil
   274  		}
   275  
   276  		if conditionVersionObj.Core().LessThan(targetVersionObj.Core()) {
   277  			conditions.MarkFalse(ienpatch, condition.Type, "Pending", "Pending patchctl pod to start for %s to %s", condition.Type, targetVersion)
   278  			changesMade = true
   279  		}
   280  	}
   281  
   282  	if !changesMade {
   283  		return nil
   284  	}
   285  
   286  	if err := patcher.Patch(ctx, ienpatch); err != nil {
   287  		c.Log.Error(err, "Failed to patch outdated status condition")
   288  		return err
   289  	}
   290  	return nil
   291  }
   292  
   293  func (c *PatchController) readNodeList(ctx context.Context) error {
   294  	// get the node list
   295  	nodes := corev1.NodeList{}
   296  	if err := c.K8sClient.List(ctx, &nodes); err != nil {
   297  		return err
   298  	}
   299  	for _, node := range nodes.Items {
   300  		c.Conditions.Summarize = append(c.Conditions.Summarize, node.Name)
   301  	}
   302  	return nil
   303  }
   304  
   305  func (c *PatchController) updateConditionsByStatus(status v1ienpatch.PatchStatus, ienpatch *v1ienpatch.IENPatch, targetVersion string) (results controllerReconciler.Result) {
   306  	switch status {
   307  	case v1ienpatch.Retry:
   308  		conditions.MarkFalse(ienpatch, c.Name, "Failed", "Retrying upgrade for %s to %s", c.Name, targetVersion)
   309  		return reconcile.ResultRequeue
   310  	case v1ienpatch.Pending:
   311  		conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Pending upgrade for %s to %s", c.Name, targetVersion)
   312  	case v1ienpatch.DownloadComplete:
   313  		conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Download complete for %s to %s", c.Name, targetVersion)
   314  	case v1ienpatch.Reboot:
   315  		conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Pending reboot for %s to %s", c.Name, targetVersion)
   316  	case v1ienpatch.Failed:
   317  		conditions.MarkFalse(ienpatch, c.Name, "Failed", "Failed upgrade for %s to %s", c.Name, targetVersion)
   318  	case v1ienpatch.Success:
   319  		conditions.MarkTrue(ienpatch, c.Name, "Successful", "Successful upgrade for %s to %s", c.Name, targetVersion)
   320  	}
   321  
   322  	return reconcile.ResultEmpty
   323  }
   324  

View as plain text