1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package policymember
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "time"
22
23 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
24 iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
25 condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
26 kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
27 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
29 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
30 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
31 kccratelimiter "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
32 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
33 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
34 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
35 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
36 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
37 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
38
39 mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
40 tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
41 "golang.org/x/sync/semaphore"
42 corev1 "k8s.io/api/core/v1"
43 apierrors "k8s.io/apimachinery/pkg/api/errors"
44 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
45 "k8s.io/apimachinery/pkg/runtime"
46 "k8s.io/apimachinery/pkg/types"
47 "k8s.io/client-go/rest"
48 "sigs.k8s.io/controller-runtime/pkg/builder"
49 "sigs.k8s.io/controller-runtime/pkg/client"
50 "sigs.k8s.io/controller-runtime/pkg/controller"
51 "sigs.k8s.io/controller-runtime/pkg/event"
52 "sigs.k8s.io/controller-runtime/pkg/handler"
53 klog "sigs.k8s.io/controller-runtime/pkg/log"
54 "sigs.k8s.io/controller-runtime/pkg/manager"
55 "sigs.k8s.io/controller-runtime/pkg/ratelimiter"
56 "sigs.k8s.io/controller-runtime/pkg/reconcile"
57 "sigs.k8s.io/controller-runtime/pkg/source"
58 )
59
// controllerName is the name this controller is registered under. It is also
// used to name the package logger and the event recorder.
const controllerName = "iampolicymember-controller"

