1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package policy
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "time"
22
23 iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
24 condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
25 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
26 kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
27 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
29 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
30 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
31 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
32 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
33 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
34 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
35 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
36 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
37 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
38
39 mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
40 tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
41 "golang.org/x/sync/semaphore"
42 corev1 "k8s.io/api/core/v1"
43 apierrors "k8s.io/apimachinery/pkg/api/errors"
44 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
45 "k8s.io/apimachinery/pkg/runtime"
46 "k8s.io/apimachinery/pkg/types"
47 "k8s.io/client-go/rest"
48 "sigs.k8s.io/controller-runtime/pkg/builder"
49 "sigs.k8s.io/controller-runtime/pkg/client"
50 "sigs.k8s.io/controller-runtime/pkg/controller"
51 "sigs.k8s.io/controller-runtime/pkg/event"
52 "sigs.k8s.io/controller-runtime/pkg/handler"
53 klog "sigs.k8s.io/controller-runtime/pkg/log"
54 "sigs.k8s.io/controller-runtime/pkg/manager"
55 "sigs.k8s.io/controller-runtime/pkg/reconcile"
56 "sigs.k8s.io/controller-runtime/pkg/source"
57 )
58
// controllerName is the name under which this controller is registered with
// the manager. It also names the package logger and the event recorder.
const controllerName = "iampolicy-controller"

// logger is the package-level logger, scoped by controllerName.
var logger = klog.Log.WithName(controllerName)
62
63
64
65 func Add(mgr manager.Manager, tfProvider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
66 converter *conversion.Converter, dclConfig *mmdcl.Config) error {
67 immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
68 resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
69 reconciler, err := NewReconciler(mgr, tfProvider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
70 if err != nil {
71 return err
72 }
73 return add(mgr, reconciler)
74 }
75
76
77 func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*ReconcileIAMPolicy, error) {
78 r := ReconcileIAMPolicy{
79 LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
80 mgr.GetClient(),
81 mgr.GetEventRecorderFor(controllerName),
82 ),
83 Client: mgr.GetClient(),
84 iamClient: iamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig),
85 config: mgr.GetConfig(),
86 immediateReconcileRequests: immediateReconcileRequests,
87 resourceWatcherRoutines: resourceWatcherRoutines,
88 scheme: mgr.GetScheme(),
89 ReconcilerMetrics: metrics.ReconcilerMetrics{
90 ResourceNameLabel: metrics.ResourceNameLabel,
91 },
92 }
93 return &r, nil
94 }
95
96
97 func add(mgr manager.Manager, r *ReconcileIAMPolicy) error {
98 obj := &iamv1beta1.IAMPolicy{}
99 _, err := builder.
100 ControllerManagedBy(mgr).
101 Named(controllerName).
102 WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
103 Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
104 For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
105 Build(r)
106 if err != nil {
107 return fmt.Errorf("error creating new controller: %v", err)
108 }
109 return nil
110 }
111
// Compile-time check that *ReconcileIAMPolicy satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcileIAMPolicy{}

