1 package syncedobject
2
3 import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "regexp"
9 "time"
10
11 "cloud.google.com/go/pubsub"
12 certmgr "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
13 "github.com/go-logr/logr"
14 "github.com/google/uuid"
15 "k8s.io/apimachinery/pkg/runtime"
16 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
18 ctrl "sigs.k8s.io/controller-runtime"
19 "sigs.k8s.io/controller-runtime/pkg/builder"
20 "sigs.k8s.io/controller-runtime/pkg/client"
21 k8sctrl "sigs.k8s.io/controller-runtime/pkg/controller"
22 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
23 "sigs.k8s.io/controller-runtime/pkg/predicate"
24
25 soapi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
26 "edge-infra.dev/pkg/edge/chariot"
27 chariotClientApi "edge-infra.dev/pkg/edge/chariot/client"
28 "edge-infra.dev/pkg/k8s/meta/status"
29 "edge-infra.dev/pkg/k8s/runtime/conditions"
30 "edge-infra.dev/pkg/k8s/runtime/controller"
31 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
32 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
33 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
34 "edge-infra.dev/pkg/k8s/runtime/patch"
35 "edge-infra.dev/pkg/lib/fog"
36 )
37
// SyncedObjectChariotOwner is the owner value that rideChariot sets on every
// chariot message it builds.
const (
	SyncedObjectChariotOwner = "SyncedObjectController"
)
39
40
41
42
43 var syncedObjectConditions = reconcile.Conditions{
44 Target: status.ReadyCondition,
45 Owned: []string{
46 status.ReadyCondition,
47 status.ReconcilingCondition,
48 status.StalledCondition,
49 },
50 Summarize: []string{
51 status.StalledCondition,
52 },
53 NegativePolarity: []string{
54 status.ReconcilingCondition,
55 status.StalledCondition,
56 },
57 }
58
59
60
61
62
63
64 type Reconciler struct {
65 client.Client
66 Log logr.Logger
67 Name string
68 CrdMetrics metrics.Metrics
69 Conditions reconcile.Conditions
70 Cfg *Config
71 }
72
73 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
74 return ctrl.NewControllerManagedBy(mgr).
75 For(&soapi.SyncedObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
76 WithOptions(k8sctrl.Options{
77 MaxConcurrentReconciles: r.Cfg.Concurrency,
78 }).
79 Complete(r)
80 }
81
82 func (r *Reconciler) PatchOpts() []patch.Option {
83 return []patch.Option{
84 patch.WithOwnedConditions{Conditions: r.Conditions.Owned},
85 patch.WithFieldOwner(r.Name),
86 }
87 }
88
89
90
91 func Run(ctx context.Context, cfg *Config) error {
92 var log = fog.FromContext(ctx)
93
94 mgr, err := cfg.CreateMgr()
95 if err != nil {
96 log.Error(err, "failed to create manager")
97 return err
98 }
99
100 log.Info("starting manager")
101 if err := mgr.Start(ctx); err != nil {
102 log.Error(err, "problem running manager")
103 return err
104 }
105 return nil
106 }
107
108
109
110
111 func (cfg *Config) CreateMgr(o ...controller.Option) (ctrl.Manager, error) {
112 var log = fog.New()
113 ctrl.SetLogger(log)
114
115 o = append(o, controller.WithMetricsAddress(cfg.MetricsAddr))
116 ctlCfg, opts := controller.ProcessOptions(o...)
117 opts.LeaderElectionID = "syncedobject.edge.ncr.com"
118 opts.Scheme = createScheme()
119
120 mgr, err := ctrl.NewManager(ctlCfg, opts)
121 if err != nil {
122 log.Error(err, "unable to create manager")
123 return nil, err
124 }
125
126 var r = &Reconciler{
127 Client: mgr.GetClient(),
128 Log: log.WithName("syncedobject-reconciler"),
129 Name: "syncedobject-controller",
130 CrdMetrics: metrics.New(mgr, "soctl"),
131 Conditions: syncedObjectConditions,
132 Cfg: cfg,
133 }
134 if err = r.SetupWithManager(mgr); err != nil {
135 log.Error(err, "failed to setup controller with manager")
136 return nil, err
137 }
138 return mgr, nil
139 }
140
141 func createScheme() *runtime.Scheme {
142 scheme := runtime.NewScheme()
143
144 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
145 utilruntime.Must(soapi.AddToScheme(scheme))
146 utilruntime.Must(certmgr.AddToScheme(scheme))
147
148 return scheme
149 }
150
151 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
152 var (
153 reconcileStart = time.Now()
154 log = ctrl.LoggerFrom(ctx)
155 result = reconcile.ResultEmpty
156 so = &soapi.SyncedObject{}
157 )
158
159 if err := r.Get(ctx, req.NamespacedName, so); err != nil {
160 return ctrl.Result{}, client.IgnoreNotFound(err)
161 }
162 log = log.WithValues("info", so.SyncedObjectInfo())
163 ctx = logr.NewContext(ctx, log)
164 r.CrdMetrics.RecordReconciling(ctx, so)
165
166 patcher := patch.NewSerialPatcher(so, r.Client)
167
168 defer func() {
169 if recErr != nil {
170 reconcileErr, ok := recErr.(recerr.Error)
171 if !ok {
172 reconcileErr = recerr.New(recErr, soapi.ReconcileFailedReason)
173 }
174 reconcileErr.ToCondition(so, status.ReadyCondition)
175 }
176
177 summarizer := reconcile.NewSummarizer(patcher)
178 res, recErr = summarizer.SummarizeAndPatch(ctx, so, []reconcile.SummarizeOption{
179 reconcile.WithConditions(r.Conditions),
180 reconcile.WithResult(result),
181 reconcile.WithError(recErr),
182 reconcile.WithIgnoreNotFound(),
183 reconcile.WithProcessors(
184 reconcile.RecordReconcileReq,
185 reconcile.RecordResult,
186 ),
187 reconcile.WithFieldOwner(r.Name),
188 }...)
189
190 r.CrdMetrics.RecordDuration(ctx, so, reconcileStart)
191 r.CrdMetrics.RecordReadiness(ctx, so)
192 }()
193
194
195 if !controllerutil.ContainsFinalizer(so, soapi.Finalizer) {
196 if !controllerutil.AddFinalizer(so, soapi.Finalizer) {
197 recErr = fmt.Errorf("finalizer not added")
198 log.Error(recErr, "AddFinalizer returned false")
199 }
200
201 result = reconcile.ResultRequeue
202 return
203 }
204
205 if err := ValidateSyncedObject(so); err != nil {
206 recErr = recerr.NewStalled(err, soapi.InvalidSpecReason)
207 log.Error(recErr, "failed to validate SyncedObject.Spec")
208 return
209 }
210
211
212 if !so.ObjectMeta.DeletionTimestamp.IsZero() {
213
214 annotations := so.GetAnnotations()
215 if annotations[soapi.AnnotationDeletionPolicy] == soapi.AnnotationDeletionPolicyAbandon {
216 log.Info("detected abandon deletion policy")
217 } else if err := r.rideChariot(ctx, so, chariotClientApi.Delete); err != nil {
218 recErr = recerr.New(err, soapi.PublishMessageFailedReason)
219 log.Error(recErr, "failed to publish chariot delete message")
220
221 result = reconcile.ResultRequeue
222 return
223 } else {
224 log.Info("published chariot delete message")
225 }
226
227 if !controllerutil.RemoveFinalizer(so, soapi.Finalizer) {
228 recErr = fmt.Errorf("finalizer not removed")
229 log.Error(recErr, "RemoveFinalizer returned false")
230 }
231 return
232 }
233
234
235 if so.IsExpired() {
236 log.Info("synced object expired")
237 err := r.Client.Delete(ctx, so)
238 if client.IgnoreNotFound(err) != nil {
239 recErr = err
240 }
241
242 result = reconcile.ResultRequeue
243 return
244 }
245
246 if recErr = reconcile.Progressing(ctx, so, patcher, r.PatchOpts()...); recErr != nil {
247 return
248 }
249
250 if storageLocationOutdated, err := IsStorageLocationOutdated(so); err != nil {
251 recErr = recerr.New(err, soapi.ReconcileFailedReason)
252 log.Error(recErr, "failed to check storage location for updates")
253 return
254 } else if storageLocationOutdated {
255
256 var outdatedSo = &soapi.SyncedObject{
257 Spec: soapi.SyncedObjectSpec{
258 Object: so.Status.StoredObject,
259 Banner: so.Status.Banner,
260 Cluster: so.Status.Cluster,
261 Directory: so.Status.Directory,
262 },
263 }
264
265
266 annotations := so.GetAnnotations()
267 if annotations[soapi.AnnotationDeletionPolicy] == soapi.AnnotationDeletionPolicyAbandon {
268 log.Info("detected abandon deletion policy for synced object with outdated storage location")
269 } else if err := r.rideChariot(ctx, outdatedSo, chariotClientApi.Delete); err != nil {
270 recErr = recerr.New(err, soapi.PublishMessageFailedReason)
271 log.Error(recErr, "failed to publish chariot delete message")
272
273 result = reconcile.ResultRequeue
274 return
275 } else {
276 log.Info("published chariot delete message for outdated storage location")
277 }
278
279 so.ClearStatusDetails()
280 }
281
282 if err := r.rideChariot(ctx, so, chariotClientApi.Create); err != nil {
283 recErr = recerr.New(err, soapi.PublishMessageFailedReason)
284 log.Error(recErr, "failed to publish chariot create message")
285
286 result = reconcile.ResultRequeue
287 return
288 }
289
290
291 if err := so.SetStatusDetails(); err != nil {
292
293 recErr = recerr.New(err, soapi.ReconcileFailedReason)
294 log.Error(recErr, "failed update status")
295 return
296 }
297
298 conditions.MarkTrue(so, status.ReadyCondition, soapi.PublishMessageSucceededReason, "message published successfully")
299 result = reconcile.ResultSuccess
300 return
301 }
302
303 func (r *Reconciler) rideChariot(ctx context.Context, so *soapi.SyncedObject, operation chariotClientApi.Operation) error {
304 var dir string
305 if so.Spec.Directory != nil {
306 dir = *so.Spec.Directory
307 }
308
309 re := chariotClientApi.
310 NewChariotMessage().
311 SetOperation(operation).
312 SetBanner(so.Spec.Banner).
313 SetCluster(so.Spec.Cluster).
314 SetOwner(SyncedObjectChariotOwner).
315 SetDir(dir).
316 SetObjects([]string{so.Spec.Object}).
317 SetNotify(so.Spec.Notify)
318
319 data, err := json.Marshal(re)
320 if err != nil {
321 return err
322 }
323
324 var msg = &pubsub.Message{Data: data}
325
326 _, err = r.Cfg.PubsubTopic.Publish(ctx, msg).Get(ctx)
327 return err
328 }
329
330 func CreateChariotID(base64EncodedObj string) (string, error) {
331 obj, err := base64.StdEncoding.DecodeString(base64EncodedObj)
332 if err != nil {
333 return "", err
334 }
335 gvknn, err := chariot.ParseYamlGVKNN(obj)
336 if err != nil {
337 return "", err
338 }
339 return gvknn.Hash(), gvknn.Validate()
340 }
341
342
343
344
345 func IsStorageLocationOutdated(so *soapi.SyncedObject) (bool, error) {
346 if so.Status.StorageLocation == "" {
347
348 return false, nil
349 }
350
351 chariotID, err := CreateChariotID(so.Spec.Object)
352 if err != nil {
353 return false, err
354 }
355 var dir string
356 if so.Spec.Directory != nil {
357 dir = *so.Spec.Directory
358 }
359 currentStorageLocation := chariot.FmtStorageLocation(so.Spec.Banner, so.Spec.Cluster, dir, chariotID)
360 if currentStorageLocation != so.Status.StorageLocation {
361 return true, nil
362 }
363
364 return false, nil
365 }
366
367
368
369
// reValidBannerAndCluster matches names that start with a lowercase letter
// followed by one or more groups of an optional single hyphen and a lowercase
// letter or digit. It therefore rejects single-character names, consecutive
// hyphens and a trailing hyphen.
var reValidBannerAndCluster = regexp.MustCompile(`^[a-z]([-]?[a-z0-9])+$`)
371
372 func ValidateSyncedObject(so *soapi.SyncedObject) error {
373 if so.Spec.Object == "" {
374 return fmt.Errorf("missing object in spec")
375 } else if _, err := CreateChariotID(so.Spec.Object); err != nil {
376 return fmt.Errorf("invalid object in spec: %w", err)
377 }
378
379 if so.Spec.Banner == "" {
380 return fmt.Errorf("missing banner in spec")
381 } else if len(so.Spec.Banner) < 6 || len(so.Spec.Banner) > 30 {
382 return fmt.Errorf("invalid banner project ID length")
383 } else if !reValidBannerAndCluster.MatchString(so.Spec.Banner) {
384 return fmt.Errorf("invalid banner project ID name")
385 }
386
387
388 if so.Spec.Cluster == "" {
389 return nil
390 } else if len(so.Spec.Cluster) > 40 {
391 return fmt.Errorf("invalid cluster name length")
392 } else if reValidBannerAndCluster.MatchString(so.Spec.Cluster) {
393 return nil
394 } else if cuuid, err := uuid.Parse(so.Spec.Cluster); err != nil || cuuid == uuid.Nil {
395 return fmt.Errorf("invalid cluster name")
396 }
397 return nil
398 }
399
View as plain text