// logger is the package-level logger, named after the controller.
var logger = klog.Log.WithName(controllerName)
63
64
65
66 func Add(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
67 converter *conversion.Converter, dclConfig *mmdcl.Config) error {
68 immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
69 resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
70 reconciler, err := NewReconciler(mgr, provider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
71 if err != nil {
72 return err
73 }
74 return add(mgr, reconciler)
75 }
76
77
78 func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*Reconciler, error) {
79 r := Reconciler{
80 LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
81 mgr.GetClient(),
82 mgr.GetEventRecorderFor(controllerName),
83 ),
84 Client: mgr.GetClient(),
85 iamClient: kcciamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig),
86 scheme: mgr.GetScheme(),
87 config: mgr.GetConfig(),
88 immediateReconcileRequests: immediateReconcileRequests,
89 resourceWatcherRoutines: resourceWatcherRoutines,
90 requeueRateLimiter: kccratelimiter.RequeueRateLimiter(),
91 }
92
93 return &r, nil
94 }
95
96
97 func add(mgr manager.Manager, r *Reconciler) error {
98 obj := &iamv1beta1.IAMPolicyMember{}
99 _, err := builder.
100 ControllerManagedBy(mgr).
101 Named(controllerName).
102 WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: kccratelimiter.NewRateLimiter()}).
103 Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
104 For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
105 Build(r)
106 if err != nil {
107 return fmt.Errorf("error creating new controller: %v", err)
108 }
109 return nil
110 }
111
112 var _ reconcile.Reconciler = &Reconciler{}
113
// Reconciler reconciles IAMPolicyMember objects. It embeds the lifecycle
// handler, the Kubernetes client and the reconcile metrics recorder, so their
// methods are available directly on the Reconciler.
type Reconciler struct {
	lifecyclehandler.LifecycleHandler
	client.Client
	metrics.ReconcilerMetrics
	// iamClient gets, sets and deletes policy members.
	iamClient *kcciamclient.IAMClient
	scheme    *runtime.Scheme
	// config is the REST config passed to resourcewatcher.New.
	config *rest.Config

	// immediateReconcileRequests carries generic events that enqueue a
	// resource for reconciliation; a nil channel disables that feature.
	immediateReconcileRequests chan event.GenericEvent
	// resourceWatcherRoutines bounds how many reference-watching goroutines
	// may run at once.
	resourceWatcherRoutines *semaphore.Weighted

	// requeueRateLimiter supplies an extra delay that is added to the
	// jittered re-reconcile period after a successful reconcile.
	requeueRateLimiter ratelimiter.RateLimiter
}
128
// reconcileContext bundles the state of a single reconcile pass: the owning
// Reconciler, the context for that pass, and the name of the object being
// reconciled.
type reconcileContext struct {
	Reconciler     *Reconciler
	Ctx            context.Context
	NamespacedName types.NamespacedName
}
134
135
136 func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
137 logger.Info("Starting reconcile", "resource", request.NamespacedName)
138 startTime := time.Now()
139 ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
140 defer cancel()
141 r.RecordReconcileWorkers(ctx, iamv1beta1.IAMPolicyMemberGVK)
142 defer r.AfterReconcile()
143 defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMPolicyMemberGVK, request.Namespace, request.Name, startTime, &err)
144
145 var memberPolicy iamv1beta1.IAMPolicyMember
146 if err := r.Get(context.TODO(), request.NamespacedName, &memberPolicy); err != nil {
147 if apierrors.IsNotFound(err) {
148 return reconcile.Result{}, nil
149 }
150 return reconcile.Result{}, err
151 }
152 reconcileContext := &reconcileContext{
153 Reconciler: r,
154 Ctx: ctx,
155 NamespacedName: request.NamespacedName,
156 }
157 requeue, err := reconcileContext.doReconcile(&memberPolicy)
158 if err != nil {
159 return reconcile.Result{}, err
160 }
161 if requeue {
162 return reconcile.Result{Requeue: true}, nil
163 }
164
165 jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMPolicyMemberGVK, nil, nil, &memberPolicy)
166 if err != nil {
167 return reconcile.Result{}, err
168 }
169 requeueDelay := r.requeueRateLimiter.When(request)
170 requeueAfter := jitteredPeriod + requeueDelay
171 logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", requeueAfter)
172 return reconcile.Result{RequeueAfter: requeueAfter}, nil
173 }
174
175 func (r *reconcileContext) doReconcile(policyMember *iamv1beta1.IAMPolicyMember) (requeue bool, err error) {
176 defer execution.RecoverWithInternalError(&err)
177 if !policyMember.DeletionTimestamp.IsZero() {
178 if !k8s.HasFinalizer(policyMember, k8s.ControllerFinalizerName) {
179
180 return false, nil
181 }
182 if k8s.HasFinalizer(policyMember, k8s.DeletionDefenderFinalizerName) {
183
184 logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(policyMember))
185 return true, nil
186 }
187 if !k8s.HasAbandonAnnotation(policyMember) {
188 if err := r.Reconciler.iamClient.DeletePolicyMember(r.Ctx, policyMember); err != nil {
189 if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
190 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
191 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policyMember))
192 resource, err := ToK8sResource(policyMember)
193 if err != nil {
194 return false, fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling unresolvable dependencies event: %w", err)
195 }
196
197 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
198 }
199 return false, r.handleDeleteFailed(policyMember, err)
200 }
201 }
202 }
203 return false, r.handleDeleted(policyMember)
204 }
205 if _, err := r.Reconciler.iamClient.GetPolicyMember(r.Ctx, policyMember); err != nil {
206 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
207 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policyMember))
208 return r.handleUnresolvableDeps(policyMember, unwrappedErr)
209 }
210 if !errors.Is(err, kcciamclient.NotFoundError) {
211 return false, r.handleUpdateFailed(policyMember, err)
212 }
213 }
214 if !k8s.EnsureFinalizers(policyMember, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName) {
215 if err := r.update(policyMember); err != nil {
216 return false, r.handleUpdateFailed(policyMember, err)
217 }
218 }
219 if _, err := r.Reconciler.iamClient.SetPolicyMember(r.Ctx, policyMember); err != nil {
220 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
221 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policyMember))
222 return r.handleUnresolvableDeps(policyMember, unwrappedErr)
223 }
224 return false, r.handleUpdateFailed(policyMember, fmt.Errorf("error setting policy member: %w", err))
225 }
226 if isAPIServerUpdateRequired(policyMember) {
227 return false, r.handleUpToDate(policyMember)
228 }
229 return false, nil
230 }
231
232 func (r *reconcileContext) update(policyMember *iamv1beta1.IAMPolicyMember) error {
233 if err := r.Reconciler.Client.Update(r.Ctx, policyMember); err != nil {
234 return fmt.Errorf("error updating '%v' in API server: %w", r.NamespacedName, err)
235 }
236 return nil
237 }
238
239 func (r *reconcileContext) handleUpToDate(policyMember *iamv1beta1.IAMPolicyMember) error {
240 resource, err := ToK8sResource(policyMember)
241 if err != nil {
242 return fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling %v event: %w", k8s.UpToDate, err)
243 }
244 return r.Reconciler.HandleUpToDate(r.Ctx, resource)
245 }
246
247 func (r *reconcileContext) handleUpdateFailed(policyMember *iamv1beta1.IAMPolicyMember, origErr error) error {
248 resource, err := ToK8sResource(policyMember)
249 if err != nil {
250 logger.Error(err, "error converting IAMPolicyMember to k8s resource while handling event",
251 "resource", k8s.GetNamespacedName(policyMember), "event", k8s.UpdateFailed)
252 return fmt.Errorf("Update call failed: %w", origErr)
253 }
254 return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
255 }
256
257 func (r *reconcileContext) handleDeleted(policyMember *iamv1beta1.IAMPolicyMember) error {
258 resource, err := ToK8sResource(policyMember)
259 if err != nil {
260 return fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling %v event: %w", k8s.Deleted, err)
261 }
262 return r.Reconciler.HandleDeleted(r.Ctx, resource)
263 }
264
265 func (r *reconcileContext) handleDeleteFailed(policyMember *iamv1beta1.IAMPolicyMember, origErr error) error {
266 resource, err := ToK8sResource(policyMember)
267 if err != nil {
268 logger.Error(err, "error converting IAMPolicyMember to k8s resource while handling event",
269 "resource", k8s.GetNamespacedName(policyMember), "event", k8s.DeleteFailed)
270 return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
271 }
272 return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
273 }
274
275 func (r *Reconciler) supportsImmediateReconciliations() bool {
276 return r.immediateReconcileRequests != nil
277 }
278
279 func (r *reconcileContext) handleUnresolvableDeps(policyMember *v1beta1.IAMPolicyMember, origErr error) (requeue bool, err error) {
280 resource, err := ToK8sResource(policyMember)
281 if err != nil {
282 return false, fmt.Errorf("error converting IAMPolicyMember to k8s resource while handling unresolvable dependencies event: %w", err)
283 }
284 refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
285 if !ok || !r.Reconciler.supportsImmediateReconciliations() {
286
287 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
288 }
289
290
291
292
293
294 if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
295
296 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
297 }
298
299
300
301
302 watcherLogger := logger.WithValues(
303 "referencingResource", resource.GetNamespacedName(),
304 "referencingResourceGVK", resource.GroupVersionKind())
305 watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
306 if err != nil {
307 return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
308 }
309
310 logger := logger.WithValues(
311 "resource", resource.GetNamespacedName(),
312 "resourceGVK", resource.GroupVersionKind(),
313 "reference", refNN,
314 "referenceGVK", refGVK)
315 go func() {
316
317
318 defer r.Reconciler.resourceWatcherRoutines.Release(1)
319 timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
320 ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
321 defer cancel()
322 logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
323 if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
324 logger.Error(err, "error while waiting for resource's reference to be ready")
325 return
326 }
327 logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
328 r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
329 }()
330
331
332
333
334 return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
335 }
336
337
338
339
340
341 func (r *Reconciler) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
342 genEvent := event.GenericEvent{}
343 genEvent.Object = &unstructured.Unstructured{}
344 genEvent.Object.SetNamespace(resourceNN.Namespace)
345 genEvent.Object.SetName(resourceNN.Name)
346 r.immediateReconcileRequests <- genEvent
347 }
348
349 func isAPIServerUpdateRequired(policyMember *iamv1beta1.IAMPolicyMember) bool {
350
351
352 conditions := []condition.Condition{
353 k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
354 }
355 if !k8s.ConditionSlicesEqual(policyMember.Status.Conditions, conditions) {
356 return true
357 }
358 if policyMember.Status.ObservedGeneration != policyMember.GetGeneration() {
359 return true
360 }
361 return false
362 }
363
364 func ToK8sResource(policyMember *iamv1beta1.IAMPolicyMember) (*k8s.Resource, error) {
365 kcciamclient.SetGVK(policyMember)
366 resource := k8s.Resource{}
367 if err := util.Marshal(policyMember, &resource); err != nil {
368 return nil, fmt.Errorf("error marshalling IAMPolicyMember to k8s resource: %w", err)
369 }
370 return &resource, nil
371 }
372
View as plain text