package controller import ( "context" "fmt" "os" "slices" "strings" "time" "github.com/go-logr/logr" "github.com/hashicorp/go-version" "github.com/spf13/afero" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/predicate" "edge-infra.dev/pkg/k8s/meta/status" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" kuberecorder "k8s.io/client-go/tools/record" "edge-infra.dev/pkg/k8s/runtime/events" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" controllerReconciler "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" patch "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/sds/patching/common" v1ienpatch "edge-infra.dev/pkg/sds/patching/k8s/apis/ienpatch/v1" patchMetrics "edge-infra.dev/pkg/sds/patching/k8s/controller/internal/metrics" patchManager "edge-infra.dev/pkg/sds/patching/patchmanager" ) var predicates = builder.WithPredicates(predicate.GenerationChangedPredicate{}) type PatchController struct { client.Client kuberecorder.EventRecorder K8sClient client.Client Log logr.Logger Name string RequeueTime time.Duration Conditions controllerReconciler.Conditions Metrics patchMetrics.Metrics } // Create new controller. Summarize conditions will be a list of all node hostnames. // NoCacheClient is for single requests to node list without watcher instead of default for controllers. func NewPatchController(mgr ctrl.Manager, log logr.Logger) *PatchController { noCacheClient, _ := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()}) k8sClient := mgr.GetClient() mgr.GetClient() hostname := os.Getenv("NODE_NAME") patchConditions := reconcile.Conditions{ Target: status.ReadyCondition, Owned: []string{ hostname, }, Summarize: []string{}, NegativePolarity: []string{}, } metrics := patchMetrics.PatchingMetrics(mgr) eventRecorder := events.NewRecorder(mgr, ctrl.Log, "patchctl") return &PatchController{k8sClient, eventRecorder, noCacheClient, log, hostname, requeueTime, patchConditions, metrics} } func (c *PatchController) SetUpWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr).For(&v1ienpatch.IENPatch{}, predicates).Complete(c) } // Creates a new Summarizer and summarizes and patches the IENPatches object func (c *PatchController) Summarizer(ctx context.Context, ns types.NamespacedName, ienpatch *v1ienpatch.IENPatch, result controllerReconciler.Result, patchErr error) (res ctrl.Result, recErr error) { condition := conditions.Get(ienpatch, c.Name) if err := c.Client.Get(ctx, ns, ienpatch); err != nil { c.Log.Error(err, "error getting ienpatch object") } patcher := patch.NewSerialPatcher(ienpatch, c.Client) c.clearDeletedNodes(ienpatch) if condition != nil { conditions.Set(ienpatch, condition) } c.patchOtherNodes(ienpatch) s := controllerReconciler.NewSummarizer(patcher) res, recErr = s.SummarizeAndPatch(ctx, ienpatch, controllerReconciler.WithConditions(c.Conditions), controllerReconciler.WithResult(result), //controllerReconciler.WithEventRecorder(c.EventRecorder), using our own event recorder controllerReconciler.WithError(patchErr), controllerReconciler.WithIgnoreNotFound(), controllerReconciler.WithProcessors( controllerReconciler.RecordReconcileReq, controllerReconciler.RecordResult, ), controllerReconciler.WithFieldOwner(c.Name), ) if recErr != nil && strings.Contains(fmt.Sprint(recErr), "modified by a different process") { c.Log.Info("Conflict when patching Ready status, retrying") return c.Summarizer(ctx, ns, ienpatch, result, patchErr) } // custom event recorder // summarizeandpatch doesn't emit events if no error and not ready, // we want each node to report independently // Add this block in when edge-infra version reaches BWC //condition = conditions.Get(ienpatch, c.Name) //if condition.Reason != "Progressing" { // if recErr != nil { // c.EventRecorder.Eventf(ienpatch, corev1.EventTypeWarning, condition.Reason, condition.Message) // } else { // c.EventRecorder.Eventf(ienpatch, corev1.EventTypeNormal, condition.Reason, condition.Message) // } //} // Lock requeue time instead of exponential backoff // TODO Re-examine this if res.Requeue { res.RequeueAfter = c.RequeueTime } return res, nil } func (c *PatchController) clearDeletedNodes(ienpatch *v1ienpatch.IENPatch) { var oldNodes []string for _, node := range ienpatch.GetConditions() { // skip the summary condition if node.Type == "Ready" { continue } oldNodes = append(oldNodes, node.Type) } var deletedNodes []string for _, oldNode := range oldNodes { if slices.Contains(c.Conditions.Summarize, oldNode) { continue } deletedNodes = append(deletedNodes, oldNode) } for _, node := range deletedNodes { c.Log.Info(fmt.Sprintf("Removing Node %s from ienpatch", node)) conditions.Delete(ienpatch, node) } } func (c *PatchController) setPendingStatus(ctx context.Context, namespacedName types.NamespacedName, patchmanager patchManager.PatchManager, result reconcile.Result) (res ctrl.Result, recErr error) { // Patch CR to indicate patch has started conditions.MarkFalse(patchmanager.Ienpatch, c.Name, "Progressing", "Reconciling upgrade for %s to %s", c.Name, patchmanager.TargetVer) if err := c.overrideOlderStatus(ctx, patchmanager.Ienpatch, patchmanager.TargetVer); err != nil { c.Log.Error(err, "Unable to to override status") } return c.Summarizer(ctx, namespacedName, patchmanager.Ienpatch, result, nil) } // Reconciles on updates to Patchmanager objects func (c *PatchController) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { reconcileStart := time.Now() // get the reconciled object ienpatch := v1ienpatch.IENPatch{} if err := c.Client.Get(ctx, req.NamespacedName, &ienpatch); err != nil { return ctrl.Result{RequeueAfter: c.RequeueTime}, client.IgnoreNotFound(err) } if err := c.readNodeList(ctx); err != nil { return ctrl.Result{RequeueAfter: c.RequeueTime}, err } patchmanager, err := c.NewPatchManager(ctx, &ienpatch) if err != nil { return ctrl.Result{RequeueAfter: c.RequeueTime}, err } result := reconcile.ResultEmpty metrics := patchMetrics.CreateNewControllerMetrics(&ienpatch, c.Name, patchmanager.CurrentVer, patchmanager.TargetVer) defer func() { res, recErr = c.Summarizer(ctx, req.NamespacedName, &ienpatch, result, err) metrics.RecordMetrics(time.Since(reconcileStart).Seconds()) }() result, err = c.Patch(ctx, req, patchmanager, metrics) return res, recErr } func (c *PatchController) Patch(ctx context.Context, req ctrl.Request, patchmanager patchManager.PatchManager, metrics *patchMetrics.ControllerMetrics) (result controllerReconciler.Result, patchErr error) { status, err := patchmanager.PreChecks() if status == v1ienpatch.Pending { _, err := c.setPendingStatus(ctx, req.NamespacedName, patchmanager, result) if err != nil { c.Log.Error(err, "Failed to set pending status") } status, err = patchmanager.PatchIEN() //nolint:all } metrics.RecordReadiness() result = c.updateConditionsByStatus(status, patchmanager.Ienpatch, patchmanager.TargetVer) return result, err } func (c *PatchController) NewPatchManager(ctx context.Context, ienpatch *v1ienpatch.IENPatch) (patchmanager patchManager.PatchManager, err error) { osFs, _ := afero.NewOsFs().(afero.OsFs) //Temp version source instead of from CRD due to unpredictable deployment order targetVersion := os.Getenv("TARGET_VERSION") currentVersion, err := common.ReadCurrentVer() if err != nil { c.Log.Error(err, "error creating config struct") return } cfg, err := common.NewConfig() if err != nil { c.Log.Error(err, "error creating config struct") return } // Patch the current Node patchmanager = patchManager.PatchManager{ Ctx: ctx, K8sClient: c.K8sClient, Log: ctrl.LoggerFrom(ctx), HostName: c.Name, Ienpatch: ienpatch, CurrentVer: currentVersion, TargetVer: targetVersion, Fs: osFs, Cfg: cfg, } return } // Mark conditions for other nodes if their patchctl has not done it yet func (c *PatchController) patchOtherNodes(ienpatch *v1ienpatch.IENPatch) { for _, node := range c.Conditions.Summarize { if conditions.Get(ienpatch, node) == nil { conditions.MarkFalse(ienpatch, node, "Pending", "Pending patchctl pod to start on this node") } } } // update the state of conditions that are that have messages including outdated versions func (c *PatchController) overrideOlderStatus(ctx context.Context, ienpatch *v1ienpatch.IENPatch, targetVersion string) error { var changesMade = false patcher := patch.NewSerialPatcher(ienpatch, c.Client) for _, condition := range ienpatch.Status.Conditions { messageVersion := strings.Split(condition.Message, " ")[len(strings.Split(condition.Message, " "))-1] conditionVersionObj, err := version.NewVersion(messageVersion) if err != nil { c.Log.Error(err, fmt.Sprintf("Error parsing message version %s", messageVersion)) return err } targetVersionObj, err := version.NewVersion(targetVersion) if err != nil { c.Log.Error(err, fmt.Sprintf("Error parsing target version %s", targetVersion)) return err } if len(targetVersionObj.Prerelease()) != 0 { return nil } if conditionVersionObj.Core().LessThan(targetVersionObj.Core()) { conditions.MarkFalse(ienpatch, condition.Type, "Pending", "Pending patchctl pod to start for %s to %s", condition.Type, targetVersion) changesMade = true } } if !changesMade { return nil } if err := patcher.Patch(ctx, ienpatch); err != nil { c.Log.Error(err, "Failed to patch outdated status condition") return err } return nil } func (c *PatchController) readNodeList(ctx context.Context) error { // get the node list nodes := corev1.NodeList{} if err := c.K8sClient.List(ctx, &nodes); err != nil { return err } for _, node := range nodes.Items { c.Conditions.Summarize = append(c.Conditions.Summarize, node.Name) } return nil } func (c *PatchController) updateConditionsByStatus(status v1ienpatch.PatchStatus, ienpatch *v1ienpatch.IENPatch, targetVersion string) (results controllerReconciler.Result) { switch status { case v1ienpatch.Retry: conditions.MarkFalse(ienpatch, c.Name, "Failed", "Retrying upgrade for %s to %s", c.Name, targetVersion) return reconcile.ResultRequeue case v1ienpatch.Pending: conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Pending upgrade for %s to %s", c.Name, targetVersion) case v1ienpatch.DownloadComplete: conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Download complete for %s to %s", c.Name, targetVersion) case v1ienpatch.Reboot: conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Pending reboot for %s to %s", c.Name, targetVersion) case v1ienpatch.Failed: conditions.MarkFalse(ienpatch, c.Name, "Failed", "Failed upgrade for %s to %s", c.Name, targetVersion) case v1ienpatch.Success: conditions.MarkTrue(ienpatch, c.Name, "Successful", "Successful upgrade for %s to %s", c.Name, targetVersion) } return reconcile.ResultEmpty }