package controllers import ( "context" "time" certmgr "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" "github.com/fluxcd/pkg/ssa" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" kuberecorder "k8s.io/client-go/tools/record" "sigs.k8s.io/cli-utils/pkg/kstatus/polling" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "edge-infra.dev/pkg/edge/component/build" linkerd "edge-infra.dev/pkg/edge/linkerd" "edge-infra.dev/pkg/edge/linkerd/certs/identity" "edge-infra.dev/pkg/edge/linkerd/certs/trustanchor" l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1" l5dconfig "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/linkerd/config" "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/linkerd/install" "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/linkerd/proxyinjection" l5dmetrics "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/metrics" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/metrics" edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/events" "edge-infra.dev/pkg/k8s/runtime/inventory" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/sds/ien/topology" l5dchart "edge-infra.dev/third_party/k8s/linkerd/helm" ) // +kubebuilder:rbac:groups="linkerd.edge.ncr.com",resources=linkerdworkloadinjections,verbs=get;list;create;delete;update;patch;watch // +kubebuilder:rbac:groups="linkerd.edge.ncr.com",resources=linkerdworkloadinjections/status,verbs=get;update;patch // // +kubebuilder:rbac:groups="linkerd.edge.ncr.com",resources=linkerds,verbs=get;list;update;patch;watch // +kubebuilder:rbac:groups="linkerd.edge.ncr.com",resources=linkerds/status,verbs=get;update;patch // // +kubebuilder:rbac:groups="cert-manager.io",resources=certificates;issuers,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="cert-manager.io",resources=certificates/status;issuers/status,verbs=get // // +kubebuilder:rbac:groups="apps",resources=deployments;daemonsets;statefulsets;replicasets,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="apps",resources=deployments/status,verbs=get // // +kubebuilder:rbac:groups="",resources=pods;serviceaccounts;secrets;configmaps;services;namespaces,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="",resources=services/status;namespaces/status,verbs=get // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch // // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles;rolebindings;clusterroles;clusterrolebindings,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles;clusterroles,verbs=bind;escalate // // +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions/status,verbs=get // // +kubebuilder:rbac:groups="batch",resources=cronjobs,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="batch",resources=cronjobs/status,verbs=get // // +kubebuilder:rbac:groups="policy",resources=podsecuritypolicies;poddisruptionbudgets,verbs=create;get;list;update;patch;watch;delete // // +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=validatingwebhookconfigurations;mutatingwebhookconfigurations,verbs=create;get;list;update;patch;watch;delete // // +kubebuilder:rbac:groups="policy.linkerd.io",resources=servers;serverauthorizations;authorizationpolicies,verbs=create;get;list;update;patch;watch;delete // // +kubebuilder:rbac:groups="monitoring.coreos.com",resources=podmonitors,verbs=create;get;list;update;patch;watch; // // +kubebuilder:rbac:groups="redpanda.vectorized.io",resources=clusters,verbs=get;list;patch;watch; // L5dReconciler reconciles Linkerd objects to in order to manage an installation // of the Linkerd service mesh. type L5dReconciler struct { client.Client kuberecorder.EventRecorder Log logr.Logger Manifests []*unstructured.Unstructured Name string ResourceManager *ssa.ResourceManager Conditions edgereconcile.Conditions Metrics metrics.Metrics Config *l5dconfig.Config } func newL5dReconciler(mgr ctrl.Manager, l5dcfg Config, metrics metrics.Metrics, eventRecorder *events.Recorder) *L5dReconciler { l5dctl := &L5dReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName(linkerd.LinkerdControllerName), Name: linkerd.LinkerdControllerName, Conditions: LinkerdConditions, Metrics: metrics, EventRecorder: eventRecorder, } var registry, l5dDir string = build.DefaultPublicContainerRegistry, "/etc/l5d" if l5dcfg.Registry != nil { registry = *l5dcfg.Registry } if l5dcfg.L5dDirPath != nil { l5dDir = *l5dcfg.L5dDirPath } l5dctl.Config = l5dconfig.NewConfig(registry, l5dDir) return l5dctl } var LinkerdConditions = edgereconcile.Conditions{ Target: status.ReadyCondition, Owned: []string{ status.ReconcilingCondition, status.StalledCondition, l5dv1alpha1.Apply, l5dv1alpha1.ProxyInjection, l5dv1alpha1.CertManager, l5dv1alpha1.TrustAnchor, }, Summarize: []string{ status.ReconcilingCondition, status.StalledCondition, l5dv1alpha1.Apply, l5dv1alpha1.ProxyInjection, l5dv1alpha1.CertManager, l5dv1alpha1.TrustAnchor, }, NegativePolarity: []string{ status.ReconcilingCondition, status.StalledCondition, }, } // SetupWithManager sets up L5dReconciler with the manager. It configures the // controller for the resources it creates that also need to be reconciled, such // as cert-manager CRD objects. func (r *L5dReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&l5dv1alpha1.Linkerd{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Owns(&certmgr.Certificate{}, builder.WithPredicates(predicate.NewPredicateFuncs(isLinkerdIssuer))). Owns(&certmgr.Issuer{}, builder.WithPredicates(predicate.NewPredicateFuncs(isLinkerdCertificate))). WithEventFilter(createEventFilter()). Watches( &corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.createReconcileRequests), builder.WithPredicates(topology.PredicateFilter()), ). Complete(r) } // predicate filter for l5d issuer object func isLinkerdIssuer(obj client.Object) bool { return obj.GetName() == linkerd.IssuerName && obj.GetNamespace() == linkerd.Namespace } // predicate filter for l5d certificate object func isLinkerdCertificate(obj client.Object) bool { return obj.GetName() == linkerd.TrustAnchorName && obj.GetNamespace() == linkerd.Namespace } func createEventFilter() predicate.Predicate { return predicate.Funcs{ CreateFunc: func(_ event.CreateEvent) bool { return true }, UpdateFunc: func(e event.UpdateEvent) bool { if node, ok := e.ObjectNew.(*corev1.Node); ok { // we only need to reconcile on node deletions return !node.DeletionTimestamp.IsZero() } return true }, DeleteFunc: func(_ event.DeleteEvent) bool { return false }, } } // Creates reconcile requests for all Linkerd objects func (r *L5dReconciler) createReconcileRequests(_ context.Context, _ client.Object) []reconcile.Request { linkerdList := &l5dv1alpha1.LinkerdList{} if err := r.Client.List(context.Background(), linkerdList); err != nil { return nil } reconcileRequests := []reconcile.Request{} for _, linkerd := range linkerdList.Items { reconcileRequests = append(reconcileRequests, reconcile.Request{ NamespacedName: client.ObjectKey{ Namespace: linkerd.GetNamespace(), Name: linkerd.GetName(), }, }) } return reconcileRequests } // Reconcile is the top-level reconciliation function for Linkerd objects, it // is mostly concerned with updating the status of the reconciled Linkerd object // and returning the appropriate results (e.g., requeuing) based on the result // of the internal reconciliation functions. // // Each internal reconciliation function will mutate the status of the object as // needed, so this function only needs to report the updated object status and // return accordingly based on errors. Wherever possible, we try to capture // errors as messages on the Linkerd status, so that you can get a clear view // of the objects state without looking at logs, and avoiding lots of error logging // boiler plate. func (r *L5dReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.setResourceManager() l5d := l5dv1alpha1.Linkerd{} if err := r.Get(ctx, req.NamespacedName, &l5d); err != nil { return ctrl.Result{}, err } patcher := patch.NewSerialPatcher(&l5d, r.Client) // update status to indicate reconciliation in progress if len(l5d.Status.Conditions) == 0 || l5d.Status.Version != l5dchart.Version { conditions.MarkReconciling(&l5d, status.ProgressingReason, "reconciliation in progress") if _, err := r.summarizer(ctx, patcher, &l5d); err != nil { return ctrl.Result{RequeueAfter: 1 * time.Minute}, err } } // reconcile the l5d object, creating a copy so that we avoid mutating our controller's cache l5d, reconcileErr := r.reconcile(ctx, *l5d.DeepCopy()) if reconcileErr != nil { // reflect the reconciled status on the API server if _, err := r.summarizer(ctx, patcher, &l5d); err != nil { r.Log.Error(reconcileErr, "Reconcile error") return ctrl.Result{Requeue: true}, err } return ctrl.Result{RequeueAfter: 1 * time.Minute}, reconcileErr } // by default, we want to reconcile every 5 minutes to be sure that our // linkerd installation manifests are not drifting or deleted by another // actor on the cluster conditions.Delete(&l5d, status.ReconcilingCondition) conditions.Delete(&l5d, status.StalledCondition) _, err := r.summarizer(ctx, patcher, &l5d) return ctrl.Result{RequeueAfter: 1 * time.Minute}, err } // reconcile contains the actual reconciliation logic for Linkerd objects. // unlike the public Reconcile function, it is only concerned with making the // required updates to the Linkerd object and returns it, so that the public // Reconcile function can handle updating status and the final return value. // // this function reconciles each Linkerd installation precondition in order and // then attempts to install Linkerd if none of the previous steps failed. // the appropriate status conditions are added by each step to the returned // Linkerd object func (r *L5dReconciler) reconcile(ctx context.Context, l5d l5dv1alpha1.Linkerd) (l5dv1alpha1.Linkerd, error) { var err error log := r.Log.WithName("Reconciler") // creating/clearing inventory oldStatus := l5d.Status.DeepCopy() l5d.Status.Inventory = inventory.New() if err := install.Namespace(ctx, r.Client, &l5d); err != nil { return l5d, err } _, err = trustanchor.CreateIfNotExists(ctx, r.Client, &l5d) if err != nil { return l5d, err } if err := r.Config.UpdateTopologyInfo(ctx, r.Client); err != nil { return l5d, err } if err := identity.Create(ctx, r.Client, &l5d, r.Config.IdentityIssuer); err != nil { return l5d, err } if err := install.Apply(ctx, r.Client, r.ResourceManager, &l5d, r.Config); err != nil { log.Error(err, "unable to apply the linkerd control plane manifests") return l5d, err } l5d, err = proxyinjection.Start(ctx, r.Client, r.ResourceManager, l5d) if err != nil { log.Error(err, "unable to start the proxy injection job") return l5d, err } if trustanchor.IsRotated(ctx, r.Client) { if err := identity.DeleteIdentity(ctx, r.Client); err != nil { return l5d, err } if err := trustanchor.RemoveRotationAnnotations(ctx, r.Client, &l5d); err != nil { log.Error(err, "unable to remove trust anchor rotation annotations") } } // diff the inventory and prune if oldStatus.Inventory != nil { diff, err := inventory.Diff(oldStatus.Inventory, l5d.GetInventory()) if err != nil { return l5d, err } if len(diff) > 0 { changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, ssa.DefaultDeleteOptions()) if err != nil { return l5d, err } log.Info("pruned objects", "changeset", linkerd.FilterChanged(changeSet)) } } // all done and return return l5d, nil } func (r *L5dReconciler) setResourceManager() { if r.ResourceManager == nil { mgr := ssa.NewResourceManager( r.Client, // be sure to consistently communicate this controllers ownership of objects // this should match the result of CreateOpts() polling.NewStatusPoller(r.Client, r.Client.RESTMapper(), polling.Options{}), ssa.Owner{Field: r.Name}, ) r.ResourceManager = mgr } } func (r *L5dReconciler) summarizer(ctx context.Context, patcher *patch.SerialPatcher, l5d *l5dv1alpha1.Linkerd) (res ctrl.Result, recErr error) { s := edgereconcile.NewSummarizer(patcher) res, recErr = s.SummarizeAndPatch(ctx, l5d, edgereconcile.WithConditions(r.Conditions), edgereconcile.WithResult(edgereconcile.ResultEmpty), edgereconcile.WithError(recErr), edgereconcile.WithIgnoreNotFound(), edgereconcile.WithProcessors( edgereconcile.RecordReconcileReq, edgereconcile.RecordResult, ), edgereconcile.WithFieldOwner(r.Name), edgereconcile.WithEventRecorder(r.EventRecorder), ) l5dmetrics.RecordLinkerdReadiness(l5d) return res, recErr }