1 package inform
2
3 import (
4 "context"
5 "fmt"
6
7 corev1 "k8s.io/api/core/v1"
8 ctrl "sigs.k8s.io/controller-runtime"
9 "sigs.k8s.io/controller-runtime/pkg/builder"
10 "sigs.k8s.io/controller-runtime/pkg/client"
11 "sigs.k8s.io/controller-runtime/pkg/controller"
12 "sigs.k8s.io/controller-runtime/pkg/event"
13 "sigs.k8s.io/controller-runtime/pkg/predicate"
14
15 "edge-infra.dev/pkg/k8s/runtime/conditions"
16 edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
17 "edge-infra.dev/pkg/k8s/runtime/patch"
18 "edge-infra.dev/pkg/lib/fog"
19 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
20 "edge-infra.dev/pkg/sds/etcd/operator/internal/config"
21 "edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
22 "edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
23 )
24
25 var operatorNamespace = "etcd-operator"
26
27
28
29 var Conditions = edgereconcile.Conditions{
30 Target: v1etcd.Health,
31 Owned: []string{
32 v1etcd.Health,
33 },
34 Summarize: []string{
35 v1etcd.Health,
36 },
37 NegativePolarity: []string{},
38 }
39
40 type Reconciler struct {
41 config.Config
42 edgereconcile.Conditions
43 *metrics.Metrics
44 }
45
46 type summarizeOptions struct {
47 patcher *patch.SerialPatcher
48 handlers *Handlers
49 recErr error
50 }
51
52 type resultOptions struct {
53 handlers *Handlers
54 }
55
56
57
58 type Handlers struct {
59 member *resources.EtcdMemberHandler
60 }
61
62
63 func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
64 r.Config = cfg
65 r.Conditions = Conditions
66 r.Metrics = metrics.New(r.Mgr, "inform")
67 r.Metrics.Custom.Run(initialMembers)
68 opts := controller.Options{
69 MaxConcurrentReconciles: 2,
70 }
71
72 return ctrl.NewControllerManagedBy(r.Mgr).
73 For(&v1etcd.EtcdMember{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
74 WithEventFilter(r.createEventFilter()).
75 WithOptions(opts).
76 Complete(r)
77 }
78
79
80
81 func (r *Reconciler) createEventFilter() predicate.Predicate {
82 return predicate.Funcs{
83 CreateFunc: func(_ event.CreateEvent) bool {
84 return true
85 },
86 UpdateFunc: func(_ event.UpdateEvent) bool {
87 return false
88 },
89 DeleteFunc: func(_ event.DeleteEvent) bool {
90 return false
91 },
92 }
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
107 log := fog.FromContext(ctx).WithName(r.Name)
108 ctx = fog.IntoContext(ctx, log)
109 log.V(1).Info("reconciling EtcdMember health status")
110
111 if err := r.WithDefaultEtcdRetryClient(ctx); err != nil {
112 return ctrl.Result{}, fmt.Errorf("failed to setup etcd retry client: %w", err)
113 }
114 defer r.EtcdRetryClient.Close()
115
116 handlers, err := r.generateHandlers(ctx, req)
117 if err != nil {
118 return ctrl.Result{}, err
119 }
120
121 if pass := r.checkPreconditions(ctx, handlers); !pass {
122 log.V(0).Info("preconditions not met")
123 return ctrl.Result{}, nil
124 }
125
126 patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client())
127 defer func() {
128 summarizeOpts := summarizeOptions{
129 patcher,
130 handlers,
131 recErr,
132 }
133 _, recErr = r.summarize(ctx, summarizeOpts)
134
135 resultOpts := resultOptions{
136 handlers,
137 }
138 r.recordResults(resultOpts)
139 }()
140
141 if err := r.reconcile(ctx, handlers); err != nil {
142 log.Error(err, "failure in EtcdMember informer reconciliation")
143 }
144
145
146
147 return ctrl.Result{RequeueAfter: handlers.member.RequeueAfter()}, nil
148 }
149
150
151
152 func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) {
153 handlers := &Handlers{}
154 handlers.member = resources.NewEtcdMemberHandlerBuilder().
155 WithClient(r.KubeRetryClient).
156 WithKey(req.NamespacedName).
157 HandlesEtcdMember().
158 Named(req.Name).
159 Build()
160
161 err := handlers.member.ReconcileLocal(ctx)
162 if client.IgnoreNotFound(err) != nil {
163 return nil, fmt.Errorf("failed to retrieve EtcdMember: %w", err)
164 }
165
166 if err == nil {
167 handlers.member.Found = true
168 }
169 return handlers, nil
170 }
171
172
173
174 func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) bool {
175 log := fog.FromContext(ctx)
176
177 if !handlers.member.Found {
178 return false
179 }
180
181 if handlers.member.IsSuspended() {
182 log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true")
183 return false
184 }
185 return true
186 }
187
188
189 func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error {
190 if !r.isEtcdHealthy(ctx, handlers) && handlers.member.ReconfigurationRequired() {
191 return r.KubeRetryClient.SafeDelete(ctx, handlers.member.EtcdMember)
192 }
193
194 secretLost, err := r.isSecretLost(ctx, handlers)
195 if err != nil {
196 return err
197 }
198 if secretLost {
199 return r.KubeRetryClient.SafeDelete(ctx, handlers.member.EtcdMember)
200 }
201
202 return nil
203 }
204
205 func (r *Reconciler) isEtcdHealthy(ctx context.Context, handlers *Handlers) bool {
206 resp, err := r.EtcdRetryClient.SafeStatus(ctx, handlers.member.EtcdMember.Spec.ClientURL())
207 if err != nil {
208
209 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Health, v1etcd.HealthFailedReason, "%s", v1etcd.HealthNoResponseMessage)
210 return false
211 }
212 if len(resp.Errors) != 0 {
213
214 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Health, v1etcd.HealthFailedReason, "%s", v1etcd.HealthFailedResponseMessage)
215 return false
216 }
217
218 conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Health, v1etcd.HealthSuccessReason, "%s", v1etcd.HealthSuccessMessage)
219 return true
220 }
221
222 func (r *Reconciler) isSecretLost(ctx context.Context, handlers *Handlers) (bool, error) {
223 if !conditions.IsTrue(handlers.member, v1etcd.Provisioned) {
224 return false, nil
225 }
226 if conditions.IsTrue(handlers.member, v1etcd.Installed) {
227 return false, nil
228 }
229
230 key := handlers.member.Key
231 key.Namespace = operatorNamespace
232 err := r.KubeRetryClient.SafeGet(ctx, key, &corev1.Secret{})
233 if client.IgnoreNotFound(err) != nil {
234 return false, err
235 }
236 if err != nil {
237 return true, nil
238 }
239 return false, nil
240 }
241
242
243 func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) {
244 s := edgereconcile.NewSummarizer(opts.patcher)
245 return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember,
246 edgereconcile.WithConditions(r.Conditions),
247 edgereconcile.WithResult(edgereconcile.ResultRequeue),
248 edgereconcile.WithError(opts.recErr),
249 edgereconcile.WithIgnoreNotFound(),
250 edgereconcile.WithProcessors(
251 edgereconcile.RecordReconcileReq,
252 edgereconcile.RecordResult,
253 ),
254 edgereconcile.WithFieldOwner(r.Name),
255 )
256 }
257
258
259 func (r *Reconciler) recordResults(opts resultOptions) {
260 conditions := []string{
261 v1etcd.Health,
262 v1etcd.Provisioned,
263 v1etcd.Installed,
264 v1etcd.Ready,
265 }
266 for _, c := range conditions {
267 condition, _ := opts.handlers.member.EtcdMember.GetCondition(c)
268 r.Metrics.RecordCondition(opts.handlers.member.EtcdMember, c, condition.Status, !opts.handlers.member.EtcdMember.GetDeletionTimestamp().IsZero())
269 }
270 }
271
View as plain text