1 package controller
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "slices"
8 "strings"
9 "time"
10
11 "github.com/go-logr/logr"
12 "github.com/hashicorp/go-version"
13 "github.com/spf13/afero"
14 ctrl "sigs.k8s.io/controller-runtime"
15 "sigs.k8s.io/controller-runtime/pkg/builder"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 "sigs.k8s.io/controller-runtime/pkg/predicate"
18
19 "edge-infra.dev/pkg/k8s/meta/status"
20
21 corev1 "k8s.io/api/core/v1"
22 "k8s.io/apimachinery/pkg/types"
23
24 kuberecorder "k8s.io/client-go/tools/record"
25
26 "edge-infra.dev/pkg/k8s/runtime/events"
27
28 "edge-infra.dev/pkg/k8s/runtime/conditions"
29 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
30 controllerReconciler "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
31 patch "edge-infra.dev/pkg/k8s/runtime/patch"
32 "edge-infra.dev/pkg/sds/patching/common"
33 v1ienpatch "edge-infra.dev/pkg/sds/patching/k8s/apis/ienpatch/v1"
34 patchMetrics "edge-infra.dev/pkg/sds/patching/k8s/controller/internal/metrics"
35 patchManager "edge-infra.dev/pkg/sds/patching/patchmanager"
36 )
37
38 var predicates = builder.WithPredicates(predicate.GenerationChangedPredicate{})
39
40 type PatchController struct {
41 client.Client
42 kuberecorder.EventRecorder
43 K8sClient client.Client
44 Log logr.Logger
45 Name string
46 RequeueTime time.Duration
47 Conditions controllerReconciler.Conditions
48 Metrics patchMetrics.Metrics
49 }
50
51
52
53 func NewPatchController(mgr ctrl.Manager, log logr.Logger) *PatchController {
54 noCacheClient, _ := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
55 k8sClient := mgr.GetClient()
56 mgr.GetClient()
57 hostname := os.Getenv("NODE_NAME")
58 patchConditions := reconcile.Conditions{
59 Target: status.ReadyCondition,
60 Owned: []string{
61 hostname,
62 },
63 Summarize: []string{},
64 NegativePolarity: []string{},
65 }
66 metrics := patchMetrics.PatchingMetrics(mgr)
67
68 eventRecorder := events.NewRecorder(mgr, ctrl.Log, "patchctl")
69
70 return &PatchController{k8sClient, eventRecorder, noCacheClient, log, hostname, requeueTime, patchConditions, metrics}
71 }
72
73 func (c *PatchController) SetUpWithManager(mgr ctrl.Manager) error {
74 return ctrl.NewControllerManagedBy(mgr).For(&v1ienpatch.IENPatch{}, predicates).Complete(c)
75 }
76
77
78 func (c *PatchController) Summarizer(ctx context.Context, ns types.NamespacedName, ienpatch *v1ienpatch.IENPatch, result controllerReconciler.Result, patchErr error) (res ctrl.Result, recErr error) {
79 condition := conditions.Get(ienpatch, c.Name)
80 if err := c.Client.Get(ctx, ns, ienpatch); err != nil {
81 c.Log.Error(err, "error getting ienpatch object")
82 }
83 patcher := patch.NewSerialPatcher(ienpatch, c.Client)
84
85 c.clearDeletedNodes(ienpatch)
86
87 if condition != nil {
88 conditions.Set(ienpatch, condition)
89 }
90
91 c.patchOtherNodes(ienpatch)
92 s := controllerReconciler.NewSummarizer(patcher)
93 res, recErr = s.SummarizeAndPatch(ctx, ienpatch,
94 controllerReconciler.WithConditions(c.Conditions),
95 controllerReconciler.WithResult(result),
96
97 controllerReconciler.WithError(patchErr),
98 controllerReconciler.WithIgnoreNotFound(),
99 controllerReconciler.WithProcessors(
100 controllerReconciler.RecordReconcileReq,
101 controllerReconciler.RecordResult,
102 ),
103 controllerReconciler.WithFieldOwner(c.Name),
104 )
105 if recErr != nil && strings.Contains(fmt.Sprint(recErr), "modified by a different process") {
106 c.Log.Info("Conflict when patching Ready status, retrying")
107 return c.Summarizer(ctx, ns, ienpatch, result, patchErr)
108 }
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 if res.Requeue {
125 res.RequeueAfter = c.RequeueTime
126 }
127 return res, nil
128 }
129
130 func (c *PatchController) clearDeletedNodes(ienpatch *v1ienpatch.IENPatch) {
131 var oldNodes []string
132 for _, node := range ienpatch.GetConditions() {
133
134 if node.Type == "Ready" {
135 continue
136 }
137 oldNodes = append(oldNodes, node.Type)
138 }
139
140 var deletedNodes []string
141 for _, oldNode := range oldNodes {
142 if slices.Contains(c.Conditions.Summarize, oldNode) {
143 continue
144 }
145 deletedNodes = append(deletedNodes, oldNode)
146 }
147
148 for _, node := range deletedNodes {
149 c.Log.Info(fmt.Sprintf("Removing Node %s from ienpatch", node))
150 conditions.Delete(ienpatch, node)
151 }
152 }
153
154 func (c *PatchController) setPendingStatus(ctx context.Context, namespacedName types.NamespacedName, patchmanager patchManager.PatchManager, result reconcile.Result) (res ctrl.Result, recErr error) {
155
156 conditions.MarkFalse(patchmanager.Ienpatch, c.Name, "Progressing", "Reconciling upgrade for %s to %s", c.Name, patchmanager.TargetVer)
157
158 if err := c.overrideOlderStatus(ctx, patchmanager.Ienpatch, patchmanager.TargetVer); err != nil {
159 c.Log.Error(err, "Unable to to override status")
160 }
161
162 return c.Summarizer(ctx, namespacedName, patchmanager.Ienpatch, result, nil)
163 }
164
165
166 func (c *PatchController) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
167 reconcileStart := time.Now()
168
169
170 ienpatch := v1ienpatch.IENPatch{}
171 if err := c.Client.Get(ctx, req.NamespacedName, &ienpatch); err != nil {
172 return ctrl.Result{RequeueAfter: c.RequeueTime}, client.IgnoreNotFound(err)
173 }
174
175 if err := c.readNodeList(ctx); err != nil {
176 return ctrl.Result{RequeueAfter: c.RequeueTime}, err
177 }
178
179 patchmanager, err := c.NewPatchManager(ctx, &ienpatch)
180 if err != nil {
181 return ctrl.Result{RequeueAfter: c.RequeueTime}, err
182 }
183
184 result := reconcile.ResultEmpty
185 metrics := patchMetrics.CreateNewControllerMetrics(&ienpatch, c.Name, patchmanager.CurrentVer, patchmanager.TargetVer)
186 defer func() {
187 res, recErr = c.Summarizer(ctx, req.NamespacedName, &ienpatch, result, err)
188 metrics.RecordMetrics(time.Since(reconcileStart).Seconds())
189 }()
190
191 result, err = c.Patch(ctx, req, patchmanager, metrics)
192
193 return res, recErr
194 }
195
196 func (c *PatchController) Patch(ctx context.Context, req ctrl.Request, patchmanager patchManager.PatchManager, metrics *patchMetrics.ControllerMetrics) (result controllerReconciler.Result, patchErr error) {
197 status, err := patchmanager.PreChecks()
198 if status == v1ienpatch.Pending {
199 _, err := c.setPendingStatus(ctx, req.NamespacedName, patchmanager, result)
200 if err != nil {
201 c.Log.Error(err, "Failed to set pending status")
202 }
203 status, err = patchmanager.PatchIEN()
204 }
205
206 metrics.RecordReadiness()
207
208 result = c.updateConditionsByStatus(status, patchmanager.Ienpatch, patchmanager.TargetVer)
209 return result, err
210 }
211
212 func (c *PatchController) NewPatchManager(ctx context.Context, ienpatch *v1ienpatch.IENPatch) (patchmanager patchManager.PatchManager, err error) {
213 osFs, _ := afero.NewOsFs().(afero.OsFs)
214
215
216 targetVersion := os.Getenv("TARGET_VERSION")
217 currentVersion, err := common.ReadCurrentVer()
218 if err != nil {
219 c.Log.Error(err, "error creating config struct")
220 return
221 }
222
223 cfg, err := common.NewConfig()
224 if err != nil {
225 c.Log.Error(err, "error creating config struct")
226 return
227 }
228
229
230 patchmanager = patchManager.PatchManager{
231 Ctx: ctx,
232 K8sClient: c.K8sClient,
233 Log: ctrl.LoggerFrom(ctx),
234 HostName: c.Name,
235 Ienpatch: ienpatch,
236 CurrentVer: currentVersion,
237 TargetVer: targetVersion,
238 Fs: osFs,
239 Cfg: cfg,
240 }
241 return
242 }
243
244
245 func (c *PatchController) patchOtherNodes(ienpatch *v1ienpatch.IENPatch) {
246 for _, node := range c.Conditions.Summarize {
247 if conditions.Get(ienpatch, node) == nil {
248 conditions.MarkFalse(ienpatch, node, "Pending", "Pending patchctl pod to start on this node")
249 }
250 }
251 }
252
253
254 func (c *PatchController) overrideOlderStatus(ctx context.Context, ienpatch *v1ienpatch.IENPatch, targetVersion string) error {
255 var changesMade = false
256 patcher := patch.NewSerialPatcher(ienpatch, c.Client)
257
258 for _, condition := range ienpatch.Status.Conditions {
259 messageVersion := strings.Split(condition.Message, " ")[len(strings.Split(condition.Message, " "))-1]
260
261 conditionVersionObj, err := version.NewVersion(messageVersion)
262 if err != nil {
263 c.Log.Error(err, fmt.Sprintf("Error parsing message version %s", messageVersion))
264 return err
265 }
266 targetVersionObj, err := version.NewVersion(targetVersion)
267 if err != nil {
268 c.Log.Error(err, fmt.Sprintf("Error parsing target version %s", targetVersion))
269 return err
270 }
271
272 if len(targetVersionObj.Prerelease()) != 0 {
273 return nil
274 }
275
276 if conditionVersionObj.Core().LessThan(targetVersionObj.Core()) {
277 conditions.MarkFalse(ienpatch, condition.Type, "Pending", "Pending patchctl pod to start for %s to %s", condition.Type, targetVersion)
278 changesMade = true
279 }
280 }
281
282 if !changesMade {
283 return nil
284 }
285
286 if err := patcher.Patch(ctx, ienpatch); err != nil {
287 c.Log.Error(err, "Failed to patch outdated status condition")
288 return err
289 }
290 return nil
291 }
292
293 func (c *PatchController) readNodeList(ctx context.Context) error {
294
295 nodes := corev1.NodeList{}
296 if err := c.K8sClient.List(ctx, &nodes); err != nil {
297 return err
298 }
299 for _, node := range nodes.Items {
300 c.Conditions.Summarize = append(c.Conditions.Summarize, node.Name)
301 }
302 return nil
303 }
304
305 func (c *PatchController) updateConditionsByStatus(status v1ienpatch.PatchStatus, ienpatch *v1ienpatch.IENPatch, targetVersion string) (results controllerReconciler.Result) {
306 switch status {
307 case v1ienpatch.Retry:
308 conditions.MarkFalse(ienpatch, c.Name, "Failed", "Retrying upgrade for %s to %s", c.Name, targetVersion)
309 return reconcile.ResultRequeue
310 case v1ienpatch.Pending:
311 conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Pending upgrade for %s to %s", c.Name, targetVersion)
312 case v1ienpatch.DownloadComplete:
313 conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Download complete for %s to %s", c.Name, targetVersion)
314 case v1ienpatch.Reboot:
315 conditions.MarkFalse(ienpatch, c.Name, "Progressing", "Pending reboot for %s to %s", c.Name, targetVersion)
316 case v1ienpatch.Failed:
317 conditions.MarkFalse(ienpatch, c.Name, "Failed", "Failed upgrade for %s to %s", c.Name, targetVersion)
318 case v1ienpatch.Success:
319 conditions.MarkTrue(ienpatch, c.Name, "Successful", "Successful upgrade for %s to %s", c.Name, targetVersion)
320 }
321
322 return reconcile.ResultEmpty
323 }
324
View as plain text