...

Source file src/edge-infra.dev/pkg/edge/controllers/envctl/persistence_controller.go

Documentation: edge-infra.dev/pkg/edge/controllers/envctl

     1  package envctl
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"fmt"
     8  	"reflect"
     9  	"time"
    10  
    11  	"github.com/fluxcd/pkg/ssa"
    12  	"github.com/go-logr/logr"
    13  	v1 "k8s.io/api/apps/v1"
    14  	corev1 "k8s.io/api/core/v1"
    15  	"k8s.io/apimachinery/pkg/api/errors"
    16  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    17  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    18  	"k8s.io/apimachinery/pkg/labels"
    19  	"k8s.io/apimachinery/pkg/selection"
    20  	"k8s.io/apimachinery/pkg/types"
    21  	kuberecorder "k8s.io/client-go/tools/record"
    22  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
    23  	ctrl "sigs.k8s.io/controller-runtime"
    24  	"sigs.k8s.io/controller-runtime/pkg/builder"
    25  	"sigs.k8s.io/controller-runtime/pkg/client"
    26  	"sigs.k8s.io/controller-runtime/pkg/event"
    27  	"sigs.k8s.io/controller-runtime/pkg/handler"
    28  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    29  	ctrlReconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
    30  
    31  	persistenceApi "edge-infra.dev/pkg/edge/apis/persistence/v1alpha1"
    32  	"edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
    33  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    34  	"edge-infra.dev/pkg/k8s/meta/status"
    35  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    36  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    37  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    38  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    39  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    40  	"edge-infra.dev/pkg/k8s/runtime/patch"
    41  	unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
    42  	nodemeta "edge-infra.dev/pkg/sds/ien/node"
    43  )
    44  
    45  var (
    46  	customNodeLabel    = "node.ncr.com/"
    47  	oldCustomNodeLabel = "edge.node.com/"
    48  
    49  	// persistenceConditions is the reconcile summarization configuration for how
    50  	// various conditions should be taken into account when the final condition is
    51  	// summarized
    52  	persistenceConditions = reconcile.Conditions{
    53  		Target: status.ReadyCondition,
    54  		Owned: []string{
    55  			status.ReadyCondition,
    56  			status.ReconcilingCondition,
    57  			status.StalledCondition,
    58  		},
    59  		Summarize: []string{
    60  			status.StalledCondition,
    61  		},
    62  		NegativePolarity: []string{
    63  			status.ReconcilingCondition,
    64  			status.StalledCondition,
    65  		},
    66  	}
    67  )
    68  
