1 package lumperctl
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 ctrl "sigs.k8s.io/controller-runtime"
10 "sigs.k8s.io/controller-runtime/pkg/client"
11 "sigs.k8s.io/controller-runtime/pkg/controller"
12 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
13
14 whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
15 "edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal"
16 "edge-infra.dev/pkg/k8s/meta/status"
17 "edge-infra.dev/pkg/k8s/runtime/conditions"
18 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
19 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
20 "edge-infra.dev/pkg/k8s/runtime/inventory"
21 "edge-infra.dev/pkg/k8s/runtime/patch"
22 "edge-infra.dev/pkg/k8s/runtime/sap"
23 "edge-infra.dev/pkg/k8s/unstructured"
24 )
25
26
27
28
29
30
31
32
// ShipmentReconciler reconciles whv1.Shipment objects.
//
// It has no fields of its own: everything its methods use (the Kubernetes
// client, metrics, liveness checker, cache, resource manager, default
// provider, conditions, name and patch options) is promoted from the embedded
// *internal.Reconciler.
type ShipmentReconciler struct {
	*internal.Reconciler
}
36
// shipmentConditions groups the status condition types associated with a
// Shipment: the target condition, the conditions listed as owned, the ones
// listed for summarization, and the ones listed as negative polarity.
//
// NOTE(review): nothing in this part of the file references shipmentConditions
// directly (Reconcile hands r.Conditions to the summarizer); presumably this
// value is assigned to the reconciler where it is constructed — confirm.
var shipmentConditions = reconcile.Conditions{
	// Presumably the condition the summarized state is written to.
	Target: status.ReadyCondition,
	// Condition types listed as owned by this reconciler.
	Owned: []string{
		whv1.HealthyCondition,
		whv1.FetchedArtifactCondition,
		status.ReadyCondition,
		status.ReconcilingCondition,
		status.StalledCondition,
	},
	// Condition types listed as inputs to the summary.
	Summarize: []string{
		whv1.HealthyCondition,
		whv1.FetchedArtifactCondition,
		status.StalledCondition,
	},
	// Condition types listed as negative polarity; presumably a True status on
	// these signals a problem rather than health — confirm against the
	// reconcile package.
	NegativePolarity: []string{
		status.ReconcilingCondition,
		status.StalledCondition,
	},
}
56
57 func (r *ShipmentReconciler) SetupWithManager(mgr ctrl.Manager, cfg ShipmentCfg) error {
58 if err := r.Reconciler.SetupWithManager(mgr); err != nil {
59 return err
60 }
61
62 return ctrl.NewControllerManagedBy(mgr).
63 For(&whv1.Shipment{}).
64 WithOptions(controller.Options{
65 MaxConcurrentReconciles: cfg.Concurrent,
66 }).
67 Owns(&whv1.UnpackedPallet{}).
68 Complete(r)
69 }
70
71 func (r *ShipmentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
72 var (
73 reconcileStart = time.Now()
74 log = ctrl.LoggerFrom(ctx)
75 result = reconcile.ResultEmpty
76 s = &whv1.Shipment{}
77 )
78
79 if err := r.Get(ctx, req.NamespacedName, s); err != nil {
80 return ctrl.Result{}, client.IgnoreNotFound(err)
81 }
82
83 patcher := patch.NewSerialPatcher(s, r.Client)
84
85 defer func() {
86 summarizer := reconcile.NewSummarizer(patcher)
87 res, recErr = summarizer.SummarizeAndPatch(ctx, s, []reconcile.SummarizeOption{
88 reconcile.WithConditions(r.Conditions),
89 reconcile.WithResult(result),
90 reconcile.WithError(recErr),
91 reconcile.WithIgnoreNotFound(),
92 reconcile.WithProcessors(
93 reconcile.RecordReconcileReq,
94 reconcile.RecordResult,
95 ),
96 reconcile.WithFieldOwner(r.Name),
97 }...)
98
99 r.Metrics.RecordConditionsWithReason(ctx, s)
100 r.Metrics.RecordDuration(ctx, s, reconcileStart)
101 r.Metrics.RecordSuspend(ctx, s, s.Spec.Suspend)
102 r.LivenessChecker.AddStatus(s, recErr)
103
104 internal.RecordCacheLen(float64(r.Cache.Len()))
105 }()
106
107
108 if !controllerutil.ContainsFinalizer(s, whv1.WarehouseFinalizer) {
109 controllerutil.AddFinalizer(s, whv1.WarehouseFinalizer)
110
111
112 result = reconcile.ResultRequeue
113 return
114 }
115
116
117 if !s.ObjectMeta.DeletionTimestamp.IsZero() {
118 log.Info("detected deletion, executing finalizer")
119
120 finalizer := Finalizer{
121 Name: whv1.WarehouseFinalizer,
122 ResourceManager: r.ResourceManager,
123 }
124
125 if err := finalizer.Finalize(ctx, s, s.Spec.Prune); err != nil {
126 result, recErr = reconcile.ResultEmpty, err
127 return
128 }
129
130 controllerutil.RemoveFinalizer(s, whv1.WarehouseFinalizer)
131 log.Info("finalizer executed")
132 return
133 }
134
135 if s.Spec.Suspend {
136 log.Info("reconciliation is suspended", "suspended", "true")
137 return
138 }
139
140 log.Info("reconciling")
141 recErr = r.reconcile(ctx, patcher, s)
142 if recErr == nil {
143 result = reconcile.ResultSuccess
144 }
145 return
146 }
147
148 func (r *ShipmentReconciler) reconcile(
149 ctx context.Context,
150 patcher *patch.SerialPatcher,
151 s *whv1.Shipment,
152 ) recerr.Error {
153 log := ctrl.LoggerFrom(ctx)
154
155 if s.Spec.UnpackOptions.Provider == "" {
156 s.Spec.UnpackOptions.Provider = r.Provider
157 }
158
159 if err := s.Validate(); err != nil {
160 return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err),
161 whv1.InvalidSpecReason)
162 }
163
164 if err := reconcile.Progressing(ctx, s, patcher, r.PatchOpts()...); err != nil {
165 return recerr.New(err, whv1.ReconcileFailedReason)
166 }
167
168 parameters, err := r.resolveParameters(ctx, s)
169 if err != nil {
170 return err
171 }
172 resolveResult, err := r.resolvePallets(ctx, s, parameters)
173 if err != nil {
174 return err
175 }
176
177
178
179 if !s.Spec.UnpackOptions.IsEmpty() {
180 for _, capability := range s.Spec.UnpackOptions.Capabilities {
181 found := false
182 for name := range resolveResult.unpackedPallets {
183 if capability == name {
184 found = true
185 }
186 }
187 if !found {
188 return recerr.NewStalled(
189 fmt.Errorf(
190 "runtime capability provider %s not included in shipment packages",
191 capability,
192 ),
193 whv1.InvalidArtifactReason,
194 )
195 }
196 }
197 }
198
199 objs := make([]*unstructured.Unstructured, len(resolveResult.unpackedPallets))
200 i := 0
201 for _, v := range resolveResult.unpackedPallets {
202 u, err := unstructured.ToUnstructured(v)
203 if err != nil {
204 return recerr.New(err, whv1.ApplyFailedReason)
205 }
206 objs[i] = u
207 i++
208 }
209
210
211 s.Status.LastAttempted = resolveResult.resolved
212
213 if err := r.apply(ctx, s, objs); err != nil {
214 err.ToCondition(s, whv1.HealthyCondition)
215 return err
216 }
217 log.Info("applied artifacts")
218
219 conditions.MarkTrue(s, whv1.HealthyCondition, status.SucceededReason,
220 "Applied %d pallets", len(resolveResult.unpackedPallets))
221 return nil
222 }
223
224 func (r *ShipmentReconciler) apply(
225 ctx context.Context,
226 s *whv1.Shipment,
227 objs []*unstructured.Unstructured,
228 ) recerr.Error {
229 var (
230 mgr = r.ResourceManager
231 log = ctrl.LoggerFrom(ctx).WithName("apply")
232 )
233
234 mgr.SetOwnerLabels(objs, s.Name, "")
235
236 changeSet, err := mgr.ApplyAll(ctx, objs, sap.ApplyOptions{
237 WaitTimeout: s.Spec.Timeout.Duration,
238 })
239 if err != nil {
240 return recerr.New(err, whv1.ApplyFailedReason)
241 }
242 log.Info("applied", "changeset", changeSet.ToMap())
243
244 newInventory := inventory.New(inventory.FromSapChangeSet(changeSet))
245 if err := r.prune(ctx, s, newInventory); err != nil {
246 return recerr.New(err, whv1.PruneFailedReason)
247 }
248
249
250
251 s.Status.Inventory = newInventory
252 s.Status.LastApplied = s.Status.LastAttempted
253
254 if err := mgr.WaitForSet(ctx, changeSet.ToObjMetadataSet(), sap.WaitOptions{
255 Timeout: s.Spec.Timeout.Duration,
256 }); err != nil {
257 err := recerr.NewWait(err, whv1.ReconcileFailedReason, s.RetryInterval())
258 return err
259 }
260
261 return nil
262 }
263
264 func (r *ShipmentReconciler) prune(
265 ctx context.Context,
266 s *whv1.Shipment,
267 newInventory *inventory.ResourceInventory,
268 ) error {
269 log := ctrl.LoggerFrom(ctx).WithName("prune")
270
271 switch {
272 case !s.Spec.Prune:
273 log.Info("pruning is turned off for this shipment")
274 return nil
275 case s.Status.Inventory == nil:
276 return nil
277 default:
278 diff, err := s.Status.Inventory.Diff(newInventory)
279 switch {
280 case err != nil:
281 return err
282 case len(diff) == 0:
283 return nil
284 }
285
286
287 deleted, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
288 if err != nil {
289 return err
290 }
291 log.Info("pruned", "changeset", deleted.ToMap())
292 return nil
293 }
294 }
295
296 func (r *ShipmentReconciler) resolveParameters(
297 ctx context.Context,
298 s *whv1.Shipment,
299 ) (map[string]string, recerr.Error) {
300 result := make(map[string]string, 0)
301 for _, p := range s.Spec.Rendering {
302 vars, err := p.Resolve(ctx, r.Client)
303 switch {
304 case errors.Is(err, whv1.ErrInvalidParameterMapping):
305
306
307
308 return nil, recerr.NewStalled(err, whv1.InvalidResourceReason)
309 case err != nil:
310 return nil, recerr.New(err, whv1.RenderingParameterParsingFailedReason)
311 }
312
313 for k, v := range vars {
314 result[k] = v
315 }
316 }
317
318 return result, nil
319 }
320
View as plain text