1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package auditconfig
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "time"
22
23 iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/iam/v1beta1"
24 condition "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
25 kcciamclient "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/iam/iamclient"
26 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
27 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
29 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
30 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
31 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
32 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/conversion"
33 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
34 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
35 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
36 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"
37
38 mmdcl "github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
39 tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
40 "golang.org/x/sync/semaphore"
41 corev1 "k8s.io/api/core/v1"
42 apierrors "k8s.io/apimachinery/pkg/api/errors"
43 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
44 "k8s.io/apimachinery/pkg/runtime"
45 "k8s.io/apimachinery/pkg/types"
46 "k8s.io/client-go/rest"
47 "sigs.k8s.io/controller-runtime/pkg/builder"
48 "sigs.k8s.io/controller-runtime/pkg/client"
49 "sigs.k8s.io/controller-runtime/pkg/controller"
50 "sigs.k8s.io/controller-runtime/pkg/event"
51 "sigs.k8s.io/controller-runtime/pkg/handler"
52 klog "sigs.k8s.io/controller-runtime/pkg/log"
53 "sigs.k8s.io/controller-runtime/pkg/manager"
54 "sigs.k8s.io/controller-runtime/pkg/reconcile"
55 "sigs.k8s.io/controller-runtime/pkg/source"
56 )
57
// controllerName is the name under which this controller is registered with
// the manager; it is also used to name the package logger and the event
// recorder requested from the manager.
const (
	controllerName = "iamauditconfig-controller"
)
59
60 var logger = klog.Log.WithName(controllerName)
61
62 func Add(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader,
63 converter *conversion.Converter, dclConfig *mmdcl.Config) error {
64 immediateReconcileRequests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
65 resourceWatcherRoutines := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
66 reconciler, err := NewReconciler(mgr, provider, smLoader, converter, dclConfig, immediateReconcileRequests, resourceWatcherRoutines)
67 if err != nil {
68 return err
69 }
70 return add(mgr, reconciler)
71 }
72
73 func NewReconciler(mgr manager.Manager, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, converter *conversion.Converter, dclConfig *mmdcl.Config, immediateReconcileRequests chan event.GenericEvent, resourceWatcherRoutines *semaphore.Weighted) (*Reconciler, error) {
74 r := Reconciler{
75 LifecycleHandler: lifecyclehandler.NewLifecycleHandler(
76 mgr.GetClient(),
77 mgr.GetEventRecorderFor(controllerName),
78 ),
79 Client: mgr.GetClient(),
80 iamClient: kcciamclient.New(provider, smLoader, mgr.GetClient(), converter, dclConfig).TFIAMClient,
81 scheme: mgr.GetScheme(),
82 config: mgr.GetConfig(),
83 immediateReconcileRequests: immediateReconcileRequests,
84 resourceWatcherRoutines: resourceWatcherRoutines,
85 }
86 return &r, nil
87 }
88
89
90 func add(mgr manager.Manager, r *Reconciler) error {
91 obj := &iamv1beta1.IAMAuditConfig{}
92 _, err := builder.
93 ControllerManagedBy(mgr).
94 Named(controllerName).
95 WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
96 Watches(&source.Channel{Source: r.immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
97 For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
98 Build(r)
99 if err != nil {
100 return fmt.Errorf("error creating new controller: %v", err)
101 }
102 return nil
103 }
104
105 var _ reconcile.Reconciler = &Reconciler{}
106
107 type Reconciler struct {
108 lifecyclehandler.LifecycleHandler
109 client.Client
110 metrics.ReconcilerMetrics
111 iamClient *kcciamclient.TFIAMClient
112 scheme *runtime.Scheme
113 config *rest.Config
114
115 immediateReconcileRequests chan event.GenericEvent
116 resourceWatcherRoutines *semaphore.Weighted
117 }
118
119 type reconcileContext struct {
120 Reconciler *Reconciler
121 Ctx context.Context
122 NamespacedName types.NamespacedName
123 }
124
125 func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
126 logger.Info("Starting reconcile", "resource", request.NamespacedName)
127 startTime := time.Now()
128 ctx, cancel := context.WithTimeout(ctx, k8s.ReconcileDeadline)
129 defer cancel()
130 r.RecordReconcileWorkers(ctx, iamv1beta1.IAMAuditConfigGVK)
131 defer r.AfterReconcile()
132 defer r.RecordReconcileMetrics(ctx, iamv1beta1.IAMAuditConfigGVK, request.Namespace, request.Name, startTime, &err)
133
134 var auditConfig iamv1beta1.IAMAuditConfig
135 if err := r.Get(context.TODO(), request.NamespacedName, &auditConfig); err != nil {
136 if apierrors.IsNotFound(err) {
137 logger.Info("resource not found in API server; finishing reconcile", "resource", request.NamespacedName)
138 return reconcile.Result{}, nil
139 }
140 return reconcile.Result{}, err
141 }
142 reconcileContext := &reconcileContext{
143 Reconciler: r,
144 Ctx: ctx,
145 NamespacedName: request.NamespacedName,
146 }
147 requeue, err := reconcileContext.doReconcile(&auditConfig)
148 if err != nil {
149 return reconcile.Result{}, err
150 }
151 if requeue {
152 return reconcile.Result{Requeue: true}, nil
153 }
154 jitteredPeriod, err := jitter.GenerateJitteredReenqueuePeriod(iamv1beta1.IAMAuditConfigGVK, nil, nil, &auditConfig)
155 if err != nil {
156 return reconcile.Result{}, err
157 }
158 logger.Info("successfully finished reconcile", "resource", request.NamespacedName, "time to next reconciliation", jitteredPeriod)
159 return reconcile.Result{RequeueAfter: jitteredPeriod}, nil
160 }
161
162 func (r *reconcileContext) doReconcile(auditConfig *iamv1beta1.IAMAuditConfig) (requeue bool, err error) {
163 defer execution.RecoverWithInternalError(&err)
164 if !auditConfig.DeletionTimestamp.IsZero() {
165 if !k8s.HasFinalizer(auditConfig, k8s.ControllerFinalizerName) {
166
167 return false, nil
168 }
169 if k8s.HasFinalizer(auditConfig, k8s.DeletionDefenderFinalizerName) {
170
171 logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(auditConfig))
172 return true, nil
173 }
174 if !k8s.HasAbandonAnnotation(auditConfig) {
175 if err := r.Reconciler.iamClient.DeleteAuditConfig(r.Ctx, auditConfig); err != nil {
176 if !errors.Is(err, kcciamclient.NotFoundError) && !k8s.IsReferenceNotFoundError(err) {
177 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
178 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(auditConfig))
179 resource, err := ToK8sResource(auditConfig)
180 if err != nil {
181 return false, fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling unresolvable dependencies event: %w", err)
182 }
183
184 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, unwrappedErr)
185 }
186 return false, r.handleDeleteFailed(auditConfig, err)
187 }
188 }
189 }
190 return false, r.handleDeleted(auditConfig)
191 }
192 if _, err := r.Reconciler.iamClient.GetAuditConfig(r.Ctx, auditConfig); err != nil {
193 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
194 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(auditConfig))
195 return r.handleUnresolvableDeps(auditConfig, unwrappedErr)
196 }
197 if !errors.Is(err, kcciamclient.NotFoundError) {
198 return false, r.handleUpdateFailed(auditConfig, err)
199 }
200 }
201 if !k8s.EnsureFinalizers(auditConfig, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName) {
202 if err := r.update(auditConfig); err != nil {
203 return false, r.handleUpdateFailed(auditConfig, err)
204 }
205 }
206 if _, err := r.Reconciler.iamClient.SetAuditConfig(r.Ctx, auditConfig); err != nil {
207 if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
208 logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(auditConfig))
209 return r.handleUnresolvableDeps(auditConfig, unwrappedErr)
210 }
211 return false, r.handleUpdateFailed(auditConfig, fmt.Errorf("error setting audit config: %w", err))
212 }
213 if isAPIServerUpdateRequired(auditConfig) {
214 return false, r.handleUpToDate(auditConfig)
215 }
216 return false, nil
217 }
218
219 func (r *reconcileContext) update(auditConfig *iamv1beta1.IAMAuditConfig) error {
220 if err := r.Reconciler.Client.Update(r.Ctx, auditConfig); err != nil {
221 return fmt.Errorf("error updating '%v' in API server: %w", r.NamespacedName, err)
222 }
223 return nil
224 }
225
226 func (r *reconcileContext) handleUpToDate(auditConfig *iamv1beta1.IAMAuditConfig) error {
227 resource, err := ToK8sResource(auditConfig)
228 if err != nil {
229 return fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling %v event: %w", k8s.UpToDate, err)
230 }
231 return r.Reconciler.HandleUpToDate(r.Ctx, resource)
232 }
233
234 func (r *reconcileContext) handleUpdateFailed(auditConfig *iamv1beta1.IAMAuditConfig, origErr error) error {
235 resource, err := ToK8sResource(auditConfig)
236 if err != nil {
237 logger.Error(err, "error converting IAMAuditConfig to k8s resource while handling event",
238 "resource", k8s.GetNamespacedName(auditConfig), "event", k8s.UpdateFailed)
239 return fmt.Errorf("Update call failed: %w", origErr)
240 }
241 return r.Reconciler.HandleUpdateFailed(r.Ctx, resource, origErr)
242 }
243
244 func (r *reconcileContext) handleDeleted(auditConfig *iamv1beta1.IAMAuditConfig) error {
245 resource, err := ToK8sResource(auditConfig)
246 if err != nil {
247 return fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling %v event: %w", k8s.Deleted, err)
248 }
249 return r.Reconciler.HandleDeleted(r.Ctx, resource)
250 }
251
252 func (r *reconcileContext) handleDeleteFailed(auditConfig *iamv1beta1.IAMAuditConfig, origErr error) error {
253 resource, err := ToK8sResource(auditConfig)
254 if err != nil {
255 logger.Error(err, "error converting IAMAuditConfig to k8s resource while handling event",
256 "resource", k8s.GetNamespacedName(auditConfig), "event", k8s.DeleteFailed)
257 return fmt.Errorf(k8s.DeleteFailedMessageTmpl, origErr)
258 }
259 return r.Reconciler.HandleDeleteFailed(r.Ctx, resource, origErr)
260 }
261
262 func (r *Reconciler) supportsImmediateReconciliations() bool {
263 return r.immediateReconcileRequests != nil
264 }
265
266 func (r *reconcileContext) handleUnresolvableDeps(auditConfig *iamv1beta1.IAMAuditConfig, origErr error) (requeue bool, err error) {
267 resource, err := ToK8sResource(auditConfig)
268 if err != nil {
269 return false, fmt.Errorf("error converting IAMAuditConfig to k8s resource while handling unresolvable dependencies event: %w", err)
270 }
271 refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(origErr)
272 if !ok || !r.Reconciler.supportsImmediateReconciliations() {
273
274 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
275 }
276
277
278
279
280
281 if !r.Reconciler.resourceWatcherRoutines.TryAcquire(1) {
282
283 return true, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
284 }
285
286
287
288
289 watcherLogger := logger.WithValues(
290 "referencingResource", resource.GetNamespacedName(),
291 "referencingResourceGVK", resource.GroupVersionKind())
292 watcher, err := resourcewatcher.New(r.Reconciler.config, watcherLogger)
293 if err != nil {
294 return false, r.Reconciler.HandleUpdateFailed(r.Ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
295 }
296
297 logger := logger.WithValues(
298 "resource", resource.GetNamespacedName(),
299 "resourceGVK", resource.GroupVersionKind(),
300 "reference", refNN,
301 "referenceGVK", refGVK)
302 go func() {
303
304
305 defer r.Reconciler.resourceWatcherRoutines.Release(1)
306 timeoutPeriod := jitter.GenerateWatchJitteredTimeoutPeriod()
307 ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
308 defer cancel()
309 logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
310 if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
311 logger.Error(err, "error while waiting for resource's reference to be ready")
312 return
313 }
314 logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
315 r.Reconciler.enqueueForImmediateReconciliation(resource.GetNamespacedName())
316 }()
317
318
319
320
321 return false, r.Reconciler.HandleUnresolvableDeps(r.Ctx, resource, origErr)
322 }
323
324
325
326
327
328 func (r *Reconciler) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
329 genEvent := event.GenericEvent{}
330 genEvent.Object = &unstructured.Unstructured{}
331 genEvent.Object.SetNamespace(resourceNN.Namespace)
332 genEvent.Object.SetName(resourceNN.Name)
333 r.immediateReconcileRequests <- genEvent
334 }
335
336 func isAPIServerUpdateRequired(auditConfig *iamv1beta1.IAMAuditConfig) bool {
337
338
339 conditions := []condition.Condition{
340 k8s.NewCustomReadyCondition(corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage),
341 }
342 if !k8s.ConditionSlicesEqual(auditConfig.Status.Conditions, conditions) {
343 return true
344 }
345 if auditConfig.Status.ObservedGeneration != auditConfig.GetGeneration() {
346 return true
347 }
348 return false
349 }
350
351 func ToK8sResource(auditConfig *iamv1beta1.IAMAuditConfig) (*k8s.Resource, error) {
352 kcciamclient.SetGVK(auditConfig)
353 resource := k8s.Resource{}
354 if err := util.Marshal(auditConfig, &resource); err != nil {
355 return nil, fmt.Errorf("error marshalling IAMAuditConfig to k8s resource: %w", err)
356 }
357 return &resource, nil
358 }
359
View as plain text