package firewallctl import ( "context" "fmt" "time" "github.com/fluxcd/pkg/ssa" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/cli-utils/pkg/kstatus/polling" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/metrics" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/inventory" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/k8s/unstructured" v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1" ) var clusterFirewallConditions = reconcile.Conditions{ Target: status.ReadyCondition, Owned: []string{ string(v1ien.ClusterFirewallController), }, Summarize: []string{ string(v1ien.ClusterFirewallController), }, NegativePolarity: []string{}, } type ClusterFirewallController Controller func NewClusterFirewallController(k8sClient client.Client, mgr ctrlmgr.Manager) *ClusterFirewallController { return &ClusterFirewallController{ name: "firewallctl", client: k8sClient, conditions: clusterFirewallConditions, metrics: metrics.New(mgr, "firewallctl"), resourceManager: ssa.NewResourceManager( k8sClient, polling.NewStatusPoller(k8sClient, k8sClient.RESTMapper(), polling.Options{}), ssa.Owner{Field: "firewallctl"}, ), requeueTime: requeueTime, } } func (c *ClusterFirewallController) SetUpWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1ien.ClusterFirewall{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(c) } // Reconciles on updates to ClusterFirewall objects func (c *ClusterFirewallController) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { reconcileStart := time.Now() log := ctrl.LoggerFrom(ctx).WithName(c.name) ctx = ctrl.LoggerInto(ctx, log) var result = reconcile.ResultEmpty clusterfw := &v1ien.ClusterFirewall{} if err := c.client.Get(ctx, req.NamespacedName, clusterfw); client.IgnoreNotFound(err) != nil { log.Error(err, "couldn't Get ClusterFirewall") return ctrl.Result{}, err } patcher := patch.NewSerialPatcher(clusterfw, c.client) defer func() { res, recErr = c.summarizer(ctx, patcher, clusterfw, result, recErr) c.metrics.RecordDuration(ctx, clusterfw, reconcileStart) c.metrics.RecordReconciling(ctx, clusterfw) c.metrics.RecordReadiness(ctx, clusterfw) }() log.Info("reconciling ClusterFirewall", "object", req.Name) conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwProgressing), "%s", v1ien.Reconciling.String()) // Check if finalizer exists if !controllerutil.ContainsFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer) { controllerutil.AddFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer) // Return immediately to requeue and reconcile object with finalizer added. result = reconcile.ResultRequeue return } // cleanup if object was deleted if !clusterfw.ObjectMeta.DeletionTimestamp.IsZero() { recErr = c.deleteClusterFirewall(ctx, clusterfw) return } // validate ClusterFirewall if recErr = clusterfw.ValidateRules(); recErr != nil { log.Error(recErr, "failed to validate config", "ClusterFirewall", clusterfw.ObjectMeta.Name) conditions.MarkTrue(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), v1ien.Invalid, recErr.Error()) return } if clusterfw.Status == nil { clusterfw.Status = &v1ien.ClusterFirewallStatus{} } // apply new NodeFirewalls if recErr = c.applyNodeFirewalls(ctx, clusterfw); recErr != nil { return } conditions.MarkTrue(clusterfw, status.ReadyCondition, string(v1ien.CfwSuccessful), "%s", v1ien.Succeeded.String()) result = reconcile.ResultSuccess return } func (c *ClusterFirewallController) applyNodeFirewalls(ctx context.Context, clusterfw *v1ien.ClusterFirewall) error { log := ctrl.LoggerFrom(ctx) // build NodeFirewalls nodeFirewalls, err := c.buildNodeFirewalls(ctx, clusterfw) if err != nil { log.Error(err, v1ien.BuildRequeueing) conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.BuildRequeueing) return err } // convert to unstructured array uobjs, err := NodeFirewallsToUnstructuredList(nodeFirewalls) if err != nil { log.Error(err, v1ien.ConvertRequeueing) conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.ConvertRequeueing) return err } // apply new NodeFirewalls changeset, err := c.applyObjects(ctx, uobjs) if err != nil { log.Error(err, v1ien.ApplyRequeueing) conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.ApplyRequeueing) return err } // store old inventory and create new one from changeset oldInventory := clusterfw.Status.Inventory.DeepCopy() clusterfw.Status.Inventory = inventory.New(inventory.FromChangeSet(changeset)) // remove stale Nodefirewalls err = c.pruneInventory(ctx, oldInventory, clusterfw.Status.Inventory) if err != nil { log.Error(err, v1ien.DeleteRequeueing) conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.DeleteRequeueing) return err } return nil } func (c *ClusterFirewallController) buildNodeFirewalls(ctx context.Context, clusterfw *v1ien.ClusterFirewall) ([]*v1ien.NodeFirewall, error) { nodefirewalls := []*v1ien.NodeFirewall{} nodes, err := c.getNodes(ctx) if err != nil { return nil, err } for _, node := range nodes.Items { nodeFirewall, needed := c.buildNodeFirewall(node, clusterfw) if !needed { continue } nodefirewalls = append(nodefirewalls, nodeFirewall) } return nodefirewalls, nil } func (c *ClusterFirewallController) applyObjects(ctx context.Context, uobjs []*unstructured.Unstructured) (*ssa.ChangeSet, error) { // apply NodeFirewalls changeset, err := c.resourceManager.ApplyAll(ctx, uobjs, ssa.ApplyOptions{Force: true}) if err != nil { return changeset, fmt.Errorf("failed to apply NodeFirewalls(s): %w", err) } if len(changeset.Entries) > 0 { ctrl.LoggerFrom(ctx).Info("applied NodeFirewalls", "changeset", changeset.ToMap()) } return changeset, nil } // Removes all objects in the oldInventory that are not present in the newInventory func (c *ClusterFirewallController) pruneInventory(ctx context.Context, oldInventory *inventory.ResourceInventory, newInventory *inventory.ResourceInventory) error { // check if nothing to remove if oldInventory == nil { return nil } diff, err := inventory.Diff(oldInventory, newInventory) if err != nil { return err } if len(diff) == 0 { return nil } deleteset, err := c.resourceManager.DeleteAll(ctx, diff, ssa.DefaultDeleteOptions()) if err != nil { return err } if len(deleteset.Entries) > 0 { ctrl.LoggerFrom(ctx).Info("removed NodeFirewalls", "changeset", deleteset.ToMap()) } return nil } func (c *ClusterFirewallController) getNodes(ctx context.Context) (*corev1.NodeList, error) { nodes := &corev1.NodeList{} return nodes, c.client.List(ctx, nodes) } func (c *ClusterFirewallController) buildNodeFirewall(node corev1.Node, clusterfw *v1ien.ClusterFirewall) (*v1ien.NodeFirewall, bool) { rules := []v1ien.NodeRule{} for _, clusterRule := range clusterfw.Spec.ClusterRules { if len(clusterRule.NodeSelector) == 0 || c.matchesSelector(node, clusterRule.NodeSelector) { rules = append(rules, clusterRule.NodeRule) } } if len(rules) > 0 { ownerRef := *metav1.NewControllerRef(clusterfw, v1ien.ClusterFirewallGVK) return v1ien.NewNodeFirewall(clusterfw.Name+"-"+node.Name, rules, ownerRef), true } return nil, false } func (c *ClusterFirewallController) matchesSelector(node corev1.Node, selector v1ien.NodeSelector) bool { for label, value := range selector { if node.Labels[label] != value { return false } } return true } // remove old Nodefirewalls func (c *ClusterFirewallController) deleteClusterFirewall(ctx context.Context, clusterfw *v1ien.ClusterFirewall) error { if clusterfw.Status != nil && clusterfw.Status.Inventory != nil { log := ctrl.LoggerFrom(ctx) objs, err := clusterfw.Status.Inventory.ListObjects() if err != nil { log.Error(err, "failed to list inventory", "inventory", *clusterfw.Status.Inventory) conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.InventoryRequeueing) return err } if len(objs) > 0 { changeset, err := c.resourceManager.DeleteAll(ctx, objs, ssa.DefaultDeleteOptions()) if err != nil { log.Error(err, "failed to delete inventory") conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.DeleteRequeueing) return err } log.Info("removed NodeFirewalls", "changeset", changeset.ToMap()) } } controllerutil.RemoveFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer) return nil } func NodeFirewallsToUnstructuredList(nodeFirewalls []*v1ien.NodeFirewall) ([]*unstructured.Unstructured, error) { uobjs := make([]*unstructured.Unstructured, 0, len(nodeFirewalls)) for _, nodeFirewall := range nodeFirewalls { uobj, err := unstructured.ToUnstructured(client.Object(nodeFirewall)) if err != nil { return uobjs, err } uobjs = append(uobjs, uobj) } return uobjs, nil } func (c *ClusterFirewallController) summarizer(ctx context.Context, patcher *patch.SerialPatcher, clusterFirewall *v1ien.ClusterFirewall, result reconcile.Result, recErr error) (ctrl.Result, error) { s := reconcile.NewSummarizer(patcher) return s.SummarizeAndPatch(ctx, clusterFirewall, reconcile.WithConditions(c.conditions), reconcile.WithResult(result), reconcile.WithError(recErr), reconcile.WithIgnoreNotFound(), reconcile.WithProcessors( reconcile.RecordReconcileReq, reconcile.RecordResult, ), reconcile.WithFieldOwner(c.name), ) }