1 package install
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "time"
8
9 "github.com/spf13/afero"
10 corev1 "k8s.io/api/core/v1"
11 "k8s.io/client-go/tools/record"
12 ctrl "sigs.k8s.io/controller-runtime"
13 "sigs.k8s.io/controller-runtime/pkg/builder"
14 "sigs.k8s.io/controller-runtime/pkg/client"
15 "sigs.k8s.io/controller-runtime/pkg/event"
16 "sigs.k8s.io/controller-runtime/pkg/predicate"
17
18 "edge-infra.dev/pkg/k8s/runtime/conditions"
19 edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
20 "edge-infra.dev/pkg/k8s/runtime/patch"
21 "edge-infra.dev/pkg/lib/fog"
22 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
23 "edge-infra.dev/pkg/sds/etcd/operator/internal/config"
24 "edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
25 "edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
26 )
27
// Package-level settings used by the install controller.
var (
	// OperatorFilewall holds the rule arguments "-p tcp --dport 6443 -j REJECT".
	// NOTE(review): this looks like an iptables rule spec rejecting TCP traffic
	// to port 6443; where it is applied is not visible in this file. The
	// spelling "Filewall" is kept because the identifier is exported.
	OperatorFilewall = "-p tcp --dport 6443 -j REJECT"

	// operatorNamespace is the namespace in which the install Secret is looked up.
	operatorNamespace = "etcd-operator"

	// lanOutageFlagFile is the marker file whose presence makes the reconciler
	// treat the node as being in LAN outage mode and skip reconciliation.
	lanOutageFlagFile = "/zynstra/config/.lan_outage_mode"
)
33
34
35
36 var Conditions = edgereconcile.Conditions{
37 Target: v1etcd.Ready,
38 Owned: []string{
39 v1etcd.Installed,
40 v1etcd.InProgress,
41 v1etcd.Reconciling,
42 },
43 Summarize: []string{
44 v1etcd.Installed,
45 v1etcd.InProgress,
46 v1etcd.Provisioned,
47 },
48 NegativePolarity: []string{
49 v1etcd.InProgress,
50 v1etcd.Reconciling,
51 },
52 }
53
// containerImageVersions is the YAML-decodable shape of a container version
// listing: a single "containers" mapping from string keys to string values.
// NOTE(review): presumably container name -> image version; the consumer is
// not visible in this file.
type containerImageVersions struct {
	// Containers is read from / written to the "containers" YAML key.
	Containers map[string]string `yaml:"containers"`
}
57
58 type Reconciler struct {
59 config.Config
60 edgereconcile.Conditions
61 *metrics.Metrics
62 }
63
64 type summarizeOptions struct {
65 patcher *patch.SerialPatcher
66 handlers *Handlers
67 recErr error
68 }
69
70 type resultOptions struct {
71 startTime time.Time
72 handlers *Handlers
73 recErr error
74 }
75
76
77
78 type Handlers struct {
79 member *resources.EtcdMemberHandler
80 secret *resources.SecretHandler
81 }
82
83
84 func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
85 r.Config = cfg
86 r.Conditions = Conditions
87 r.Metrics = metrics.New(r.Mgr, "install")
88 localMember := &v1etcd.EtcdMemberList{}
89 for _, member := range initialMembers.Items {
90 if member.Name == cfg.NodeName {
91 localMember.Items = append(localMember.Items, member)
92 }
93 }
94 r.Metrics.Custom.Run(localMember)
95
96 return ctrl.NewControllerManagedBy(r.Mgr).
97 For(&corev1.Secret{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
98 WithEventFilter(r.createEventFilter()).
99 Complete(r)
100 }
101
102
103
104 func (r *Reconciler) createEventFilter() predicate.Predicate {
105 return predicate.Funcs{
106 CreateFunc: func(e event.CreateEvent) bool {
107 secret := e.Object.(*corev1.Secret)
108 handler := resources.NewSecretHandlerBuilder().Build()
109 handler.DeepCopyFrom(secret)
110
111
112 return handler.Name == os.Getenv("NODE_NAME") && handler.OwnedByEtcdMember()
113 },
114 UpdateFunc: func(_ event.UpdateEvent) bool {
115 return false
116 },
117 DeleteFunc: func(_ event.DeleteEvent) bool {
118 return false
119 },
120 }
121 }
122
123
124
125
126
127
128
129
130 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
131 log := fog.FromContext(ctx).WithName(r.Name)
132 ctx = fog.IntoContext(ctx, log)
133 startTime := time.Now()
134 log.V(0).Info("started etcd configuration reconciliation loop")
135 defer log.V(0).Info("ended etcd configuration reconciliation loop")
136
137 handlers, err := r.generateHandlers(ctx, req)
138 if err != nil {
139 return ctrl.Result{}, err
140 }
141
142 pass, err := r.checkPreconditions(ctx, handlers)
143 if err != nil {
144 return ctrl.Result{}, err
145 }
146 if !pass {
147 return ctrl.Result{}, nil
148 }
149
150
151
152
153 if !handlers.member.IsProvisioned() {
154 return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
155 }
156
157 if err := r.setReconciling(ctx, handlers); err != nil {
158 return ctrl.Result{}, err
159 }
160
161 patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client())
162 defer func() {
163 summarizeOpts := summarizeOptions{
164 patcher,
165 handlers,
166 recErr,
167 }
168 _, recErr = r.summarize(ctx, summarizeOpts)
169
170 resultOpts := resultOptions{
171 startTime,
172 handlers,
173 recErr,
174 }
175 r.recordResults(ctx, resultOpts)
176 }()
177
178 if err := r.reconcile(ctx, handlers); err != nil {
179 return ctrl.Result{}, err
180 }
181 log.V(0).Info("etcd installed successfully", "eoaudit", "")
182
183
184 conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledSuccessReason, "%s", v1etcd.InstalledSuccessMessage)
185 return ctrl.Result{}, nil
186 }
187
188
189 func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) {
190 handlers := &Handlers{}
191 if err := r.setSecretHandler(ctx, req, handlers); err != nil {
192 return nil, err
193 }
194 if err := r.setMemberHandler(ctx, req, handlers); err != nil {
195 return nil, err
196 }
197 return handlers, nil
198 }
199
200
201
202 func (r *Reconciler) setSecretHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
203 handlers.secret = resources.NewSecretHandlerBuilder().
204 WithClient(r.KubeRetryClient).
205 WithKey(req.NamespacedName).
206 HandlesSecret().
207 Named(req.Name).
208 InNamespace(operatorNamespace).
209 Build()
210
211 err := handlers.secret.ReconcileLocal(ctx)
212 if client.IgnoreNotFound(err) != nil {
213 return fmt.Errorf("failed to retrieve node: %w", err)
214 }
215
216 if err == nil {
217 handlers.secret.Found = true
218 }
219 return nil
220 }
221
222
223
224 func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
225 handlers.member = resources.NewEtcdMemberHandlerBuilder().
226 WithClient(r.KubeRetryClient).
227 WithKey(req.NamespacedName).
228 HandlesEtcdMember().
229 Named(req.Name).
230 Build()
231
232 err := handlers.member.ReconcileLocal(ctx)
233 if client.IgnoreNotFound(err) != nil {
234 return fmt.Errorf("failed to retrieve EtcdMember: %w", err)
235 }
236
237 if err == nil {
238 handlers.member.Found = true
239 }
240 return nil
241 }
242
243
244
245 func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) (bool, error) {
246 log := fog.FromContext(ctx)
247 exists, err := afero.Exists(r.Fs, lanOutageFlagFile)
248 if err != nil {
249 return false, err
250 }
251 if exists {
252 log.V(0).Info("node is in LAN outage mode")
253 return false, nil
254 }
255
256
257 if !handlers.secret.Found {
258 log.V(0).Info("Secret not found")
259 return false, nil
260 }
261 if !handlers.member.Found {
262 log.V(0).Info("EtcdMember not found")
263 return false, nil
264 }
265
266 if handlers.member.IsSuspended() {
267 log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true")
268 return false, nil
269 }
270
271 if handlers.member.IsInstalled() {
272 log.V(0).Info("etcd already installed")
273 return false, nil
274 }
275 return true, nil
276 }
277
278
279 func (r *Reconciler) setReconciling(ctx context.Context, handlers *Handlers) error {
280 log := fog.FromContext(ctx)
281
282
283 return handlers.member.WithReconcileRemote(ctx, func(e *v1etcd.EtcdMember) {
284 if _, ok := e.GetCondition(v1etcd.Reconciling); ok {
285 return
286 }
287 log.V(1).Info("setting 'Reconciling' condition")
288 conditions.MarkTrue(e, v1etcd.Reconciling, v1etcd.InstalledReconcilingReason, "%s", v1etcd.InstalledReconcilingMessage)
289 })
290 }
291
292
293
294
295 func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error {
296 if err := r.reconcileFiles(handlers); err != nil {
297 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledSecretFailedMessage)
298 return fmt.Errorf("%s: %w", v1etcd.InstalledSecretFailedMessage, err)
299 }
300
301 if err := r.WithDefaultEtcdRetryClient(ctx); err != nil {
302 return fmt.Errorf("failed to setup etcd retry client: %w", err)
303 }
304 defer r.EtcdRetryClient.Close()
305
306 if err := handlers.member.ReconcileMembershipStatus(ctx, r.EtcdRetryClient); err != nil {
307 return fmt.Errorf("failed to update membership status: %w", err)
308 }
309
310 memberID, err := r.addMemberAsLearner(ctx, handlers)
311 if err != nil {
312 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledLearnerFailedMessage)
313 return fmt.Errorf("%s: %w", v1etcd.InstalledLearnerFailedMessage, err)
314 }
315
316 if err := r.withFirewall(ctx, handlers, r.configureEtcd); err != nil {
317 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledManifestFailedMessage)
318 return fmt.Errorf("%s: %w", v1etcd.InstalledManifestFailedMessage, err)
319 }
320
321 if err := r.promoteLearner(ctx, memberID); err != nil {
322 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledPromoteFailedMessage)
323 return fmt.Errorf("%s: %w", v1etcd.InstalledPromoteFailedMessage, err)
324 }
325
326
327 if err := client.IgnoreNotFound(handlers.secret.DeleteRemote(ctx)); err != nil {
328 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledDeleteFailedMessage)
329 return fmt.Errorf("%s: %w", v1etcd.InstalledDeleteFailedMessage, err)
330 }
331 return nil
332 }
333
334
335 func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) {
336 s := edgereconcile.NewSummarizer(opts.patcher)
337 return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember,
338 edgereconcile.WithConditions(r.Conditions),
339 edgereconcile.WithResult(edgereconcile.ResultEmpty),
340 edgereconcile.WithError(opts.recErr),
341 edgereconcile.WithIgnoreNotFound(),
342 edgereconcile.WithProcessors(
343 edgereconcile.RecordReconcileReq,
344 edgereconcile.RecordResult,
345 UnsetReconciling,
346 UnsetInProgress,
347 ),
348 edgereconcile.WithFieldOwner(r.Name),
349 )
350 }
351
352
353
354 func UnsetInProgress(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) {
355 log := fog.FromContext(ctx)
356 etcdMember, ok := obj.(*v1etcd.EtcdMember)
357 if !ok {
358 return
359 }
360 if err == nil {
361 log.V(1).Info("removing 'InProgress' condition")
362 conditions.Delete(etcdMember, v1etcd.InProgress)
363 }
364 }
365
366
367
368 func UnsetReconciling(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) {
369 log := fog.FromContext(ctx)
370 etcdMember, ok := obj.(*v1etcd.EtcdMember)
371 if !ok {
372 return
373 }
374 if err == nil {
375 log.V(1).Info("removing 'Reconciling' condition")
376 conditions.Delete(etcdMember, v1etcd.Reconciling)
377 }
378 }
379
380
381 func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) {
382 r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime)
383 r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember)
384 r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember)
385 }
386
View as plain text