1 package lifecycle
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 corev1 "k8s.io/api/core/v1"
9 k8stypes "k8s.io/apimachinery/pkg/types"
10 ctrl "sigs.k8s.io/controller-runtime"
11 "sigs.k8s.io/controller-runtime/pkg/builder"
12 "sigs.k8s.io/controller-runtime/pkg/client"
13 "sigs.k8s.io/controller-runtime/pkg/event"
14 "sigs.k8s.io/controller-runtime/pkg/handler"
15 "sigs.k8s.io/controller-runtime/pkg/predicate"
16 "sigs.k8s.io/controller-runtime/pkg/reconcile"
17
18 "edge-infra.dev/pkg/lib/fog"
19 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
20 "edge-infra.dev/pkg/sds/etcd/operator/internal/config"
21 "edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
22 "edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
23 )
24
25 type Reconciler struct {
26 config.Config
27 *metrics.Metrics
28 }
29
30 type resultOptions struct {
31 startTime time.Time
32 handlers *Handlers
33 recErr error
34 }
35
36
37
38 type Handlers struct {
39 member *resources.EtcdMemberHandler
40 node *resources.NodeHandler
41 }
42
43
44 func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
45 r.Config = cfg
46 r.Metrics = metrics.New(r.Mgr, "lifecycle")
47 r.Metrics.Custom.Run(initialMembers)
48
49 return ctrl.NewControllerManagedBy(r.Mgr).
50 For(&v1etcd.EtcdMember{}, builder.WithPredicates(r.etcdMemberEventFilter())).
51 Watches(
52 &corev1.Node{},
53 handler.EnqueueRequestsFromMapFunc(r.nodeReconcileRequests),
54 builder.WithPredicates(r.nodeEventFilter()),
55 ).
56 Complete(r)
57 }
58
59
60 func (r *Reconciler) nodeReconcileRequests(_ context.Context, obj client.Object) []ctrl.Request {
61 nodeName := obj.GetName()
62
63 return []reconcile.Request{
64 {
65 NamespacedName: k8stypes.NamespacedName{
66 Name: nodeName,
67 },
68 },
69 }
70 }
71
72
73
74 func (r *Reconciler) etcdMemberEventFilter() predicate.Predicate {
75 return predicate.Funcs{
76 CreateFunc: func(_ event.CreateEvent) bool {
77 return false
78 },
79 UpdateFunc: func(e event.UpdateEvent) bool {
80 etcdMember := e.ObjectNew.(*v1etcd.EtcdMember)
81
82 handler := resources.NewEtcdMemberHandlerBuilder().Build()
83 handler.DeepCopyFrom(etcdMember)
84
85 return !handler.DeletionTimestamp.IsZero()
86 },
87 DeleteFunc: func(_ event.DeleteEvent) bool {
88 return false
89 },
90 }
91 }
92
93
94
95 func (r *Reconciler) nodeEventFilter() predicate.Predicate {
96 return predicate.Funcs{
97
98 CreateFunc: func(_ event.CreateEvent) bool {
99 return true
100 },
101 UpdateFunc: func(e event.UpdateEvent) bool {
102 return hasIENVersionChanged(e.ObjectOld, e.ObjectNew) || hasNodeRoleChanged(e.ObjectOld, e.ObjectNew)
103 },
104 DeleteFunc: func(_ event.DeleteEvent) bool {
105 return false
106 },
107 }
108 }
109
110 func hasNodeRoleChanged(oldObj, newObj client.Object) bool {
111 oldNode := oldObj.(*corev1.Node)
112 oldHandler := resources.NewNodeHandlerBuilder().Build()
113 oldHandler.DeepCopyFrom(oldNode)
114
115 newNode := newObj.(*corev1.Node)
116 newHandler := resources.NewNodeHandlerBuilder().Build()
117 newHandler.DeepCopyFrom(newNode)
118
119 return oldHandler.IsWorker() != newHandler.IsWorker()
120 }
121
122 func hasIENVersionChanged(oldObj, newObj client.Object) bool {
123 oldNode := oldObj.(*corev1.Node)
124 oldHandler := resources.NewNodeHandlerBuilder().Build()
125 oldHandler.DeepCopyFrom(oldNode)
126
127 newNode := newObj.(*corev1.Node)
128 newHandler := resources.NewNodeHandlerBuilder().Build()
129 newHandler.DeepCopyFrom(newNode)
130
131 return newHandler.IENVersion() != oldHandler.IENVersion()
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
147 log := fog.FromContext(ctx).WithName(r.Name)
148 ctx = fog.IntoContext(ctx, log)
149 startTime := time.Now()
150 log.V(0).Info("started EtcdMember lifecycle reconciliation loop")
151 defer log.V(0).Info("ended EtcdMember lifecycle reconciliation loop")
152
153 if err := r.WithDefaultEtcdRetryClient(ctx); err != nil {
154 return ctrl.Result{}, fmt.Errorf("failed to setup etcd retry client: %w", err)
155 }
156 defer r.EtcdRetryClient.Close()
157
158 handlers, err := r.generateHandlers(ctx, req)
159 if err != nil {
160 return ctrl.Result{}, err
161 }
162
163 if pass := r.checkPreconditions(ctx, handlers); !pass {
164 return ctrl.Result{}, nil
165 }
166
167 var recErr error
168 defer func() {
169 resultOpts := resultOptions{
170 startTime,
171 handlers,
172 recErr,
173 }
174 r.recordResults(ctx, resultOpts)
175 }()
176
177 recErr = r.reconcile(ctx, handlers)
178 if recErr != nil {
179 return ctrl.Result{}, recErr
180 }
181
182 return ctrl.Result{}, nil
183 }
184
185
186
187 func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) {
188 handlers := &Handlers{}
189 if err := r.setNodeHandler(ctx, req, handlers); err != nil {
190 return nil, err
191 }
192 if err := r.setMemberHandler(ctx, req, handlers); err != nil {
193 return nil, err
194 }
195 return handlers, nil
196 }
197
198
199
200 func (r *Reconciler) setNodeHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
201 handlers.node = resources.NewNodeHandlerBuilder().
202 WithClient(r.KubeRetryClient).
203 WithKey(req.NamespacedName).
204 HandlesNode().
205 Named(req.Name).
206 Build()
207
208 err := handlers.node.ReconcileLocal(ctx)
209 if client.IgnoreNotFound(err) != nil {
210 return fmt.Errorf("failed to retrieve node: %w", err)
211 }
212
213 if err == nil {
214 handlers.node.Found = true
215 }
216 return nil
217 }
218
219
220
221 func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
222 handlers.member = resources.NewEtcdMemberHandlerBuilder().
223 WithClient(r.KubeRetryClient).
224 WithKey(req.NamespacedName).
225 HandlesEtcdMember().
226 Named(req.Name).
227 Build()
228
229 err := handlers.member.ReconcileLocal(ctx)
230 if client.IgnoreNotFound(err) != nil {
231 return fmt.Errorf("failed to retrieve EtcdMember: %w", err)
232 }
233
234 if err == nil {
235 handlers.member.Found = true
236 }
237 return nil
238 }
239
240
241
242 func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) bool {
243 log := fog.FromContext(ctx)
244
245
246 if handlers.node.Found && !handlers.node.IsWorker() {
247 log.V(0).Info("node does not have the worker role")
248 return false
249 }
250
251 if handlers.member.IsSuspended() {
252 log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true")
253 return false
254 }
255
256 return true
257 }
258
259
260
261 func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error {
262 log := fog.FromContext(ctx)
263
264
265 IENVersion := handlers.node.IENVersion()
266 if handlers.member.Found && handlers.member.DeletionRequired(IENVersion) {
267 if err := r.destroy(ctx, handlers); err != nil {
268 return err
269 }
270 log.V(0).Info("deleted EtcdMember", "eoaudit", "")
271 }
272
273
274 if !handlers.node.Found {
275 return nil
276 }
277 return r.create(ctx, handlers)
278 }
279
280
281 func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) {
282 r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime)
283 r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember)
284 r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember)
285 }
286
View as plain text