1 package firewallctl
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/fluxcd/pkg/ssa"
9 corev1 "k8s.io/api/core/v1"
10 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11 "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
12 ctrl "sigs.k8s.io/controller-runtime"
13 "sigs.k8s.io/controller-runtime/pkg/builder"
14 "sigs.k8s.io/controller-runtime/pkg/client"
15 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
16 ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
17 "sigs.k8s.io/controller-runtime/pkg/predicate"
18
19 "edge-infra.dev/pkg/k8s/meta/status"
20 "edge-infra.dev/pkg/k8s/runtime/conditions"
21 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
22 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
23 "edge-infra.dev/pkg/k8s/runtime/inventory"
24 "edge-infra.dev/pkg/k8s/runtime/patch"
25 "edge-infra.dev/pkg/k8s/unstructured"
26 v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
27 )
28
29 var clusterFirewallConditions = reconcile.Conditions{
30 Target: status.ReadyCondition,
31 Owned: []string{
32 string(v1ien.ClusterFirewallController),
33 },
34 Summarize: []string{
35 string(v1ien.ClusterFirewallController),
36 },
37 NegativePolarity: []string{},
38 }
39
40 type ClusterFirewallController Controller
41
42 func NewClusterFirewallController(k8sClient client.Client, mgr ctrlmgr.Manager) *ClusterFirewallController {
43 return &ClusterFirewallController{
44 name: "firewallctl",
45 client: k8sClient,
46 conditions: clusterFirewallConditions,
47 metrics: metrics.New(mgr, "firewallctl"),
48 resourceManager: ssa.NewResourceManager(
49 k8sClient,
50 polling.NewStatusPoller(k8sClient, k8sClient.RESTMapper(), polling.Options{}),
51 ssa.Owner{Field: "firewallctl"},
52 ),
53 requeueTime: requeueTime,
54 }
55 }
56
57 func (c *ClusterFirewallController) SetUpWithManager(mgr ctrl.Manager) error {
58 return ctrl.NewControllerManagedBy(mgr).
59 For(&v1ien.ClusterFirewall{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
60 Complete(c)
61 }
62
63
64 func (c *ClusterFirewallController) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
65 reconcileStart := time.Now()
66 log := ctrl.LoggerFrom(ctx).WithName(c.name)
67 ctx = ctrl.LoggerInto(ctx, log)
68
69 var result = reconcile.ResultEmpty
70
71 clusterfw := &v1ien.ClusterFirewall{}
72 if err := c.client.Get(ctx, req.NamespacedName, clusterfw); client.IgnoreNotFound(err) != nil {
73 log.Error(err, "couldn't Get ClusterFirewall")
74 return ctrl.Result{}, err
75 }
76
77 patcher := patch.NewSerialPatcher(clusterfw, c.client)
78 defer func() {
79 res, recErr = c.summarizer(ctx, patcher, clusterfw, result, recErr)
80 c.metrics.RecordDuration(ctx, clusterfw, reconcileStart)
81 c.metrics.RecordReconciling(ctx, clusterfw)
82 c.metrics.RecordReadiness(ctx, clusterfw)
83 }()
84
85 log.Info("reconciling ClusterFirewall", "object", req.Name)
86
87 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwProgressing), "%s", v1ien.Reconciling.String())
88
89
90 if !controllerutil.ContainsFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer) {
91 controllerutil.AddFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer)
92
93 result = reconcile.ResultRequeue
94 return
95 }
96
97
98 if !clusterfw.ObjectMeta.DeletionTimestamp.IsZero() {
99 recErr = c.deleteClusterFirewall(ctx, clusterfw)
100 return
101 }
102
103
104 if recErr = clusterfw.ValidateRules(); recErr != nil {
105 log.Error(recErr, "failed to validate config", "ClusterFirewall", clusterfw.ObjectMeta.Name)
106 conditions.MarkTrue(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), v1ien.Invalid, recErr.Error())
107 return
108 }
109
110 if clusterfw.Status == nil {
111 clusterfw.Status = &v1ien.ClusterFirewallStatus{}
112 }
113
114
115 if recErr = c.applyNodeFirewalls(ctx, clusterfw); recErr != nil {
116 return
117 }
118
119 conditions.MarkTrue(clusterfw, status.ReadyCondition, string(v1ien.CfwSuccessful), "%s", v1ien.Succeeded.String())
120 result = reconcile.ResultSuccess
121 return
122 }
123
124 func (c *ClusterFirewallController) applyNodeFirewalls(ctx context.Context, clusterfw *v1ien.ClusterFirewall) error {
125 log := ctrl.LoggerFrom(ctx)
126
127
128 nodeFirewalls, err := c.buildNodeFirewalls(ctx, clusterfw)
129 if err != nil {
130 log.Error(err, v1ien.BuildRequeueing)
131 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.BuildRequeueing)
132 return err
133 }
134
135
136 uobjs, err := NodeFirewallsToUnstructuredList(nodeFirewalls)
137 if err != nil {
138 log.Error(err, v1ien.ConvertRequeueing)
139 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.ConvertRequeueing)
140 return err
141 }
142
143
144 changeset, err := c.applyObjects(ctx, uobjs)
145 if err != nil {
146 log.Error(err, v1ien.ApplyRequeueing)
147 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.ApplyRequeueing)
148 return err
149 }
150
151
152 oldInventory := clusterfw.Status.Inventory.DeepCopy()
153 clusterfw.Status.Inventory = inventory.New(inventory.FromChangeSet(changeset))
154
155
156 err = c.pruneInventory(ctx, oldInventory, clusterfw.Status.Inventory)
157 if err != nil {
158 log.Error(err, v1ien.DeleteRequeueing)
159 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.DeleteRequeueing)
160 return err
161 }
162
163 return nil
164 }
165
166 func (c *ClusterFirewallController) buildNodeFirewalls(ctx context.Context, clusterfw *v1ien.ClusterFirewall) ([]*v1ien.NodeFirewall, error) {
167 nodefirewalls := []*v1ien.NodeFirewall{}
168
169 nodes, err := c.getNodes(ctx)
170 if err != nil {
171 return nil, err
172 }
173 for _, node := range nodes.Items {
174 nodeFirewall, needed := c.buildNodeFirewall(node, clusterfw)
175 if !needed {
176 continue
177 }
178 nodefirewalls = append(nodefirewalls, nodeFirewall)
179 }
180 return nodefirewalls, nil
181 }
182
183 func (c *ClusterFirewallController) applyObjects(ctx context.Context, uobjs []*unstructured.Unstructured) (*ssa.ChangeSet, error) {
184
185 changeset, err := c.resourceManager.ApplyAll(ctx, uobjs, ssa.ApplyOptions{Force: true})
186 if err != nil {
187 return changeset, fmt.Errorf("failed to apply NodeFirewalls(s): %w", err)
188 }
189
190 if len(changeset.Entries) > 0 {
191 ctrl.LoggerFrom(ctx).Info("applied NodeFirewalls", "changeset", changeset.ToMap())
192 }
193
194 return changeset, nil
195 }
196
197
198 func (c *ClusterFirewallController) pruneInventory(ctx context.Context, oldInventory *inventory.ResourceInventory, newInventory *inventory.ResourceInventory) error {
199
200 if oldInventory == nil {
201 return nil
202 }
203
204 diff, err := inventory.Diff(oldInventory, newInventory)
205 if err != nil {
206 return err
207 }
208
209 if len(diff) == 0 {
210 return nil
211 }
212
213 deleteset, err := c.resourceManager.DeleteAll(ctx, diff, ssa.DefaultDeleteOptions())
214 if err != nil {
215 return err
216 }
217 if len(deleteset.Entries) > 0 {
218 ctrl.LoggerFrom(ctx).Info("removed NodeFirewalls", "changeset", deleteset.ToMap())
219 }
220
221 return nil
222 }
223
224 func (c *ClusterFirewallController) getNodes(ctx context.Context) (*corev1.NodeList, error) {
225 nodes := &corev1.NodeList{}
226 return nodes, c.client.List(ctx, nodes)
227 }
228
229 func (c *ClusterFirewallController) buildNodeFirewall(node corev1.Node, clusterfw *v1ien.ClusterFirewall) (*v1ien.NodeFirewall, bool) {
230 rules := []v1ien.NodeRule{}
231
232 for _, clusterRule := range clusterfw.Spec.ClusterRules {
233 if len(clusterRule.NodeSelector) == 0 || c.matchesSelector(node, clusterRule.NodeSelector) {
234 rules = append(rules, clusterRule.NodeRule)
235 }
236 }
237
238 if len(rules) > 0 {
239 ownerRef := *metav1.NewControllerRef(clusterfw, v1ien.ClusterFirewallGVK)
240 return v1ien.NewNodeFirewall(clusterfw.Name+"-"+node.Name, rules, ownerRef), true
241 }
242
243 return nil, false
244 }
245
246 func (c *ClusterFirewallController) matchesSelector(node corev1.Node, selector v1ien.NodeSelector) bool {
247 for label, value := range selector {
248 if node.Labels[label] != value {
249 return false
250 }
251 }
252 return true
253 }
254
255
256 func (c *ClusterFirewallController) deleteClusterFirewall(ctx context.Context, clusterfw *v1ien.ClusterFirewall) error {
257 if clusterfw.Status != nil && clusterfw.Status.Inventory != nil {
258 log := ctrl.LoggerFrom(ctx)
259 objs, err := clusterfw.Status.Inventory.ListObjects()
260 if err != nil {
261 log.Error(err, "failed to list inventory", "inventory", *clusterfw.Status.Inventory)
262 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.InventoryRequeueing)
263 return err
264 }
265 if len(objs) > 0 {
266 changeset, err := c.resourceManager.DeleteAll(ctx, objs, ssa.DefaultDeleteOptions())
267 if err != nil {
268 log.Error(err, "failed to delete inventory")
269 conditions.MarkFalse(clusterfw, status.ReadyCondition, string(v1ien.CfwFailed), "%s", v1ien.DeleteRequeueing)
270 return err
271 }
272 log.Info("removed NodeFirewalls", "changeset", changeset.ToMap())
273 }
274 }
275 controllerutil.RemoveFinalizer(clusterfw, v1ien.ClusterFirewallFinalizer)
276 return nil
277 }
278
279 func NodeFirewallsToUnstructuredList(nodeFirewalls []*v1ien.NodeFirewall) ([]*unstructured.Unstructured, error) {
280 uobjs := make([]*unstructured.Unstructured, 0, len(nodeFirewalls))
281
282 for _, nodeFirewall := range nodeFirewalls {
283 uobj, err := unstructured.ToUnstructured(client.Object(nodeFirewall))
284 if err != nil {
285 return uobjs, err
286 }
287 uobjs = append(uobjs, uobj)
288 }
289
290 return uobjs, nil
291 }
292
293 func (c *ClusterFirewallController) summarizer(ctx context.Context, patcher *patch.SerialPatcher, clusterFirewall *v1ien.ClusterFirewall, result reconcile.Result, recErr error) (ctrl.Result, error) {
294 s := reconcile.NewSummarizer(patcher)
295 return s.SummarizeAndPatch(ctx, clusterFirewall,
296 reconcile.WithConditions(c.conditions),
297 reconcile.WithResult(result),
298 reconcile.WithError(recErr),
299 reconcile.WithIgnoreNotFound(),
300 reconcile.WithProcessors(
301 reconcile.RecordReconcileReq,
302 reconcile.RecordResult,
303 ),
304 reconcile.WithFieldOwner(c.name),
305 )
306 }
307
View as plain text