// PersistenceReconciler reconciles a Persistence object.
//
// It embeds the Kubernetes client used for all reads and writes and the event
// recorder handed to the reconcile summarizer.
type PersistenceReconciler struct {
	client.Client
	kuberecorder.EventRecorder
	// ResourceManager applies and prunes the generated StatefulSets; when nil it
	// is created on the first Reconcile call by setResourceManager.
	// Name is used as the logger name and as the field owner for patches and
	// server-side apply.
	ResourceManager *ssa.ResourceManager
	Name            string
	// Metrics records reconcile duration, readiness, and reconciling state at
	// the end of every Reconcile call.
	// Conditions configures how status conditions are owned and summarized.
	Metrics    metrics.Metrics
	Conditions reconcile.Conditions
}
    79  
    80  // reconcilerPredicate will filter out events for the persistence controller
    81  func reconcilerPredicate() predicate.Predicate {
    82  	return predicate.Funcs{
    83  		UpdateFunc: func(e event.UpdateEvent) bool {
    84  			return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
    85  		},
    86  		CreateFunc: func(_ event.CreateEvent) bool {
    87  			return true
    88  		},
    89  		DeleteFunc: func(_ event.DeleteEvent) bool {
    90  			return false
    91  		},
    92  		GenericFunc: func(_ event.GenericEvent) bool {
    93  			return true
    94  		},
    95  	}
    96  }
    97  
    98  // nodeReconcilerPredicate will cause persistences to be enqueued if a node is add, removed,or labels are updated
    99  func nodeReconcilerPredicate() predicate.Predicate {
   100  	return predicate.Funcs{
   101  		UpdateFunc: func(e event.UpdateEvent) bool {
   102  			//todo refine this filter
   103  			sh := !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels())
   104  			return sh
   105  		},
   106  		CreateFunc: func(_ event.CreateEvent) bool {
   107  			return true
   108  		},
   109  		DeleteFunc: func(_ event.DeleteEvent) bool {
   110  			return true
   111  		},
   112  	}
   113  }
   114  
   115  // SetupWithManager sets up the controller with the Manager.
   116  func (r *PersistenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
   117  	return ctrl.NewControllerManagedBy(mgr).
   118  		For(&persistenceApi.Persistence{}, builder.WithPredicates(reconcilerPredicate())).
   119  		Watches(
   120  			&corev1.Node{},
   121  			handler.EnqueueRequestsFromMapFunc(r.getPersistenceToEnque),
   122  			builder.WithPredicates(nodeReconcilerPredicate()),
   123  		).
   124  		Owns(&v1.StatefulSet{}).
   125  		Complete(r)
   126  }
   127  
   128  func (r *PersistenceReconciler) PatchOpts() []patch.Option {
   129  	return []patch.Option{
   130  		patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
   131  		patch.WithFieldOwner(r.Name),
   132  	}
   133  }
   134  
   135  // +kubebuilder:rbac:groups="",resources=configmaps,verbs=create;get;list;watch;update;patch
   136  // +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get;update;patch
   137  // +kubebuilder:rbac:groups="",resources=namespaces,verbs=create;get;list;watch;update;patch
   138  // +kubebuilder:rbac:groups="",resources=namespaces/status,verbs=get;update;patch
   139  // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update
   140  // +kubebuilder:rbac:groups="",resources=nodes/status,verbs=get
   141  // +kubebuilder:rbac:groups=edge.ncr.com,resources=persistence,verbs=create;get;list;update;patch;watch
   142  // +kubebuilder:rbac:groups=edge.ncr.com,resources=persistence/status,verbs=get;update;patch
   143  // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=create;get;list;update;patch;watch;delete
   144  // +kubebuilder:rbac:groups="apps",resources=statefulsets/status,verbs=get
   145  // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
   146  // +kubebuilder:rbac:groups="",resources=pods;persistentvolumeclaims,verbs=get;list;watch
   147  // +kubebuilder:rbac:groups="",resources=pods/status;persistentvolumeclaims/status,verbs=get;watch
   148  
   149  func (r *PersistenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
   150  	var (
   151  		reconcileStart = time.Now()
   152  		log            = ctrl.LoggerFrom(ctx).WithName(r.Name)
   153  		result         = reconcile.ResultEmpty
   154  		pers           = &persistenceApi.Persistence{}
   155  	)
   156  	r.setResourceManager()
   157  
   158  	if err := r.Client.Get(ctx, req.NamespacedName, pers); err != nil {
   159  		return ctrl.Result{}, client.IgnoreNotFound(err)
   160  	}
   161  
   162  	oldStatus := pers.Status.DeepCopy()
   163  	patcher := patch.NewSerialPatcher(pers, r.Client)
   164  
   165  	defer func() {
   166  		if reconcileErr, ok := recErr.(recerr.Error); ok {
   167  			reconcileErr.ToCondition(pers, status.ReadyCondition)
   168  		}
   169  
   170  		summarizer := reconcile.NewSummarizer(patcher)
   171  		res, recErr = summarizer.SummarizeAndPatch(ctx, pers, []reconcile.SummarizeOption{
   172  			reconcile.WithConditions(r.Conditions),
   173  			reconcile.WithResult(result),
   174  			reconcile.WithError(recErr),
   175  			reconcile.WithIgnoreNotFound(),
   176  			reconcile.WithProcessors(
   177  				reconcile.RecordReconcileReq,
   178  				reconcile.RecordResult,
   179  			),
   180  			reconcile.WithFieldOwner(r.Name),
   181  			reconcile.WithEventRecorder(r.EventRecorder),
   182  		}...)
   183  
   184  		r.Metrics.RecordDuration(ctx, pers, reconcileStart)
   185  		r.Metrics.RecordReadiness(ctx, pers)
   186  		r.Metrics.RecordReconciling(ctx, pers)
   187  	}()
   188  
   189  	log.Info("reconciling started for persistence")
   190  
   191  	if err := reconcile.Progressing(ctx, pers, patcher, r.PatchOpts()...); err != nil {
   192  		recErr = recerr.New(err, persistenceApi.ReconcileFailedReason)
   193  		return
   194  	}
   195  
   196  	// Get all nodes
   197  	allNodes := &corev1.NodeList{}
   198  	if err := r.Client.List(ctx, allNodes); client.IgnoreNotFound(err) != nil {
   199  		log.Error(err, "failed to get nodes")
   200  		recErr = recerr.New(err, persistenceApi.UnableToGetNodeReason)
   201  		return
   202  	}
   203  	allNodesMap := make(map[string]bool, 0)
   204  
   205  	for _, node := range allNodes.Items {
   206  		allNodesMap[node.Name] = false
   207  	}
   208  
   209  	// Get nodes, loop through, create ns and ss per node
   210  	filters := convertNodeSelectorToLabelSelector(pers.Spec.NodeSelectorTerms, log)
   211  	selectedNodes := &corev1.NodeList{}
   212  	if err := r.Client.List(ctx, selectedNodes, &client.ListOptions{LabelSelector: filters}); client.IgnoreNotFound(err) != nil {
   213  		log.Error(err, "failed to get nodes")
   214  		recErr = recerr.New(err, persistenceApi.UnableToGetNodeReason)
   215  		return
   216  	}
   217  	for _, node := range selectedNodes.Items {
   218  		allNodesMap[node.Name] = true
   219  	}
   220  
   221  	// pvcs that exist for statefulsets that we created.
   222  	pvcs := &corev1.PersistentVolumeClaimList{}
   223  	exists, err := labels.NewRequirement(persistenceApi.InstanceLabel, selection.Exists, nil)
   224  	if err != nil {
   225  		recErr = recerr.NewStalled(err, persistenceApi.InvalidLabelReason)
   226  		return
   227  	}
   228  	err = r.Client.List(ctx, pvcs, &client.ListOptions{
   229  		LabelSelector: labels.NewSelector().Add(*exists),
   230  		Namespace:     pers.Namespace,
   231  	})
   232  	if err != nil {
   233  		recErr = recerr.New(err, persistenceApi.UnableToGetPVCReason)
   234  		return
   235  	}
   236  	persistenceSts := map[string]struct{}{} // statefulsets created by this controller
   237  	for _, pvc := range pvcs.Items {
   238  		persistenceSts[pvc.Labels[persistenceApi.InstanceLabel]] = struct{}{}
   239  	}
   240  
   241  	var unstructuredObjs []*unstructured.Unstructured
   242  
   243  	for _, node := range allNodes.Items {
   244  		// Add ss
   245  		oldSts := &v1.StatefulSet{}
   246  
   247  		// Get name for StatefulSet
   248  		instanceName := instanceName(node, pers.Spec.StatefulSet.GetName(), persistenceSts)
   249  
   250  		if err := validPVCName(pers, instanceName); err != nil {
   251  			recErr = recerr.NewStalled(err, persistenceApi.InvalidLabelReason)
   252  			return
   253  		}
   254  
   255  		// if node is not selected to get a pod check if one exists and delete if there is
   256  		if !allNodesMap[node.Name] {
   257  			err := r.Client.Get(ctx, types.NamespacedName{Name: instanceName, Namespace: pers.Namespace}, oldSts)
   258  			if errors.IsNotFound(err) {
   259  				continue
   260  			}
   261  			if err != nil {
   262  				recErr = recerr.New(fmt.Errorf("failed to check for existing statefulset: %w", err), persistenceApi.UnableToDeleteSS)
   263  				return
   264  			}
   265  			if err = r.Client.Delete(ctx, oldSts); err != nil {
   266  				recErr = recerr.New(fmt.Errorf("failed to delete existing statefulset: %w", err), persistenceApi.UnableToDeleteSS)
   267  				return
   268  			}
   269  			continue
   270  		}
   271  
   272  		ss := &v1.StatefulSet{}
   273  		// If nameSubstitution is provided, replace any names matching nameSubstitution in the statefulSet with instanceName
   274  		if pers.Spec.NameSubstitution != nil {
   275  			ssBytes, err := json.Marshal(pers.Spec.StatefulSet)
   276  			if err != nil {
   277  				recErr = recerr.New(fmt.Errorf("failed to marshal statefulSet: %w", err), persistenceApi.NameSubstitutionFailedReason)
   278  				return
   279  			}
   280  
   281  			data := bytes.Replace(ssBytes, []byte(*pers.Spec.NameSubstitution), []byte(instanceName), -1)
   282  
   283  			err = json.Unmarshal(data, ss)
   284  			if err != nil {
   285  				recErr = recerr.New(fmt.Errorf("failed to unmarshal data to statefulSet: %w", err), persistenceApi.NameSubstitutionFailedReason)
   286  				return
   287  			}
   288  		} else {
   289  			ss = pers.Spec.StatefulSet.DeepCopy()
   290  		}
   291  
   292  		// Initialize custom ss label
   293  		customNodeLabelKey := customNodeLabel + ss.Name
   294  		oldCustomNodeLabelKey := oldCustomNodeLabel + ss.Name
   295  
   296  		node.Labels[customNodeLabelKey] = instanceName
   297  
   298  		// TODO: remove once all nodes are migrated to new label
   299  		node.Labels[oldCustomNodeLabelKey] = instanceName
   300  
   301  		updatedNode := node
   302  
   303  		err := r.Client.Update(ctx, &updatedNode)
   304  		if err != nil {
   305  			recErr = recerr.New(fmt.Errorf("failed to update node label: %w", err), persistenceApi.UpdateNodeLabelFailedReason)
   306  			return
   307  		}
   308  
   309  		ss.Name = instanceName
   310  		ss.Namespace = pers.Namespace
   311  
   312  		// Add new label to the pod template
   313  		ss.Spec.Template.ObjectMeta.Labels[persistenceApi.InstanceLabel] = instanceName
   314  		// Update the ss pod selector
   315  		ss.Spec.Selector.MatchLabels[persistenceApi.InstanceLabel] = instanceName
   316  
   317  		// Set owner ref
   318  		ss.ObjectMeta.OwnerReferences = r.ownerRef(pers)
   319  
   320  		// Add node affinity for node we're installing to
   321  		ss.Spec.Template.Spec.Affinity = &corev1.Affinity{
   322  			NodeAffinity: &corev1.NodeAffinity{
   323  				RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
   324  					NodeSelectorTerms: []corev1.NodeSelectorTerm{
   325  						{MatchExpressions: []corev1.NodeSelectorRequirement{
   326  							{Key: customNodeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{instanceName}},
   327  							{Key: oldCustomNodeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{instanceName}}}}}},
   328  			},
   329  		}
   330  		log.Info("creating stateful set for lane", "statefuleset name", ss.Name, "lane or node name", instanceName)
   331  
   332  		uobj, err := unstructuredutil.ToUnstructured(ss)
   333  		if err != nil {
   334  			recErr = recerr.New(fmt.Errorf("failed to convert %s/%s/%s to unstructured: %w", uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), persistenceApi.ApplyFailedReason)
   335  			return
   336  		}
   337  		unstructuredObjs = append(unstructuredObjs, uobj)
   338  	}
   339  
   340  	// Workaround for server side apply manifests failing on clusters with kuberenetes < 1.22
   341  	if err := ssa.SetNativeKindsDefaults(unstructuredObjs); err != nil {
   342  		recErr = recerr.New(err, persistenceApi.ApplyFailedReason)
   343  		result = reconcile.ResultRequeue
   344  		return
   345  	}
   346  
   347  	changeSet, err := r.ResourceManager.ApplyAll(ctx, unstructuredObjs, ssa.ApplyOptions{Force: false})
   348  	if err != nil {
   349  		recErr = recerr.New(fmt.Errorf("failed to apply resources: %w", err), persistenceApi.ApplyFailedReason)
   350  		return
   351  	}
   352  	pers.Status.Inventory = inventory.New(inventory.FromChangeSet(changeSet))
   353  
   354  	log.Info("stateful sets created", "changeset", changeSet)
   355  
   356  	if oldStatus.Inventory != nil {
   357  		diff, err := inventory.Diff(oldStatus.Inventory, pers.Status.Inventory)
   358  		if err != nil {
   359  			recErr = recerr.New(err, persistenceApi.PruneFailedReason)
   360  			return
   361  		}
   362  		if len(diff) > 0 {
   363  			opt := ssa.DefaultDeleteOptions()
   364  			opt.Exclusions = map[string]string{couchdb.SubstitutionLabel: couchdb.LabelValueTrue}
   365  			changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
   366  			if err != nil {
   367  				recErr = recerr.New(err, persistenceApi.PruneFailedReason)
   368  				return
   369  			}
   370  			log.Info("pruned objects", "changeset", changeSet)
   371  		}
   372  	}
   373  	log.Info("persistence reconciled successfully")
   374  
   375  	conditions.MarkTrue(pers, status.ReadyCondition, persistenceApi.ApplySuccessReason, "successfully applied")
   376  	result = reconcile.ResultSuccess
   377  	return
   378  }
   379  
   380  func validPVCName(pers *persistenceApi.Persistence, instanceName string) error {
   381  	for _, claim := range pers.Spec.StatefulSet.Spec.VolumeClaimTemplates {
   382  		pvcName := fmt.Sprintf("%s-%s-0", claim.Name, instanceName)
   383  		if len(pvcName) > 63 {
   384  			return fmt.Errorf("pvc name %s is too long", pvcName)
   385  		}
   386  	}
   387  	return nil
   388  }
   389  
   390  func (r *PersistenceReconciler) ownerRef(p *persistenceApi.Persistence) []metav1.OwnerReference {
   391  	return []metav1.OwnerReference{
   392  		*metav1.NewControllerRef(
   393  			p,
   394  			persistenceApi.GroupVersion.WithKind(persistenceApi.Kind),
   395  		),
   396  	}
   397  }
   398  
   399  // instanceName if pvc does not exist, create a new instance name
   400  func instanceName(node corev1.Node, stsName string, persistenceSts map[string]struct{}) string {
   401  	instanceName := nameutils.CreateReplicatedStatefulsetName(node, stsName)
   402  	if _, ok := persistenceSts[instanceName]; ok {
   403  		return instanceName
   404  	}
   405  	return nameutils.StatefulSetNodeName(node, stsName)
   406  }
   407  
   408  func createDefaultLabelFilter(log logr.Logger) labels.Selector {
   409  	nodeFilter, err := labels.NewRequirement(nodemeta.ClassLabel, selection.In, []string{persistenceApi.TouchpointLabel})
   410  	if err != nil {
   411  		log.Error(err, "unable to convert node selector into label selector")
   412  		return labels.Everything()
   413  	}
   414  	return labels.NewSelector().Add(*nodeFilter)
   415  }
   416  
   417  func convertNodeSelectorToLabelSelector(selectors []corev1.NodeSelectorTerm, log logr.Logger) labels.Selector {
   418  	if selectors == nil {
   419  		return createDefaultLabelFilter(log)
   420  	}
   421  	reqs := labels.Requirements{}
   422  	for _, term := range selectors {
   423  		for _, expression := range term.MatchExpressions {
   424  			nodeFilter, err := labels.NewRequirement(expression.Key, convertOperator(expression.Operator), expression.Values)
   425  			if err != nil {
   426  				log.Error(err, "unable to convert node selector into label selector")
   427  			} else {
   428  				reqs = append(reqs, *nodeFilter)
   429  			}
   430  		}
   431  	}
   432  	return labels.NewSelector().Add(reqs...)
   433  }
   434  
   435  func convertOperator(sop corev1.NodeSelectorOperator) selection.Operator {
   436  	switch sop {
   437  	case corev1.NodeSelectorOpIn:
   438  		return selection.In
   439  	case corev1.NodeSelectorOpNotIn:
   440  		return selection.NotIn
   441  	case corev1.NodeSelectorOpExists:
   442  		return selection.Exists
   443  	case corev1.NodeSelectorOpDoesNotExist:
   444  		return selection.DoesNotExist
   445  	case corev1.NodeSelectorOpGt:
   446  		return selection.GreaterThan
   447  	case corev1.NodeSelectorOpLt:
   448  		return selection.LessThan
   449  	default:
   450  		return selection.In
   451  	}
   452  }
   453  
   454  func (r *PersistenceReconciler) setResourceManager() {
   455  	if r.ResourceManager == nil {
   456  		mgr := ssa.NewResourceManager(
   457  			r.Client,
   458  			// be sure to consistently communicate this controllers ownership of objects
   459  			// this should match the result of createOpts()
   460  			polling.NewStatusPoller(r.Client, r.Client.RESTMapper(), polling.Options{}), ssa.Owner{Field: r.Name},
   461  		)
   462  		r.ResourceManager = mgr
   463  	}
   464  }
   465  
   466  // getPersistenceToEnque will create the list of persistences to reconcile when nodes are updated
   467  func (r *PersistenceReconciler) getPersistenceToEnque(ctx context.Context, _ client.Object) []ctrlReconcile.Request {
   468  	persList := &persistenceApi.PersistenceList{}
   469  	if err := r.Client.List(ctx, persList); client.IgnoreNotFound(err) != nil {
   470  		return nil
   471  	}
   472  
   473  	var persRequests []ctrlReconcile.Request
   474  	for _, pers := range persList.Items {
   475  		persRequests = append(persRequests, ctrlReconcile.Request{NamespacedName: types.NamespacedName{Name: pers.Name, Namespace: pers.Namespace}})
   476  	}
   477  	return persRequests
   478  }
   479  

View as plain text