1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package lifecyclehandler
16
17 import (
18 "context"
19 "fmt"
20
21 corekccv1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/core/v1alpha1"
22 k8sv1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
23 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/deepcopy"
24 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
25 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/label"
26 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/lease/leaser"
27 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/resourceoverrides"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
29
30 corev1 "k8s.io/api/core/v1"
31 apierrors "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/apimachinery/pkg/types"
36 "k8s.io/client-go/tools/record"
37 "sigs.k8s.io/controller-runtime/pkg/client"
38 )
39
40
41 type LifecycleHandler struct {
42 client.Client
43 Recorder record.EventRecorder
44 fieldOwner string
45 }
46
47 func NewLifecycleHandler(c client.Client, r record.EventRecorder) LifecycleHandler {
48 return NewLifecycleHandlerWithFieldOwner(c, r, k8s.ControllerManagedFieldManager)
49 }
50
51 func NewLifecycleHandlerWithFieldOwner(c client.Client, r record.EventRecorder, fieldOwner string) LifecycleHandler {
52 return LifecycleHandler{
53 Client: c,
54 Recorder: r,
55 fieldOwner: fieldOwner,
56 }
57 }
58
59 func (r *LifecycleHandler) updateStatus(ctx context.Context, resource *k8s.Resource) error {
60 u, err := resource.MarshalAsUnstructured()
61 if err != nil {
62 return err
63 }
64 if err := r.Client.Status().Update(ctx, u, client.FieldOwner(r.fieldOwner)); err != nil {
65 if apierrors.IsConflict(err) {
66 return fmt.Errorf("couldn't update the API server due to conflict. Re-enqueue the request for another reconciliation attempt: %v", err)
67 }
68 return fmt.Errorf("error with status update call to API server: %v", err)
69 }
70
71
72 if isFailureStatus(u) {
73 return fmt.Errorf("error with status update call to API server: %v", u.Object["message"])
74 }
75
76 if err := util.Marshal(u, resource); err != nil {
77 return err
78 }
79 return resourceoverrides.Handler.PostUpdateStatusTransform(resource)
80 }
81
82
83
84 func (r *LifecycleHandler) updateAPIServer(ctx context.Context, resource *k8s.Resource) error {
85
86
87 status := deepcopy.MapStringInterface(resource.Status)
88
89
90 observedGeneration := resource.GetGeneration()
91 u, err := resource.MarshalAsUnstructured()
92 if err != nil {
93 return err
94 }
95 removeSystemLabels(u)
96 if err := r.Client.Update(ctx, u, client.FieldOwner(r.fieldOwner)); err != nil {
97 if apierrors.IsConflict(err) {
98 return fmt.Errorf("couldn't update the API server due to conflict. Re-enqueue the request for another reconciliation attempt: %v", err)
99 }
100 return fmt.Errorf("error with update call to API server: %v", err)
101 }
102
103
104 if isFailureStatus(u) {
105 return fmt.Errorf("error with update call to API server: %v", u.Object["message"])
106 }
107
108 if err := util.Marshal(u, resource); err != nil {
109 return fmt.Errorf("error syncing updated resource metadata: %w", err)
110 }
111 if !u.GetDeletionTimestamp().IsZero() && len(u.GetFinalizers()) == 0 {
112
113
114 return nil
115 }
116 resource.Status = status
117 setObservedGeneration(resource, observedGeneration)
118 return r.updateStatus(ctx, resource)
119 }
120
121 func isFailureStatus(u *unstructured.Unstructured) bool {
122 return u.GetKind() == "Status" && u.Object["status"] == metav1.StatusFailure
123 }
124
125
126
127 func removeSystemLabels(u *unstructured.Unstructured) {
128 labels := u.GetLabels()
129 if labels == nil {
130 return
131 }
132 keys := leaser.GetLabelKeys()
133 keys = append(keys, label.CnrmManagedKey)
134 for _, k := range keys {
135 delete(labels, k)
136 }
137
138 u.SetLabels(labels)
139 }
140
141
142
143
144 func CausedByUnreadyOrNonexistentResourceRefs(err error) (refGVK schema.GroupVersionKind, refNN types.NamespacedName, ok bool) {
145 if unwrappedErr, ok := k8s.AsReferenceNotReadyError(err); ok {
146 return unwrappedErr.RefResourceGVK, unwrappedErr.RefResource, true
147 }
148 if unwrappedErr, ok := k8s.AsReferenceNotFoundError(err); ok {
149 return unwrappedErr.RefResourceGVK, unwrappedErr.RefResource, true
150 }
151 if unwrappedErr, ok := k8s.AsTransitiveDependencyNotFoundError(err); ok {
152 return unwrappedErr.ResourceGVK, unwrappedErr.Resource, true
153 }
154 if unwrappedErr, ok := k8s.AsTransitiveDependencyNotReadyError(err); ok {
155 return unwrappedErr.ResourceGVK, unwrappedErr.Resource, true
156 }
157 if unwrappedErr, ok := k8s.AsSecretNotFoundError(err); ok {
158 return schema.GroupVersionKind{Version: "v1", Kind: "Secret"}, unwrappedErr.Secret, true
159 }
160 return schema.GroupVersionKind{}, types.NamespacedName{}, false
161 }
162
163 func CausedByUnresolvableDeps(err error) (unwrappedErr error, ok bool) {
164 if unwrappedErr, ok := k8s.AsReferenceNotReadyError(err); ok {
165 return unwrappedErr, true
166 }
167 if unwrappedErr, ok := k8s.AsReferenceNotFoundError(err); ok {
168 return unwrappedErr, true
169 }
170 if unwrappedErr, ok := k8s.AsSecretNotFoundError(err); ok {
171 return unwrappedErr, true
172 }
173 if unwrappedErr, ok := k8s.AsKeyInSecretNotFoundError(err); ok {
174 return unwrappedErr, true
175 }
176 if unwrappedErr, ok := k8s.AsTransitiveDependencyNotFoundError(err); ok {
177 return unwrappedErr, true
178 }
179 if unwrappedErr, ok := k8s.AsTransitiveDependencyNotReadyError(err); ok {
180 return unwrappedErr, true
181 }
182 return nil, false
183 }
184
185 func reasonForUnresolvableDeps(err error) (string, error) {
186 switch err.(type) {
187 case *k8s.ReferenceNotReadyError, *k8s.TransitiveDependencyNotReadyError:
188 return k8s.DependencyNotReady, nil
189 case *k8s.ReferenceNotFoundError, *k8s.SecretNotFoundError, *k8s.TransitiveDependencyNotFoundError:
190 return k8s.DependencyNotFound, nil
191 case *k8s.KeyInSecretNotFoundError:
192 return k8s.DependencyInvalid, nil
193 default:
194 return "", fmt.Errorf("unrecognized error caused by unresolvable dependencies: %v", err)
195 }
196 }
197
198 func (r *LifecycleHandler) EnsureFinalizers(ctx context.Context, original, resource *k8s.Resource, finalizers ...string) error {
199 if !k8s.EnsureFinalizers(resource, finalizers...) {
200 u, err := original.MarshalAsUnstructured()
201 if err != nil {
202 return err
203 }
204 copy, err := k8s.NewResource(u)
205 if err != nil {
206 return err
207 }
208 if !k8s.EnsureFinalizers(copy, finalizers...) {
209 if err := r.updateAPIServer(ctx, copy); err != nil {
210 return err
211 }
212
213 resource.ObjectMeta = copy.ObjectMeta
214 }
215 }
216 return nil
217 }
218
219 func (r *LifecycleHandler) HandleUpToDate(ctx context.Context, resource *k8s.Resource) error {
220 setCondition(resource, corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage)
221 if err := r.updateAPIServer(ctx, resource); err != nil {
222 return err
223 }
224 r.recordEvent(resource, corev1.EventTypeNormal, k8s.UpToDate, k8s.UpToDateMessage)
225 return nil
226 }
227
228 func (r *LifecycleHandler) HandleUnresolvableDeps(ctx context.Context, resource *k8s.Resource, originErr error) error {
229 reason, err := reasonForUnresolvableDeps(originErr)
230 if err != nil {
231 return r.HandleUpdateFailed(ctx, resource, err)
232 }
233 msg := originErr.Error()
234
235 if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, reason, msg) {
236 setCondition(resource, corev1.ConditionFalse, reason, msg)
237 setObservedGeneration(resource, resource.GetGeneration())
238 if err := r.updateStatus(ctx, resource); err != nil {
239 return err
240 }
241 }
242 r.recordEvent(resource, corev1.EventTypeWarning, reason, msg)
243 return nil
244 }
245
246 func (r *LifecycleHandler) HandleObtainLeaseFailed(ctx context.Context, resource *k8s.Resource, err error) error {
247 msg := err.Error()
248
249 if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, k8s.ManagementConflict, msg) {
250 setCondition(resource, corev1.ConditionFalse, k8s.ManagementConflict, msg)
251 setObservedGeneration(resource, resource.GetGeneration())
252 if err := r.updateStatus(ctx, resource); err != nil {
253 return err
254 }
255 }
256 r.recordEvent(resource, corev1.EventTypeWarning, k8s.ManagementConflict, msg)
257 return err
258 }
259
260 func (r *LifecycleHandler) HandlePreActuationTransformFailed(ctx context.Context, resource *k8s.Resource, err error) error {
261 msg := err.Error()
262
263 if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, k8s.PreActuationTransformFailed, msg) {
264 setCondition(resource, corev1.ConditionFalse, k8s.PreActuationTransformFailed, msg)
265 setObservedGeneration(resource, resource.GetGeneration())
266 if err := r.updateStatus(ctx, resource); err != nil {
267 return err
268 }
269 }
270 r.recordEvent(resource, corev1.EventTypeWarning, k8s.PreActuationTransformFailed, msg)
271 return err
272 }
273
274 func (r *LifecycleHandler) HandlePostActuationTransformFailed(ctx context.Context, resource *k8s.Resource, err error) error {
275 msg := err.Error()
276
277 if !k8s.ReadyConditionMatches(resource, corev1.ConditionFalse, k8s.PostActuationTransformFailed, msg) {
278 setCondition(resource, corev1.ConditionFalse, k8s.PostActuationTransformFailed, msg)
279 setObservedGeneration(resource, resource.GetGeneration())
280 if err := r.updateStatus(ctx, resource); err != nil {
281 return err
282 }
283 }
284 r.recordEvent(resource, corev1.EventTypeWarning, k8s.PostActuationTransformFailed, msg)
285 return err
286 }
287
288 func (r *LifecycleHandler) HandleUpdating(ctx context.Context, resource *k8s.Resource) error {
289 setCondition(resource, corev1.ConditionFalse, k8s.Updating, k8s.UpdatingMessage)
290 setObservedGeneration(resource, resource.GetGeneration())
291 if err := r.updateStatus(ctx, resource); err != nil {
292 return err
293 }
294 r.recordEvent(resource, corev1.EventTypeNormal, k8s.Updating, k8s.UpdatingMessage)
295 return nil
296 }
297
298 func (r *LifecycleHandler) HandleUpdateFailed(ctx context.Context, resource *k8s.Resource, err error) error {
299 msg := fmt.Sprintf("Update call failed: %v", err)
300 setCondition(resource, corev1.ConditionFalse, k8s.UpdateFailed, msg)
301 setObservedGeneration(resource, resource.GetGeneration())
302 if err := r.updateStatus(ctx, resource); err != nil {
303 return err
304 }
305 r.recordEvent(resource, corev1.EventTypeWarning, k8s.UpdateFailed, msg)
306 return fmt.Errorf("Update call failed: %w", err)
307 }
308
309 func (r *LifecycleHandler) HandleDeleting(ctx context.Context, resource *k8s.Resource) error {
310 setCondition(resource, corev1.ConditionFalse, k8s.Deleting, k8s.DeletingMessage)
311 setObservedGeneration(resource, resource.GetGeneration())
312 if err := r.updateStatus(ctx, resource); err != nil {
313 return err
314 }
315 r.recordEvent(resource, corev1.EventTypeNormal, k8s.Deleting, k8s.DeletingMessage)
316 return nil
317 }
318
319 func (r *LifecycleHandler) HandleDeleted(ctx context.Context, resource *k8s.Resource) error {
320 setCondition(resource, corev1.ConditionFalse, k8s.Deleted, k8s.DeletedMessage)
321 setObservedGeneration(resource, resource.GetGeneration())
322
323
324 if err := r.updateStatus(ctx, resource); err != nil {
325 return fmt.Errorf("error updating status: %w", err)
326 }
327 r.recordEvent(resource, corev1.EventTypeNormal, k8s.Deleted, k8s.DeletedMessage)
328
329 k8s.RemoveFinalizer(resource, k8s.ControllerFinalizerName)
330 return r.updateAPIServer(ctx, resource)
331 }
332
333 func (r *LifecycleHandler) HandleDeleteFailed(ctx context.Context, resource *k8s.Resource, err error) error {
334 msg := fmt.Sprintf(k8s.DeleteFailedMessageTmpl, err)
335 setCondition(resource, corev1.ConditionFalse, k8s.DeleteFailed, msg)
336 setObservedGeneration(resource, resource.GetGeneration())
337 if err := r.updateStatus(ctx, resource); err != nil {
338 return err
339 }
340 r.recordEvent(resource, corev1.EventTypeWarning, k8s.DeleteFailed, msg)
341 return fmt.Errorf("Delete call failed: %w", err)
342 }
343
344 func (r *LifecycleHandler) HandleUnmanaged(ctx context.Context, resource *k8s.Resource) error {
345 msg := fmt.Sprintf(k8s.UnmanagedMessageTmpl, resource.GetNamespace())
346 setCondition(resource, corev1.ConditionFalse, k8s.Unmanaged, msg)
347 setObservedGeneration(resource, resource.GetGeneration())
348 if err := r.updateStatus(ctx, resource); err != nil {
349 return err
350 }
351 r.recordEvent(resource, corev1.EventTypeWarning, k8s.Unmanaged, msg)
352 return nil
353 }
354
355 func setCondition(resource *k8s.Resource, status corev1.ConditionStatus, reason, msg string) {
356 if resource.Status == nil {
357 resource.Status = make(map[string]interface{})
358 }
359 newReadyCondition := k8s.NewCustomReadyCondition(status, reason, msg)
360
361
362
363 if currentReadyCondition, found := k8s.GetReadyCondition(resource); found {
364 if currentReadyCondition.Status == status {
365 newReadyCondition.LastTransitionTime = currentReadyCondition.LastTransitionTime
366 }
367 }
368 resource.Status["conditions"] = []k8sv1alpha1.Condition{newReadyCondition}
369 }
370
371 func setObservedGeneration(resource *k8s.Resource, observedGeneration int64) {
372 if resource.Status == nil {
373 resource.Status = make(map[string]interface{})
374 }
375 resource.Status["observedGeneration"] = observedGeneration
376 }
377
378 func (r *LifecycleHandler) recordEvent(resource *k8s.Resource, eventtype, reason, message string) error {
379 u, err := resource.MarshalAsUnstructured()
380 if err != nil {
381 return err
382 }
383 r.Recorder.Event(u, eventtype, reason, message)
384 return nil
385 }
386
387 func IsOrphaned(resource *k8s.Resource, parentReferenceConfigs []corekccv1alpha1.TypeConfig, kubeClient client.Client) (orphaned bool, parent *k8s.Resource, err error) {
388 if len(parentReferenceConfigs) == 0 {
389 return false, nil, nil
390 }
391 for _, refConfig := range parentReferenceConfigs {
392 resourceRefRaw, ok := resource.Spec[refConfig.Key]
393 if !ok {
394
395 continue
396 }
397 resourceRef := &corekccv1alpha1.ResourceReference{}
398 if err := util.Marshal(resourceRefRaw, resourceRef); err != nil {
399 return false, nil, fmt.Errorf("'spec.%v' is an unrecognized format", refConfig.Key)
400 }
401 if resourceRef.External != "" {
402 return false, nil, nil
403 }
404 parent, err := k8s.GetReferencedResource(resourceRef, refConfig.GVK, resource.GetNamespace(), kubeClient)
405 if err != nil {
406 if k8s.IsReferenceNotFoundError(err) {
407 return true, nil, nil
408 }
409 return false, nil, fmt.Errorf("error getting parent reference 'spec.%v': %v", refConfig.Key, err)
410 }
411 return false, parent, nil
412 }
413 return false, nil, fmt.Errorf("no parent reference found in resource")
414 }
415
View as plain text