1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package partialpolicy
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "reflect"
22 "time"
23
24 iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
25 condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
26 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
27 kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
29 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
30 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
31 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
32 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
33 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
34 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
35 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
36 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
37 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
38 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
39
40 mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
41 tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
42 "golang.org/x/sync/semaphore"
43 corev1 "k8s.io/api/core/v1"
44 apierrors "k8s.io/apimachinery/pkg/api/errors"
45 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
47 "k8s.io/apimachinery/pkg/runtime"
48 "k8s.io/apimachinery/pkg/types"
49 "k8s.io/client-go/rest"
50 "sigs.k8s.io/controller-runtime/pkg/builder"
51 "sigs.k8s.io/controller-runtime/pkg/client"
52 "sigs.k8s.io/controller-runtime/pkg/controller"
53 "sigs.k8s.io/controller-runtime/pkg/event"
54 "sigs.k8s.io/controller-runtime/pkg/handler"
55 klog "sigs.k8s.io/controller-runtime/pkg/log"
56 "sigs.k8s.io/controller-runtime/pkg/manager"
57 "sigs.k8s.io/controller-runtime/pkg/reconcile"
58 "sigs.k8s.io/controller-runtime/pkg/source"
59 )
60
61 const controllerName = "iampartialpolicy-controller"
62
63 var logger = klog.Log.WithName(controllerName)
64
65
66
67 func Add(mgr manager.Manager, tfProvider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
68 converter *conversion.Converter, dclConfig *mmdcl.Config) error {
69 immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
70 resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
71 reconciler, err := NewReconciler(mgr, tfProvider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
72 if err != nil {
73 return err
74 }
75 return add(mgr, reconciler)
76 }
77
78
79 func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*ReconcileIAMPartialPolicy, error) {
80 r := ReconcileIAMPartialPolicy{
81 LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
82 mgr.GetClient(),
83 mgr.GetEventRecorderFor(controllerName),
84 ),
85 Client: mgr.GetClient(),
86 iamClient: iamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig),
87 scheme: mgr.GetScheme(),
88 config: mgr.GetConfig(),
89 immediateReconcileRequests: immediateReconcileRequests,
90 resourceWatcherRoutines: resourceWatcherRoutines,
91 ReconcilerMetrics: metrics.ReconcilerMetrics{
92 ResourceNameLabel: metrics.ResourceNameLabel,
93 },
94 }
95 return &r, nil
96 }
97
98
99 func add(mgr manager.Manager, r *ReconcileIAMPartialPolicy) error {
100 obj := &iamv1beta1.IAMPartialPolicy{}
101 _, err := builder.
102 ControllerManagedBy(mgr).
103 Named(controllerName).
104 WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
105 Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
106 For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
107 Build(r)
108 if err != nil {
109 return fmt.Errorf("error creating new controller: %v", err)
110 }
111 return nil
112 }
113
114 var _ reconcile.Reconciler = &ReconcileIAMPartialPolicy{}
115
116
117 type ReconcileIAMPartialPolicy struct {
118 lifecyclehandler.LifecycleHandler
119 client.Client
120 metrics.ReconcilerMetrics
121 iamClient *kcciamclient.IAMClient
122 scheme *runtime.Scheme
123 config *rest.Config
124
125 immediateReconcileRequests chan event.GenericEvent
126 resourceWatcherRoutines *semaphore.Weighted
127 }
128
129 type reconcileContext struct {
130 Reconciler *ReconcileIAMPartialPolicy
131 Ctx context.Context
132 NamespacedName types.NamespacedName
133 }
134
135 func (r *ReconcileIAMPartialPolicy) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
136 logger.Info("Running reconcile", "resource", request.NamespacedName)
137 startTime := time.Now()
138 ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
139 defer cancel()
140 r.RecordReconcileWorkers(ctx, iamv1beta1.IAMPartialPolicyGVK)
141 defer r.AfterReconcile()
142 defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMPartialPolicyGVK, request.Namespace, request.Name, startTime, &err)
143
144 policy := &iamv1beta1.IAMPartialPolicy{}
145 if err := r.Get(context.TODO(), request.NamespacedName, policy); err != nil {
146 if apierrors.IsNotFound(err) {
147
148
149 return reconcile.Result{}, nil
150 }
151
152 return reconcile.Result{}, err
153 }
154 runCtx := &reconcileContext{
155 Reconciler: r,
156 Ctx: ctx,
157 NamespacedName: request.NamespacedName,
158 }
159 requeue, err := runCtx.doReconcile(policy)
160 if err != nil {
161 return reconcile.Result{}, err
162 }
163 if requeue {
164 return reconcile.Result{Requeue: true}, nil
165 }
166 jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMPartialPolicyGVK, nil, nil, policy)
167 if err != nil {
168 return reconcile.Result{}, err
169 }
170 logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", jitteredPeriod)
171 return reconcile.Result{RequeueAfter: jitteredPeriod}, nil
172 }
173
174 func (r *reconcileContext) doReconcile(pp *iamv1beta1.IAMPartialPolicy) (requeue bool, err error) {
175 defer execution.RecoverWithInternalError(&err)
176 if !pp.DeletionTimestamp.IsZero() {
177 return r.finalizeDeletion(pp)
178 }
179 iamPolicy := ToIAMPolicySkeleton(pp)
180 if iamPolicy, err = r.Reconciler.iamClient.GetPolicy(r.Ctx, iamPolicy); err != nil {
181 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
182 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(pp))
183 return r.handleUnresolvableDeps(pp, unwrappedErr)
184 }
185 return false, r.handleUpdateFailed(pp, err)
186 }
187 k8s.EnsureFinalizers(pp, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName)
188
189 resolver := IAMMemberIdentityResolver{Iamclient: r.Reconciler.iamClient, Ctx: r.Ctx}
190 desiredPartialPolicy, err := ComputePartialPolicyWithMergedBindings(pp, iamPolicy, &resolver)
191 if err != nil {
192 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
193 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(pp))
194 return r.handleUnresolvableDeps(pp, unwrappedErr)
195 }
196 return false, r.handleUpdateFailed(pp, fmt.Errorf("error computing partial policy: %w", err))
197 }
198 desiredPolicy := toDesiredPolicy(desiredPartialPolicy, iamPolicy)
199 if _, err = r.Reconciler.iamClient.SetPolicy(r.Ctx, desiredPolicy); err != nil {
200 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
201 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(pp))
202 return r.handleUnresolvableDeps(pp, unwrappedErr)
203 }
204 return false, r.handleUpdateFailed(pp, fmt.Errorf("error setting policy: %w", err))
205 }
206 if isAPIServerUpdateRequired(desiredPartialPolicy, pp) {
207 return false, r.handleUpToDate(desiredPartialPolicy)
208 }
209 return false, nil
210 }
211
212 func (r *reconcileContext) finalizeDeletion(pp *iamv1beta1.IAMPartialPolicy) (requeue bool, err error) {
213 if !k8s.HasFinalizer(pp, k8s.ControllerFinalizerName) {
214
215 return false, nil
216 }
217 if k8s.HasFinalizer(pp, k8s.DeletionDefenderFinalizerName) {
218
219 logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(pp))
220 return true, nil
221 }
222 if !k8s.HasAbandonAnnotation(pp) {
223 iamPolicy := ToIAMPolicySkeleton(pp)
224 if iamPolicy, err = r.Reconciler.iamClient.GetPolicy(r.Ctx, iamPolicy); err != nil {
225 if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
226 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
227 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(pp))
228 resource, err := toK8sResource(pp)
229 if err != nil {
230 return false, fmt.Errorf("error converting IAMPartialPolicy to k8s resource while handling unresolvable dependencies event: %w", err)
231 }
232
233 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
234 }
235 return false, r.handleDeleteFailed(pp, err)
236 }
237
238
239 return false, r.handleDeleted(pp)
240 }
241
242
243 desiredPartialPolicy := ComputePartialPolicyWithRemainingBindings(pp, iamPolicy)
244 desiredPolicy := toDesiredPolicy(desiredPartialPolicy, iamPolicy)
245 if _, err = r.Reconciler.iamClient.SetPolicy(r.Ctx, desiredPolicy); err != nil {
246 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
247 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(pp))
248 return r.handleUnresolvableDeps(pp, unwrappedErr)
249 }
250 return false, r.handleDeleteFailed(pp, fmt.Errorf("error setting policy: %w", err))
251 }
252 pp = desiredPartialPolicy
253 }
254 return false, r.handleDeleted(pp)
255 }
256
257 func (r *reconcileContext) handleUpToDate(policy *iamv1beta1.IAMPartialPolicy) error {
258 resource, err := toK8sResource(policy)
259 if err != nil {
260 return fmt.Errorf("error converting IAMPartialPolicy to k8s resource while handling %v event: %w", k8s.UpToDate, err)
261 }
262 return r.Reconciler.HandleUpToDate(r.Ctx, resource)
263 }
264
265 func (r *reconcileContext) handleUpdateFailed(policy *iamv1beta1.IAMPartialPolicy, origErr error) error {
266 resource, err := toK8sResource(policy)
267 if err != nil {
268 logger.Error(err, "error converting IAMPartialPolicy to k8s resource while handling event",
269 "resource", k8s.GetNamespacedName(policy), "event", k8s.UpdateFailed)
270 return fmt.Errorf("Update call failed: %w", origErr)
271 }
272 return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
273 }
274
275 func (r *reconcileContext) handleDeleted(policy *iamv1beta1.IAMPartialPolicy) error {
276 resource, err := toK8sResource(policy)
277 if err != nil {
278 return fmt.Errorf("error converting IAMPartialPolicy to k8s resource while handling %v event: %w", k8s.Deleted, err)
279 }
280 return r.Reconciler.HandleDeleted(r.Ctx, resource)
281 }
282
283 func (r *reconcileContext) handleDeleteFailed(policy *iamv1beta1.IAMPartialPolicy, origErr error) error {
284 resource, err := toK8sResource(policy)
285 if err != nil {
286 logger.Error(err, "error converting IAMPartialPolicy to k8s resource while handling event",
287 "resource", k8s.GetNamespacedName(policy), "event", k8s.DeleteFailed)
288 return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
289 }
290 return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
291 }
292
293 func (r *ReconcileIAMPartialPolicy) supportsImmediateReconciliations() bool {
294 return r.immediateReconcileRequests != nil
295 }
296
297 func (r *reconcileContext) handleUnresolvableDeps(policy *iamv1beta1.IAMPartialPolicy, origErr error) (requeue bool, err error) {
298 resource, err := toK8sResource(policy)
299 if err != nil {
300 return false, fmt.Errorf("error converting IAMPartialPolicy to k8s resource while handling unresolvable dependencies event: %w", err)
301 }
302 refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
303 if !ok || !r.Reconciler.supportsImmediateReconciliations() {
304
305 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
306 }
307
308
309
310
311
312 if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
313
314 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
315 }
316
317
318
319
320 watcherLogger := logger.WithValues(
321 "referencingResource", resource.GetNamespacedName(),
322 "referencingResourceGVK", resource.GroupVersionKind())
323 watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
324 if err != nil {
325 return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
326 }
327
328 logger := logger.WithValues(
329 "resource", resource.GetNamespacedName(),
330 "resourceGVK", resource.GroupVersionKind(),
331 "reference", refNN,
332 "referenceGVK", refGVK)
333 go func() {
334
335
336 defer r.Reconciler.resourceWatcherRoutines.Release(1)
337 timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
338 ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
339 defer cancel()
340 logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
341 if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
342 logger.Error(err, "error while waiting for resource's reference to be ready")
343 return
344 }
345 logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
346 r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
347 }()
348
349
350
351
352 return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
353 }
354
355
356
357
358
359 func (r *ReconcileIAMPartialPolicy) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
360 genEvent := event.GenericEvent{}
361 genEvent.Object = &unstructured.Unstructured{}
362 genEvent.Object.SetNamespace(resourceNN.Namespace)
363 genEvent.Object.SetName(resourceNN.Name)
364 r.immediateReconcileRequests <- genEvent
365 }
366
367
368 type IAMMemberIdentityResolver struct {
369 Iamclient *kcciamclient.IAMClient
370 Ctx context.Context
371 }
372
373 func (t *IAMMemberIdentityResolver) Resolve(member iamv1beta1.Member, memberFrom *iamv1beta1.MemberSource, defaultNamespace string) (string, error) {
374 return kcciamclient.ResolveMemberIdentity(t.Ctx, member, memberFrom, defaultNamespace, t.Iamclient.TFIAMClient)
375 }
376
377 func isAPIServerUpdateRequired(desired, original *iamv1beta1.IAMPartialPolicy) bool {
378
379
380 conditions := []condition.Condition{
381 k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
382 }
383 if !k8s.ConditionSlicesEqual(original.Status.Conditions, conditions) {
384 return true
385 }
386 if original.Status.ObservedGeneration != original.GetGeneration() {
387 return true
388 }
389 if !reflect.DeepEqual(desired.Status.LastAppliedBindings, original.Status.LastAppliedBindings) {
390 return true
391 }
392 if !reflect.DeepEqual(desired.Status.AllBindings, original.Status.AllBindings) {
393 return true
394 }
395 return false
396 }
397
398 func toK8sResource(policy *iamv1beta1.IAMPartialPolicy) (*k8s.Resource, error) {
399 iamclient.SetGVK(policy)
400 resource := k8s.Resource{}
401 if err := util.Marshal(policy, &resource); err != nil {
402 return nil, fmt.Errorf("error marshalling IAMPartialPolicy to k8s resource: %w", err)
403 }
404 return &resource, nil
405 }
406
407
408
409
410 func ToIAMPolicySkeleton(p *iamv1beta1.IAMPartialPolicy) *iamv1beta1.IAMPolicy {
411 res := &iamv1beta1.IAMPolicy{
412 TypeMeta: metav1.TypeMeta{
413 Kind: iamv1beta1.IAMPolicyGVK.Kind,
414 APIVersion: iamv1beta1.IAMAPIVersion,
415 },
416 }
417 res.ObjectMeta = *p.ObjectMeta.DeepCopy()
418 res.Spec.ResourceReference = p.Spec.ResourceReference
419 return res
420 }
421
422 func toDesiredPolicy(desiredPartialPolicy *iamv1beta1.IAMPartialPolicy, livePolicy *iamv1beta1.IAMPolicy) *iamv1beta1.IAMPolicy {
423 desiredPolicy := ToIAMPolicySkeleton(desiredPartialPolicy)
424 desiredPolicy.Spec.Bindings = desiredPartialPolicy.Status.AllBindings
425
426
427 desiredPolicy.Spec.Etag = livePolicy.Spec.Etag
428
429 desiredPolicy.Spec.AuditConfigs = livePolicy.Spec.AuditConfigs
430 return desiredPolicy
431 }
432
View as plain text