1 package provision
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 "k8s.io/client-go/tools/record"
9 ctrl "sigs.k8s.io/controller-runtime"
10 "sigs.k8s.io/controller-runtime/pkg/builder"
11 "sigs.k8s.io/controller-runtime/pkg/client"
12 "sigs.k8s.io/controller-runtime/pkg/event"
13 "sigs.k8s.io/controller-runtime/pkg/predicate"
14
15 "edge-infra.dev/pkg/k8s/runtime/conditions"
16 edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
17 "edge-infra.dev/pkg/k8s/runtime/patch"
18 "edge-infra.dev/pkg/lib/fog"
19 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
20 "edge-infra.dev/pkg/sds/etcd/operator/internal/config"
21 "edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
22 "edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
23 )
24
// Package-level settings used by the provision reconciler.
var (
	// caCertPath is the on-disk location of the etcd CA certificate.
	// NOTE(review): not referenced in this section of the file; presumably
	// read when the secret content is built — confirm against secretContent.
	caCertPath = "/etc/kubernetes/pki/etcd/ca.crt"
	// caKeyPath is the on-disk location of the etcd CA private key.
	// NOTE(review): not referenced in this section of the file; presumably
	// read when the secret content is built — confirm against secretContent.
	caKeyPath = "/etc/kubernetes/pki/etcd/ca.key"
	// operatorNamespace is the namespace in which setSecretHandler looks up
	// and manages the Secret, regardless of the namespace on the request.
	operatorNamespace = "etcd-operator"
)
30
31
32
// Conditions is the condition configuration for the provision reconciler.
// SetupWithManager copies it onto the Reconciler, and summarize hands it to
// the summarizer via edgereconcile.WithConditions.
//
// NOTE(review): the exact meaning of each field is defined by
// edgereconcile.Conditions and is not visible here. Presumably Target is the
// condition computed from the Summarize list, Owned lists the conditions this
// controller manages, and NegativePolarity lists conditions whose "True"
// status indicates a non-ready state — confirm against that package.
var Conditions = edgereconcile.Conditions{
	Target: v1etcd.Ready,
	Owned: []string{
		v1etcd.Provisioned,
		v1etcd.InProgress,
		v1etcd.Reconciling,
	},
	Summarize: []string{
		v1etcd.Installed,
		v1etcd.InProgress,
		v1etcd.Provisioned,
	},
	NegativePolarity: []string{
		v1etcd.InProgress,
		v1etcd.Reconciling,
	},
}
50
// Reconciler reconciles EtcdMember resources by provisioning the Secret
// associated with each member. All three embedded values are populated by
// SetupWithManager; the zero value is not usable before that call.
type Reconciler struct {
	// Config is the operator configuration passed to SetupWithManager.
	// NOTE(review): presumably the source of the promoted Mgr, Name and
	// KubeRetryClient fields used throughout this file — confirm.
	config.Config
	// Conditions is the condition configuration passed to the summarizer.
	edgereconcile.Conditions
	// Metrics records reconciliation duration, counts and errors.
	*metrics.Metrics
}
56
// summarizeOptions bundles the inputs of Reconciler.summarize.
// Field order matters: Reconcile constructs this type with an unkeyed literal.
type summarizeOptions struct {
	// patcher is the serial patcher used to build the summarizer.
	patcher *patch.SerialPatcher
	// handlers supplies the EtcdMember whose conditions are summarized.
	handlers *Handlers
	// recErr is the error produced by the reconciliation, if any.
	recErr error
}
62
// resultOptions bundles the inputs of Reconciler.recordResults.
// Field order matters: Reconcile constructs this type with an unkeyed literal.
type resultOptions struct {
	// startTime is when the reconciliation began; used to record its duration.
	startTime time.Time
	// handlers supplies the EtcdMember the metrics are recorded against.
	handlers *Handlers
	// recErr is the final reconciliation error (nil on success).
	recErr error
}
68
69
70
// Handlers groups the resource handlers built for a single reconcile request.
// Both fields are populated by generateHandlers.
type Handlers struct {
	// member handles the EtcdMember named by the request.
	member *resources.EtcdMemberHandler
	// secret handles the Secret of the same name in operatorNamespace.
	secret *resources.SecretHandler
}
75
76
77 func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
78 r.Config = cfg
79 r.Conditions = Conditions
80 r.Metrics = metrics.New(r.Mgr, "provision")
81 r.Metrics.Custom.Run(initialMembers)
82
83 return ctrl.NewControllerManagedBy(r.Mgr).
84 For(&v1etcd.EtcdMember{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
85 WithEventFilter(r.createEventFilter()).
86 Complete(r)
87 }
88
89
90
91 func (r *Reconciler) createEventFilter() predicate.Predicate {
92 return predicate.Funcs{
93 CreateFunc: func(e event.CreateEvent) bool {
94 etcdMember := e.Object.(*v1etcd.EtcdMember)
95 handler := resources.NewEtcdMemberHandlerBuilder().Build()
96 handler.DeepCopyFrom(etcdMember)
97
98
99 return !handler.IsProvisioned()
100 },
101 UpdateFunc: func(_ event.UpdateEvent) bool {
102 return false
103 },
104 DeleteFunc: func(_ event.DeleteEvent) bool {
105 return false
106 },
107 }
108 }
109
110
111
112
113
114 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
115 log := fog.FromContext(ctx).WithName(r.Name)
116 ctx = fog.IntoContext(ctx, log)
117 startTime := time.Now()
118 log.V(0).Info("started Secret provision reconciliation loop")
119 defer log.V(0).Info("ended Secret provision reconciliation loop")
120
121 handlers, err := r.generateHandlers(ctx, req)
122 if err != nil {
123 return ctrl.Result{}, err
124 }
125
126 if pass := r.checkPreconditions(ctx, handlers); !pass {
127 return ctrl.Result{}, nil
128 }
129
130 if err := r.setConditions(ctx, handlers); err != nil {
131 return ctrl.Result{}, err
132 }
133
134 patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client())
135 defer func() {
136 summarizeOpts := summarizeOptions{
137 patcher,
138 handlers,
139 recErr,
140 }
141 _, recErr = r.summarize(ctx, summarizeOpts)
142
143 resultOpts := resultOptions{
144 startTime,
145 handlers,
146 recErr,
147 }
148 r.recordResults(ctx, resultOpts)
149 }()
150
151 if err := r.reconcile(ctx, handlers); err != nil {
152 return ctrl.Result{}, err
153 }
154 log.V(0).Info("secret provisioned successfully", "eoaudit", "")
155
156 conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedSuccessReason, "%s", v1etcd.ProvisionedSuccessMessage)
157 return ctrl.Result{}, nil
158 }
159
160
161 func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) {
162 handlers := &Handlers{}
163 if err := r.setSecretHandler(ctx, req, handlers); err != nil {
164 return nil, err
165 }
166 if err := r.setMemberHandler(ctx, req, handlers); err != nil {
167 return nil, err
168 }
169 return handlers, nil
170 }
171
172
173
174 func (r *Reconciler) setSecretHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
175 key := req.NamespacedName
176 key.Namespace = operatorNamespace
177 handlers.secret = resources.NewSecretHandlerBuilder().
178 WithClient(r.KubeRetryClient).
179 WithKey(key).
180 HandlesSecret().
181 Named(req.Name).
182 InNamespace(operatorNamespace).
183 Build()
184
185 err := handlers.secret.ReconcileLocal(ctx)
186 if client.IgnoreNotFound(err) != nil {
187 return fmt.Errorf("failed to retrieve node: %w", err)
188 }
189
190 if err == nil {
191 handlers.secret.Found = true
192 }
193 return nil
194 }
195
196
197
198 func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
199 handlers.member = resources.NewEtcdMemberHandlerBuilder().
200 WithClient(r.KubeRetryClient).
201 WithKey(req.NamespacedName).
202 HandlesEtcdMember().
203 Named(req.Name).
204 Build()
205
206 err := handlers.member.ReconcileLocal(ctx)
207 if client.IgnoreNotFound(err) != nil {
208 return fmt.Errorf("failed to retrieve EtcdMember: %w", err)
209 }
210
211 if err == nil {
212 handlers.member.Found = true
213 }
214 return nil
215 }
216
217
218
219 func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) bool {
220 log := fog.FromContext(ctx)
221
222 if !handlers.member.Found {
223 log.V(0).Info("EtcdMember not found")
224 return false
225 }
226
227 if handlers.member.IsSuspended() {
228 log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true")
229 return false
230 }
231
232 if handlers.member.IsProvisioned() {
233 log.V(0).Info("Secret is already prepared")
234 return false
235 }
236
237 return true
238 }
239
240
241 func (r *Reconciler) setConditions(ctx context.Context, handlers *Handlers) error {
242 log := fog.FromContext(ctx)
243
244
245
246
247
248 return handlers.member.WithReconcileRemote(ctx, func(e *v1etcd.EtcdMember) {
249 if _, ok := e.GetCondition(v1etcd.Reconciling); ok {
250 return
251 }
252 log.V(1).Info("setting 'Reconciling' condition")
253 conditions.MarkTrue(e, v1etcd.Reconciling, v1etcd.ProvisionedReconcilingReason, "%s", v1etcd.ProvisionedReconcilingMessage)
254 log.V(1).Info("setting 'InProgress' condition")
255 conditions.MarkTrue(e, v1etcd.InProgress, v1etcd.InProgressReason, "%s", v1etcd.InProgressMessage)
256 })
257 }
258
259
260
261
262 func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error {
263 content, err := r.secretContent(handlers)
264 if err != nil {
265 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedFailedReason, "%s", v1etcd.ProvisionedContentFailedMessage)
266 return fmt.Errorf("%s: %w", v1etcd.ProvisionedContentFailedMessage, err)
267 }
268
269 if err := client.IgnoreNotFound(handlers.secret.DeleteRemote(ctx)); err != nil {
270 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedFailedReason, "%s", v1etcd.ProvisionedDeleteFailedMessage)
271 return fmt.Errorf("%s: %w", v1etcd.ProvisionedDeleteFailedMessage, err)
272 }
273
274 r.generateSecret(content, handlers)
275 if err := handlers.secret.CreateRemote(ctx); err != nil {
276 conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedFailedReason, "%s", v1etcd.ProvisionedCreateFailedMessage)
277 return fmt.Errorf("%s: %w", v1etcd.ProvisionedCreateFailedMessage, err)
278 }
279 return nil
280 }
281
282
283 func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) {
284 s := edgereconcile.NewSummarizer(opts.patcher)
285 return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember,
286 edgereconcile.WithConditions(r.Conditions),
287 edgereconcile.WithResult(edgereconcile.ResultEmpty),
288 edgereconcile.WithError(opts.recErr),
289 edgereconcile.WithIgnoreNotFound(),
290 edgereconcile.WithProcessors(
291 edgereconcile.RecordReconcileReq,
292 edgereconcile.RecordResult,
293 UnsetReconciling,
294 ),
295 edgereconcile.WithFieldOwner(r.Name),
296 )
297 }
298
299
300
301 func UnsetReconciling(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) {
302 log := fog.FromContext(ctx)
303 member, ok := obj.(*v1etcd.EtcdMember)
304 if !ok {
305 return
306 }
307 if err == nil {
308 log.V(1).Info("removing 'Reconciling' condition")
309 conditions.Delete(member, v1etcd.Reconciling)
310 }
311 }
312
313
314 func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) {
315 r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime)
316 r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember)
317 r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember)
318 }
319
View as plain text