...

Source file src/edge-infra.dev/pkg/sds/ien/k8s/controllers/firewallctl/controller.go

Documentation: edge-infra.dev/pkg/sds/ien/k8s/controllers/firewallctl

     1  package firewallctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"time"
     7  
     8  	"github.com/fluxcd/pkg/ssa"
     9  	corev1 "k8s.io/api/core/v1"
    10  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    11  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
    12  	ctrl "sigs.k8s.io/controller-runtime"
    13  	"sigs.k8s.io/controller-runtime/pkg/builder"
    14  	"sigs.k8s.io/controller-runtime/pkg/client"
    15  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    16  	ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
    17  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    18  
    19  	"edge-infra.dev/pkg/k8s/meta/status"
    20  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    21  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    22  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    23  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    24  	"edge-infra.dev/pkg/k8s/runtime/patch"
    25  	"edge-infra.dev/pkg/k8s/unstructured"
    26  	v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
    27  )
    28  
    29  var clusterFirewallConditions = reconcile.Conditions{
    30  	Target: status.ReadyCondition,
    31  	Owned: []string{
    32  		string(v1ien.ClusterFirewallController),
    33  	},
    34  	Summarize: []string{
    35  		string(v1ien.ClusterFirewallController),
    36  	},
    37  	NegativePolarity: []string{},
    38  }
    39  
    40  type ClusterFirewallController Controller
    41  
    42  func NewClusterFirewallController(k8sClient client.Client, mgr ctrlmgr.Manager) *ClusterFirewallController {
    43  	return &ClusterFirewallController{
    44  		name:       "firewallctl",
    45  		client:     k8sClient,
    46  		conditions: clusterFirewallConditions,
    47  		metrics:    metrics.New(mgr, "firewallctl"),
    48  		resourceManager: ssa.NewResourceManager(
    49  			k8sClient,
    50  			polling.NewStatusPoller(k8sClient, k8sClient.RESTMapper(), polling.Options{}),
    51  			ssa.Owner{Field: "firewallctl"},
    52  		),
    53  		requeueTime: requeueTime,
    54  	}
    55  }
    56  
    57  func (c *ClusterFirewallController) SetUpWithManager(mgr ctrl.Manager) error {
    58  	return ctrl.NewControllerManagedBy(mgr).
    59  		For(&v1ien.ClusterFirewall{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
    60  		Complete(c)
    61  }
    62  
    63  // Reconciles on updates to ClusterFirewall objects
    64  func (c *ClusterFirewallController) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
    65  	reconcileStart := time.Now()
    66  	log := ctrl.LoggerFrom(ctx).WithName(c.name)
    67  	ctx = ctrl.LoggerInto(ctx, log)
    68  
    69  	var result = reconcile.ResultEmpty
    70  
    71  	clusterfw := &v1ien.ClusterFirewall{}
    72  	if err := c.client.Get(ctx, req.NamespacedName, clusterfw); client.IgnoreNotFound(err) != nil {
    73  		log.Error(err, "couldn't Get ClusterFirewall")
    74  		return ctrl.Result{}, err
    75  	}
    76  
    77  	patcher := patch.NewSerialPatcher(clusterfw, c.client)
    78  	defer func() {
    79  		res, recErr = c.summarizer(ctx, patcher, clusterfw, result, recErr)
    80  		c.metrics.RecordDuration(ctx, clusterfw, reconcileStart)
    81  		c.metrics.RecordReconciling(ctx, clusterfw)
    82  		c.metrics.RecordReadiness(ctx, clusterfw)
    83  	}()
    84  
    85  	log.Info("reconciling  ClusterFirewall", "object", req.Name)
    86  
    87  	conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwProgressing), "%s", v1ien.Reconciling.String())
    88  
    89  	// Check if finalizer exists
    90  	if !controllerutil.ContainsFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer) {
    91  		controllerutil.AddFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer)
    92  		// Return immediately to requeue and reconcile object with finalizer added.
    93  		result = reconcile.ResultRequeue
    94  		return
    95  	}
    96  
    97  	// cleanup if object was deleted
    98  	if !clusterfw.ObjectMeta.DeletionTimestamp.IsZero() {
    99  		recErr = c.deleteClusterFirewall(ctx, clusterfw)
   100  		return
   101  	}
   102  
   103  	// validate ClusterFirewall
   104  	if recErr = clusterfw.ValidateRules(); recErr != nil {
   105  		log.Error(recErr, "failed to validate config", "ClusterFirewall", clusterfw.ObjectMeta.Name)
   106  		conditions.MarkTrue(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), v1ien.Invalid, recErr.Error())
   107  		return
   108  	}
   109  
   110  	if clusterfw.Status == nil {
   111  		clusterfw.Status = &v1ien.ClusterFirewallStatus{}
   112  	}
   113  
   114  	// apply new NodeFirewalls
   115  	if recErr = c.applyNodeFirewalls(ctx, clusterfw); recErr != nil {
   116  		return
   117  	}
   118  
   119  	conditions.MarkTrue(clusterfw, status.ReadyCondition, string(v1ien.CfwSuccessful), "%s", v1ien.Succeeded.String())
   120  	result = reconcile.ResultSuccess
   121  	return
   122  }
   123  
   124  func (c *ClusterFirewallController) applyNodeFirewalls(ctx context.Context, clusterfw *v1ien.ClusterFirewall) error {
   125  	log := ctrl.LoggerFrom(ctx)
   126  
   127  	// build NodeFirewalls
   128  	nodeFirewalls, err := c.buildNodeFirewalls(ctx, clusterfw)
   129  	if err != nil {
   130  		log.Error(err, v1ien.BuildRequeueing)
   131  		conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.BuildRequeueing)
   132  		return err
   133  	}
   134  
   135  	// convert to unstructured array
   136  	uobjs, err := NodeFirewallsToUnstructuredList(nodeFirewalls)
   137  	if err != nil {
   138  		log.Error(err, v1ien.ConvertRequeueing)
   139  		conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.ConvertRequeueing)
   140  		return err
   141  	}
   142  
   143  	// apply new NodeFirewalls
   144  	changeset, err := c.applyObjects(ctx, uobjs)
   145  	if err != nil {
   146  		log.Error(err, v1ien.ApplyRequeueing)
   147  		conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.ApplyRequeueing)
   148  		return err
   149  	}
   150  
   151  	// store old inventory and create new one from changeset
   152  	oldInventory := clusterfw.Status.Inventory.DeepCopy()
   153  	clusterfw.Status.Inventory = inventory.New(inventory.FromChangeSet(changeset))
   154  
   155  	// remove stale Nodefirewalls
   156  	err = c.pruneInventory(ctx, oldInventory, clusterfw.Status.Inventory)
   157  	if err != nil {
   158  		log.Error(err, v1ien.DeleteRequeueing)
   159  		conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.DeleteRequeueing)
   160  		return err
   161  	}
   162  
   163  	return nil
   164  }
   165  
   166  func (c *ClusterFirewallController) buildNodeFirewalls(ctx context.Context, clusterfw *v1ien.ClusterFirewall) ([]*v1ien.NodeFirewall, error) {
   167  	nodefirewalls := []*v1ien.NodeFirewall{}
   168  
   169  	nodes, err := c.getNodes(ctx)
   170  	if err != nil {
   171  		return nil, err
   172  	}
   173  	for _, node := range nodes.Items {
   174  		nodeFirewall, needed := c.buildNodeFirewall(node, clusterfw)
   175  		if !needed {
   176  			continue
   177  		}
   178  		nodefirewalls = append(nodefirewalls, nodeFirewall)
   179  	}
   180  	return nodefirewalls, nil
   181  }
   182  
   183  func (c *ClusterFirewallController) applyObjects(ctx context.Context, uobjs []*unstructured.Unstructured) (*ssa.ChangeSet, error) {
   184  	// apply NodeFirewalls
   185  	changeset, err := c.resourceManager.ApplyAll(ctx, uobjs, ssa.ApplyOptions{Force: true})
   186  	if err != nil {
   187  		return changeset, fmt.Errorf("failed to apply NodeFirewalls(s): %w", err)
   188  	}
   189  
   190  	if len(changeset.Entries) > 0 {
   191  		ctrl.LoggerFrom(ctx).Info("applied NodeFirewalls", "changeset", changeset.ToMap())
   192  	}
   193  
   194  	return changeset, nil
   195  }
   196  
   197  // Removes all objects in the oldInventory that are not present in the newInventory
   198  func (c *ClusterFirewallController) pruneInventory(ctx context.Context, oldInventory *inventory.ResourceInventory, newInventory *inventory.ResourceInventory) error {
   199  	// check if nothing to remove
   200  	if oldInventory == nil {
   201  		return nil
   202  	}
   203  
   204  	diff, err := inventory.Diff(oldInventory, newInventory)
   205  	if err != nil {
   206  		return err
   207  	}
   208  
   209  	if len(diff) == 0 {
   210  		return nil
   211  	}
   212  
   213  	deleteset, err := c.resourceManager.DeleteAll(ctx, diff, ssa.DefaultDeleteOptions())
   214  	if err != nil {
   215  		return err
   216  	}
   217  	if len(deleteset.Entries) > 0 {
   218  		ctrl.LoggerFrom(ctx).Info("removed NodeFirewalls", "changeset", deleteset.ToMap())
   219  	}
   220  
   221  	return nil
   222  }
   223  
   224  func (c *ClusterFirewallController) getNodes(ctx context.Context) (*corev1.NodeList, error) {
   225  	nodes := &corev1.NodeList{}
   226  	return nodes, c.client.List(ctx, nodes)
   227  }
   228  
   229  func (c *ClusterFirewallController) buildNodeFirewall(node corev1.Node, clusterfw *v1ien.ClusterFirewall) (*v1ien.NodeFirewall, bool) {
   230  	rules := []v1ien.NodeRule{}
   231  
   232  	for _, clusterRule := range clusterfw.Spec.ClusterRules {
   233  		if len(clusterRule.NodeSelector) == 0 || c.matchesSelector(node, clusterRule.NodeSelector) {
   234  			rules = append(rules, clusterRule.NodeRule)
   235  		}
   236  	}
   237  
   238  	if len(rules) > 0 {
   239  		ownerRef := *metav1.NewControllerRef(clusterfw, v1ien.ClusterFirewallGVK)
   240  		return v1ien.NewNodeFirewall(clusterfw.Name+"-"+node.Name, rules, ownerRef), true
   241  	}
   242  
   243  	return nil, false
   244  }
   245  
   246  func (c *ClusterFirewallController) matchesSelector(node corev1.Node, selector v1ien.NodeSelector) bool {
   247  	for label, value := range selector {
   248  		if node.Labels[label] != value {
   249  			return false
   250  		}
   251  	}
   252  	return true
   253  }
   254  
   255  // remove old Nodefirewalls
   256  func (c *ClusterFirewallController) deleteClusterFirewall(ctx context.Context, clusterfw *v1ien.ClusterFirewall) error {
   257  	if clusterfw.Status != nil && clusterfw.Status.Inventory != nil {
   258  		log := ctrl.LoggerFrom(ctx)
   259  		objs, err := clusterfw.Status.Inventory.ListObjects()
   260  		if err != nil {
   261  			log.Error(err, "failed to list inventory", "inventory", *clusterfw.Status.Inventory)
   262  			conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.InventoryRequeueing)
   263  			return err
   264  		}
   265  		if len(objs) > 0 {
   266  			changeset, err := c.resourceManager.DeleteAll(ctx, objs, ssa.DefaultDeleteOptions())
   267  			if err != nil {
   268  				log.Error(err, "failed to delete inventory")
   269  				conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.DeleteRequeueing)
   270  				return err
   271  			}
   272  			log.Info("removed NodeFirewalls", "changeset", changeset.ToMap())
   273  		}
   274  	}
   275  	controllerutil.RemoveFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer)
   276  	return nil
   277  }
   278  
   279  func NodeFirewallsToUnstructuredList(nodeFirewalls []*v1ien.NodeFirewall) ([]*unstructured.Unstructured, error) {
   280  	uobjs := make([]*unstructured.Unstructured, 0, len(nodeFirewalls))
   281  
   282  	for _, nodeFirewall := range nodeFirewalls {
   283  		uobj, err := unstructured.ToUnstructured(client.Object(nodeFirewall))
   284  		if err != nil {
   285  			return uobjs, err
   286  		}
   287  		uobjs = append(uobjs, uobj)
   288  	}
   289  
   290  	return uobjs, nil
   291  }
   292  
   293  func (c *ClusterFirewallController) summarizer(ctx context.Context, patcher *patch.SerialPatcher, clusterFirewall *v1ien.ClusterFirewall, result reconcile.Result, recErr error) (ctrl.Result, error) {
   294  	s := reconcile.NewSummarizer(patcher)
   295  	return s.SummarizeAndPatch(ctx, clusterFirewall,
   296  		reconcile.WithConditions(c.conditions),
   297  		reconcile.WithResult(result),
   298  		reconcile.WithError(recErr),
   299  		reconcile.WithIgnoreNotFound(),
   300  		reconcile.WithProcessors(
   301  			reconcile.RecordReconcileReq,
   302  			reconcile.RecordResult,
   303  		),
   304  		reconcile.WithFieldOwner(c.name),
   305  	)
   306  }
   307  

View as plain text