// ReconcileIAMPolicy reconciles IAMPolicy objects. It embeds the lifecycle
// handler (status/event handling), the Kubernetes client and the reconciler
// metrics helpers.
type ReconcileIAMPolicy struct {
	lifecyclehandler.LifecycleHandler
	client.Client
	metrics.ReconcilerMetrics
	// iamClient performs the GetPolicy / SetPolicy / DeletePolicy calls.
	iamClient *kcciamclient.IAMClient
	scheme    *runtime.Scheme
	// config is handed to resourcewatcher.New when a watch on a referenced
	// resource is started.
	config *rest.Config

	// immediateReconcileRequests carries events that enqueue a resource for
	// reconciliation. A nil channel means immediate reconciliations are not
	// supported.
	immediateReconcileRequests chan event.GenericEvent
	// resourceWatcherRoutines is acquired before starting a watcher goroutine
	// and released when that goroutine exits.
	resourceWatcherRoutines *semaphore.Weighted
}
126
// reconcileContext bundles the state of a single reconcile invocation: the
// owning reconciler, the context for that invocation and the namespaced name
// of the requested resource.
type reconcileContext struct {
	Reconciler     *ReconcileIAMPolicy
	Ctx            context.Context
	NamespacedName types.NamespacedName
}
132
133
134 func (r *ReconcileIAMPolicy) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
135 logger.Info("Running reconcile", "resource", request.NamespacedName)
136 startTime := time.Now()
137 ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
138 defer cancel()
139 r.RecordReconcileWorkers(ctx, iamv1beta1.IAMPolicyGVK)
140 defer r.AfterReconcile()
141 defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMPolicyGVK, request.Namespace, request.Name, startTime, &err)
142
143 policy := &iamv1beta1.IAMPolicy{}
144 if err := r.Get(context.TODO(), request.NamespacedName, policy); err != nil {
145 if apierrors.IsNotFound(err) {
146
147
148 return reconcile.Result{}, nil
149 }
150
151 return reconcile.Result{}, err
152 }
153 runCtx := &reconcileContext{
154 Reconciler: r,
155 Ctx: ctx,
156 NamespacedName: request.NamespacedName,
157 }
158 requeue, err := runCtx.doReconcile(policy)
159 if err != nil {
160 return reconcile.Result{}, err
161 }
162 if requeue {
163 return reconcile.Result{Requeue: true}, nil
164 }
165 jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMPolicyGVK, nil, nil, policy)
166 if err != nil {
167 return reconcile.Result{}, err
168 }
169 logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", jitteredPeriod)
170 return reconcile.Result{RequeueAfter: jitteredPeriod}, nil
171 }
172
173 func (r *reconcileContext) doReconcile(policy *iamv1beta1.IAMPolicy) (requeue bool, err error) {
174 defer execution.RecoverWithInternalError(&err)
175 if !policy.DeletionTimestamp.IsZero() {
176 if !k8s.HasFinalizer(policy, k8s.ControllerFinalizerName) {
177
178 return false, nil
179 }
180 if k8s.HasFinalizer(policy, k8s.DeletionDefenderFinalizerName) {
181
182 logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(policy))
183 return true, nil
184 }
185 if !k8s.HasAbandonAnnotation(policy) {
186 if err := r.Reconciler.iamClient.DeletePolicy(r.Ctx, policy); err != nil {
187 if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
188 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
189 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policy))
190 resource, err := toK8sResource(policy)
191 if err != nil {
192 return false, fmt.Errorf("error converting IAMPolicy to k8s resource while handling unresolvable dependencies event: %w", err)
193 }
194
195 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
196 }
197 return false, r.handleDeleteFailed(policy, err)
198 }
199 }
200 }
201 return false, r.handleDeleted(policy)
202 }
203 if _, err := r.Reconciler.iamClient.GetPolicy(r.Ctx, policy); err != nil {
204 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
205 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policy))
206 return r.handleUnresolvableDeps(policy, unwrappedErr)
207 }
208 return false, r.handleUpdateFailed(policy, err)
209 }
210 k8s.EnsureFinalizers(policy, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName)
211
212 policy.Spec.Etag = ""
213 if _, err = r.Reconciler.iamClient.SetPolicy(r.Ctx, policy); err != nil {
214 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
215 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(policy))
216 return r.handleUnresolvableDeps(policy, unwrappedErr)
217 }
218 return false, r.handleUpdateFailed(policy, fmt.Errorf("error setting policy: %w", err))
219 }
220 if isAPIServerUpdateRequired(policy) {
221 return false, r.handleUpToDate(policy)
222 }
223 return false, nil
224 }
225
226 func (r *reconcileContext) handleUpToDate(policy *iamv1beta1.IAMPolicy) error {
227 resource, err := toK8sResource(policy)
228 if err != nil {
229 return fmt.Errorf("error converting IAMPolicy to k8s resource while handling %v event: %w", k8s.UpToDate, err)
230 }
231 return r.Reconciler.HandleUpToDate(r.Ctx, resource)
232 }
233
234 func (r *reconcileContext) handleUpdateFailed(policy *iamv1beta1.IAMPolicy, origErr error) error {
235 resource, err := toK8sResource(policy)
236 if err != nil {
237 logger.Error(err, "error converting IAMPolicy to k8s resource while handling event",
238 "resource", k8s.GetNamespacedName(policy), "event", k8s.UpdateFailed)
239 return fmt.Errorf("Update call failed: %w", origErr)
240 }
241 return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
242 }
243
244 func (r *reconcileContext) handleDeleted(policy *iamv1beta1.IAMPolicy) error {
245 resource, err := toK8sResource(policy)
246 if err != nil {
247 return fmt.Errorf("error converting IAMPolicy to k8s resource while handling %v event: %w", k8s.Deleted, err)
248 }
249 return r.Reconciler.HandleDeleted(r.Ctx, resource)
250 }
251
252 func (r *reconcileContext) handleDeleteFailed(policy *iamv1beta1.IAMPolicy, origErr error) error {
253 resource, err := toK8sResource(policy)
254 if err != nil {
255 logger.Error(err, "error converting IAMPolicy to k8s resource while handling event",
256 "resource", k8s.GetNamespacedName(policy), "event", k8s.DeleteFailed)
257 return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
258 }
259 return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
260 }
261
262 func (r *ReconcileIAMPolicy) supportsImmediateReconciliations() bool {
263 return r.immediateReconcileRequests != nil
264 }
265
266 func (r *reconcileContext) handleUnresolvableDeps(policy *iamv1beta1.IAMPolicy, origErr error) (requeue bool, err error) {
267 resource, err := toK8sResource(policy)
268 if err != nil {
269 return false, fmt.Errorf("error converting IAMPolicy to k8s resource while handling unresolvable dependencies event: %w", err)
270 }
271 refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
272 if !ok || !r.Reconciler.supportsImmediateReconciliations() {
273
274 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
275 }
276
277
278
279
280
281 if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
282
283 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
284 }
285
286
287
288
289 watcherLogger := logger.WithValues(
290 "referencingResource", resource.GetNamespacedName(),
291 "referencingResourceGVK", resource.GroupVersionKind())
292 watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
293 if err != nil {
294 return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
295 }
296
297 logger := logger.WithValues(
298 "resource", resource.GetNamespacedName(),
299 "resourceGVK", resource.GroupVersionKind(),
300 "reference", refNN,
301 "referenceGVK", refGVK)
302 go func() {
303
304
305 defer r.Reconciler.resourceWatcherRoutines.Release(1)
306 timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
307 ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
308 defer cancel()
309 logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
310 if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
311 logger.Error(err, "error while waiting for resource's reference to be ready")
312 return
313 }
314 logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
315 r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
316 }()
317
318
319
320
321 return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
322 }
323
324
325
326
327
328 func (r *ReconcileIAMPolicy) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
329 genEvent := event.GenericEvent{}
330 genEvent.Object = &unstructured.Unstructured{}
331 genEvent.Object.SetNamespace(resourceNN.Namespace)
332 genEvent.Object.SetName(resourceNN.Name)
333 r.immediateReconcileRequests <- genEvent
334 }
335
336 func isAPIServerUpdateRequired(policy *iamv1beta1.IAMPolicy) bool {
337
338
339 conditions := []condition.Condition{
340 k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
341 }
342 if !k8s.ConditionSlicesEqual(policy.Status.Conditions, conditions) {
343 return true
344 }
345 if policy.Status.ObservedGeneration != policy.GetGeneration() {
346 return true
347 }
348 return false
349 }
350
351 func toK8sResource(policy *iamv1beta1.IAMPolicy) (*k8s.Resource, error) {
352 iamclient.SetGVK(policy)
353 resource := k8s.Resource{}
354 if err := util.Marshal(policy, &resource); err != nil {
355 return nil, fmt.Errorf("error marshalling IAMPolicy to k8s resource: %w", err)
356 }
357 return &resource, nil
358 }
359
View as plain text