1 package envctl
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "reflect"
9 "time"
10
11 "github.com/fluxcd/pkg/ssa"
12 "github.com/go-logr/logr"
13 v1 "k8s.io/api/apps/v1"
14 corev1 "k8s.io/api/core/v1"
15 "k8s.io/apimachinery/pkg/api/errors"
16 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
18 "k8s.io/apimachinery/pkg/labels"
19 "k8s.io/apimachinery/pkg/selection"
20 "k8s.io/apimachinery/pkg/types"
21 kuberecorder "k8s.io/client-go/tools/record"
22 "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
23 ctrl "sigs.k8s.io/controller-runtime"
24 "sigs.k8s.io/controller-runtime/pkg/builder"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26 "sigs.k8s.io/controller-runtime/pkg/event"
27 "sigs.k8s.io/controller-runtime/pkg/handler"
28 "sigs.k8s.io/controller-runtime/pkg/predicate"
29 ctrlReconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
31 persistenceApi "edge-infra.dev/pkg/edge/apis/persistence/v1alpha1"
32 "edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
33 "edge-infra.dev/pkg/edge/datasync/couchdb"
34 "edge-infra.dev/pkg/k8s/meta/status"
35 "edge-infra.dev/pkg/k8s/runtime/conditions"
36 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
37 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
38 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
39 "edge-infra.dev/pkg/k8s/runtime/inventory"
40 "edge-infra.dev/pkg/k8s/runtime/patch"
41 unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
42 nodemeta "edge-infra.dev/pkg/sds/ien/node"
43 )
44
45 var (
46 customNodeLabel = "node.ncr.com/"
47 oldCustomNodeLabel = "edge.node.com/"
48
49
50
51
52 persistenceConditions = reconcile.Conditions{
53 Target: status.ReadyCondition,
54 Owned: []string{
55 status.ReadyCondition,
56 status.ReconcilingCondition,
57 status.StalledCondition,
58 },
59 Summarize: []string{
60 status.StalledCondition,
61 },
62 NegativePolarity: []string{
63 status.ReconcilingCondition,
64 status.StalledCondition,
65 },
66 }
67 )
68
69
// PersistenceReconciler reconciles Persistence objects into one StatefulSet
// per selected node (see Reconcile).
type PersistenceReconciler struct {
	// Client is used for every Kubernetes read and write the reconciler
	// performs (Persistence, Node, PVC and StatefulSet objects).
	client.Client
	// EventRecorder is passed to the status summarizer via
	// reconcile.WithEventRecorder.
	kuberecorder.EventRecorder
	// ResourceManager server-side applies and prunes the rendered
	// StatefulSets; setResourceManager builds one lazily when it is nil.
	ResourceManager *ssa.ResourceManager
	// Name is used as the logger name and as the field owner for patches
	// and server-side apply.
	Name string

	// Metrics records reconcile duration, readiness and reconciling state
	// after every reconcile.
	Metrics metrics.Metrics
	// Conditions configures which status conditions are owned and
	// summarized when patching Persistence status.
	Conditions reconcile.Conditions
}
79
80
81 func reconcilerPredicate() predicate.Predicate {
82 return predicate.Funcs{
83 UpdateFunc: func(e event.UpdateEvent) bool {
84 return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
85 },
86 CreateFunc: func(_ event.CreateEvent) bool {
87 return true
88 },
89 DeleteFunc: func(_ event.DeleteEvent) bool {
90 return false
91 },
92 GenericFunc: func(_ event.GenericEvent) bool {
93 return true
94 },
95 }
96 }
97
98
99 func nodeReconcilerPredicate() predicate.Predicate {
100 return predicate.Funcs{
101 UpdateFunc: func(e event.UpdateEvent) bool {
102
103 sh := !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels())
104 return sh
105 },
106 CreateFunc: func(_ event.CreateEvent) bool {
107 return true
108 },
109 DeleteFunc: func(_ event.DeleteEvent) bool {
110 return true
111 },
112 }
113 }
114
115
116 func (r *PersistenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
117 return ctrl.NewControllerManagedBy(mgr).
118 For(&persistenceApi.Persistence{}, builder.WithPredicates(reconcilerPredicate())).
119 Watches(
120 &corev1.Node{},
121 handler.EnqueueRequestsFromMapFunc(r.getPersistenceToEnque),
122 builder.WithPredicates(nodeReconcilerPredicate()),
123 ).
124 Owns(&v1.StatefulSet{}).
125 Complete(r)
126 }
127
128 func (r *PersistenceReconciler) PatchOpts() []patch.Option {
129 return []patch.Option{
130 patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
131 patch.WithFieldOwner(r.Name),
132 }
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 func (r *PersistenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
150 var (
151 reconcileStart = time.Now()
152 log = ctrl.LoggerFrom(ctx).WithName(r.Name)
153 result = reconcile.ResultEmpty
154 pers = &persistenceApi.Persistence{}
155 )
156 r.setResourceManager()
157
158 if err := r.Client.Get(ctx, req.NamespacedName, pers); err != nil {
159 return ctrl.Result{}, client.IgnoreNotFound(err)
160 }
161
162 oldStatus := pers.Status.DeepCopy()
163 patcher := patch.NewSerialPatcher(pers, r.Client)
164
165 defer func() {
166 if reconcileErr, ok := recErr.(recerr.Error); ok {
167 reconcileErr.ToCondition(pers, status.ReadyCondition)
168 }
169
170 summarizer := reconcile.NewSummarizer(patcher)
171 res, recErr = summarizer.SummarizeAndPatch(ctx, pers, []reconcile.SummarizeOption{
172 reconcile.WithConditions(r.Conditions),
173 reconcile.WithResult(result),
174 reconcile.WithError(recErr),
175 reconcile.WithIgnoreNotFound(),
176 reconcile.WithProcessors(
177 reconcile.RecordReconcileReq,
178 reconcile.RecordResult,
179 ),
180 reconcile.WithFieldOwner(r.Name),
181 reconcile.WithEventRecorder(r.EventRecorder),
182 }...)
183
184 r.Metrics.RecordDuration(ctx, pers, reconcileStart)
185 r.Metrics.RecordReadiness(ctx, pers)
186 r.Metrics.RecordReconciling(ctx, pers)
187 }()
188
189 log.Info("reconciling started for persistence")
190
191 if err := reconcile.Progressing(ctx, pers, patcher, r.PatchOpts()...); err != nil {
192 recErr = recerr.New(err, persistenceApi.ReconcileFailedReason)
193 return
194 }
195
196
197 allNodes := &corev1.NodeList{}
198 if err := r.Client.List(ctx, allNodes); client.IgnoreNotFound(err) != nil {
199 log.Error(err, "failed to get nodes")
200 recErr = recerr.New(err, persistenceApi.UnableToGetNodeReason)
201 return
202 }
203 allNodesMap := make(map[string]bool, 0)
204
205 for _, node := range allNodes.Items {
206 allNodesMap[node.Name] = false
207 }
208
209
210 filters := convertNodeSelectorToLabelSelector(pers.Spec.NodeSelectorTerms, log)
211 selectedNodes := &corev1.NodeList{}
212 if err := r.Client.List(ctx, selectedNodes, &client.ListOptions{LabelSelector: filters}); client.IgnoreNotFound(err) != nil {
213 log.Error(err, "failed to get nodes")
214 recErr = recerr.New(err, persistenceApi.UnableToGetNodeReason)
215 return
216 }
217 for _, node := range selectedNodes.Items {
218 allNodesMap[node.Name] = true
219 }
220
221
222 pvcs := &corev1.PersistentVolumeClaimList{}
223 exists, err := labels.NewRequirement(persistenceApi.InstanceLabel, selection.Exists, nil)
224 if err != nil {
225 recErr = recerr.NewStalled(err, persistenceApi.InvalidLabelReason)
226 return
227 }
228 err = r.Client.List(ctx, pvcs, &client.ListOptions{
229 LabelSelector: labels.NewSelector().Add(*exists),
230 Namespace: pers.Namespace,
231 })
232 if err != nil {
233 recErr = recerr.New(err, persistenceApi.UnableToGetPVCReason)
234 return
235 }
236 persistenceSts := map[string]struct{}{}
237 for _, pvc := range pvcs.Items {
238 persistenceSts[pvc.Labels[persistenceApi.InstanceLabel]] = struct{}{}
239 }
240
241 var unstructuredObjs []*unstructured.Unstructured
242
243 for _, node := range allNodes.Items {
244
245 oldSts := &v1.StatefulSet{}
246
247
248 instanceName := instanceName(node, pers.Spec.StatefulSet.GetName(), persistenceSts)
249
250 if err := validPVCName(pers, instanceName); err != nil {
251 recErr = recerr.NewStalled(err, persistenceApi.InvalidLabelReason)
252 return
253 }
254
255
256 if !allNodesMap[node.Name] {
257 err := r.Client.Get(ctx, types.NamespacedName{Name: instanceName, Namespace: pers.Namespace}, oldSts)
258 if errors.IsNotFound(err) {
259 continue
260 }
261 if err != nil {
262 recErr = recerr.New(fmt.Errorf("failed to check for existing statefulset: %w", err), persistenceApi.UnableToDeleteSS)
263 return
264 }
265 if err = r.Client.Delete(ctx, oldSts); err != nil {
266 recErr = recerr.New(fmt.Errorf("failed to delete existing statefulset: %w", err), persistenceApi.UnableToDeleteSS)
267 return
268 }
269 continue
270 }
271
272 ss := &v1.StatefulSet{}
273
274 if pers.Spec.NameSubstitution != nil {
275 ssBytes, err := json.Marshal(pers.Spec.StatefulSet)
276 if err != nil {
277 recErr = recerr.New(fmt.Errorf("failed to marshal statefulSet: %w", err), persistenceApi.NameSubstitutionFailedReason)
278 return
279 }
280
281 data := bytes.Replace(ssBytes, []byte(*pers.Spec.NameSubstitution), []byte(instanceName), -1)
282
283 err = json.Unmarshal(data, ss)
284 if err != nil {
285 recErr = recerr.New(fmt.Errorf("failed to unmarshal data to statefulSet: %w", err), persistenceApi.NameSubstitutionFailedReason)
286 return
287 }
288 } else {
289 ss = pers.Spec.StatefulSet.DeepCopy()
290 }
291
292
293 customNodeLabelKey := customNodeLabel + ss.Name
294 oldCustomNodeLabelKey := oldCustomNodeLabel + ss.Name
295
296 node.Labels[customNodeLabelKey] = instanceName
297
298
299 node.Labels[oldCustomNodeLabelKey] = instanceName
300
301 updatedNode := node
302
303 err := r.Client.Update(ctx, &updatedNode)
304 if err != nil {
305 recErr = recerr.New(fmt.Errorf("failed to update node label: %w", err), persistenceApi.UpdateNodeLabelFailedReason)
306 return
307 }
308
309 ss.Name = instanceName
310 ss.Namespace = pers.Namespace
311
312
313 ss.Spec.Template.ObjectMeta.Labels[persistenceApi.InstanceLabel] = instanceName
314
315 ss.Spec.Selector.MatchLabels[persistenceApi.InstanceLabel] = instanceName
316
317
318 ss.ObjectMeta.OwnerReferences = r.ownerRef(pers)
319
320
321 ss.Spec.Template.Spec.Affinity = &corev1.Affinity{
322 NodeAffinity: &corev1.NodeAffinity{
323 RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
324 NodeSelectorTerms: []corev1.NodeSelectorTerm{
325 {MatchExpressions: []corev1.NodeSelectorRequirement{
326 {Key: customNodeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{instanceName}},
327 {Key: oldCustomNodeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{instanceName}}}}}},
328 },
329 }
330 log.Info("creating stateful set for lane", "statefuleset name", ss.Name, "lane or node name", instanceName)
331
332 uobj, err := unstructuredutil.ToUnstructured(ss)
333 if err != nil {
334 recErr = recerr.New(fmt.Errorf("failed to convert %s/%s/%s to unstructured: %w", uobj.GetObjectKind(), uobj.GetNamespace(), uobj.GetName(), err), persistenceApi.ApplyFailedReason)
335 return
336 }
337 unstructuredObjs = append(unstructuredObjs, uobj)
338 }
339
340
341 if err := ssa.SetNativeKindsDefaults(unstructuredObjs); err != nil {
342 recErr = recerr.New(err, persistenceApi.ApplyFailedReason)
343 result = reconcile.ResultRequeue
344 return
345 }
346
347 changeSet, err := r.ResourceManager.ApplyAll(ctx, unstructuredObjs, ssa.ApplyOptions{Force: false})
348 if err != nil {
349 recErr = recerr.New(fmt.Errorf("failed to apply resources: %w", err), persistenceApi.ApplyFailedReason)
350 return
351 }
352 pers.Status.Inventory = inventory.New(inventory.FromChangeSet(changeSet))
353
354 log.Info("stateful sets created", "changeset", changeSet)
355
356 if oldStatus.Inventory != nil {
357 diff, err := inventory.Diff(oldStatus.Inventory, pers.Status.Inventory)
358 if err != nil {
359 recErr = recerr.New(err, persistenceApi.PruneFailedReason)
360 return
361 }
362 if len(diff) > 0 {
363 opt := ssa.DefaultDeleteOptions()
364 opt.Exclusions = map[string]string{couchdb.SubstitutionLabel: couchdb.LabelValueTrue}
365 changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
366 if err != nil {
367 recErr = recerr.New(err, persistenceApi.PruneFailedReason)
368 return
369 }
370 log.Info("pruned objects", "changeset", changeSet)
371 }
372 }
373 log.Info("persistence reconciled successfully")
374
375 conditions.MarkTrue(pers, status.ReadyCondition, persistenceApi.ApplySuccessReason, "successfully applied")
376 result = reconcile.ResultSuccess
377 return
378 }
379
380 func validPVCName(pers *persistenceApi.Persistence, instanceName string) error {
381 for _, claim := range pers.Spec.StatefulSet.Spec.VolumeClaimTemplates {
382 pvcName := fmt.Sprintf("%s-%s-0", claim.Name, instanceName)
383 if len(pvcName) > 63 {
384 return fmt.Errorf("pvc name %s is too long", pvcName)
385 }
386 }
387 return nil
388 }
389
390 func (r *PersistenceReconciler) ownerRef(p *persistenceApi.Persistence) []metav1.OwnerReference {
391 return []metav1.OwnerReference{
392 *metav1.NewControllerRef(
393 p,
394 persistenceApi.GroupVersion.WithKind(persistenceApi.Kind),
395 ),
396 }
397 }
398
399
400 func instanceName(node corev1.Node, stsName string, persistenceSts map[string]struct{}) string {
401 instanceName := nameutils.CreateReplicatedStatefulsetName(node, stsName)
402 if _, ok := persistenceSts[instanceName]; ok {
403 return instanceName
404 }
405 return nameutils.StatefulSetNodeName(node, stsName)
406 }
407
408 func createDefaultLabelFilter(log logr.Logger) labels.Selector {
409 nodeFilter, err := labels.NewRequirement(nodemeta.ClassLabel, selection.In, []string{persistenceApi.TouchpointLabel})
410 if err != nil {
411 log.Error(err, "unable to convert node selector into label selector")
412 return labels.Everything()
413 }
414 return labels.NewSelector().Add(*nodeFilter)
415 }
416
417 func convertNodeSelectorToLabelSelector(selectors []corev1.NodeSelectorTerm, log logr.Logger) labels.Selector {
418 if selectors == nil {
419 return createDefaultLabelFilter(log)
420 }
421 reqs := labels.Requirements{}
422 for _, term := range selectors {
423 for _, expression := range term.MatchExpressions {
424 nodeFilter, err := labels.NewRequirement(expression.Key, convertOperator(expression.Operator), expression.Values)
425 if err != nil {
426 log.Error(err, "unable to convert node selector into label selector")
427 } else {
428 reqs = append(reqs, *nodeFilter)
429 }
430 }
431 }
432 return labels.NewSelector().Add(reqs...)
433 }
434
435 func convertOperator(sop corev1.NodeSelectorOperator) selection.Operator {
436 switch sop {
437 case corev1.NodeSelectorOpIn:
438 return selection.In
439 case corev1.NodeSelectorOpNotIn:
440 return selection.NotIn
441 case corev1.NodeSelectorOpExists:
442 return selection.Exists
443 case corev1.NodeSelectorOpDoesNotExist:
444 return selection.DoesNotExist
445 case corev1.NodeSelectorOpGt:
446 return selection.GreaterThan
447 case corev1.NodeSelectorOpLt:
448 return selection.LessThan
449 default:
450 return selection.In
451 }
452 }
453
454 func (r *PersistenceReconciler) setResourceManager() {
455 if r.ResourceManager == nil {
456 mgr := ssa.NewResourceManager(
457 r.Client,
458
459
460 polling.NewStatusPoller(r.Client, r.Client.RESTMapper(), polling.Options{}), ssa.Owner{Field: r.Name},
461 )
462 r.ResourceManager = mgr
463 }
464 }
465
466
467 func (r *PersistenceReconciler) getPersistenceToEnque(ctx context.Context, _ client.Object) []ctrlReconcile.Request {
468 persList := &persistenceApi.PersistenceList{}
469 if err := r.Client.List(ctx, persList); client.IgnoreNotFound(err) != nil {
470 return nil
471 }
472
473 var persRequests []ctrlReconcile.Request
474 for _, pers := range persList.Items {
475 persRequests = append(persRequests, ctrlReconcile.Request{NamespacedName: types.NamespacedName{Name: pers.Name, Namespace: pers.Namespace}})
476 }
477 return persRequests
478 }
479
View as plain text