1 package detector
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/spf13/afero"
10 ctrl "sigs.k8s.io/controller-runtime"
11
12 "edge-infra.dev/pkg/lib/fog"
13 etcdconstants "edge-infra.dev/pkg/sds/etcd/operator/constants"
14 "edge-infra.dev/pkg/sds/lanoutage/detector/internal/config"
15 "edge-infra.dev/pkg/sds/lanoutage/detector/internal/constants"
16 "edge-infra.dev/pkg/sds/lanoutage/detector/internal/interlock"
17 "edge-infra.dev/pkg/sds/lib/dbus/systemd"
18 )
19
// Deadlines used by the LAN Outage Mode reconciler. The "short" pair is
// applied by Run on first boot; enterTime/leaveTime are presumably consumed by
// resetDeadline (defined elsewhere) — confirm.
const (
	// shortEnterTime is the enter deadline offset set on first boot.
	shortEnterTime = 2 * time.Minute
	// shortLeaveTime is the leave deadline offset set on first boot (immediate).
	shortLeaveTime time.Duration = 0
	// enterTime is the regular enter deadline offset.
	enterTime = 30 * time.Minute
	// leaveTime is the regular leave deadline offset.
	leaveTime = 10 * time.Minute
)
26
27
// Action is the outcome of one pass of the detection loop with respect to
// LAN Outage Mode. Any value other than None causes the loop to reset the
// reconciler deadlines and write the first-run flag file.
type Action int

const (
	// None: nothing was decided this pass (deadline not reached, reconciler
	// lock unavailable, or a precondition check failed).
	None Action = iota
	// Enter: LAN Outage Mode was entered, either directly or as the fallback
	// after a failed leave.
	Enter
	// Leave: LAN Outage Mode was left, either directly or as the fallback
	// after a failed enter.
	Leave
	// Stay: the current mode already matches connectivity, so no transition
	// was needed.
	Stay
)
36
37 func Run() error {
38 ctx := context.Background()
39
40 cfg, err := config.New()
41 if err != nil {
42 return err
43 }
44
45 log := fog.New(fog.WithLevel(cfg.LOMConfig.LogLevel())).WithName("lan-outage-detector")
46 ctx = fog.IntoContext(ctx, log)
47 ctrl.SetLogger(log)
48
49 if !cfg.ThickPos {
50 log.Info("Thick POS mode is disabled. LAN Outage Detector will not run.")
51 return nil
52 }
53
54 if err := removeOldLANOutageDetector(ctx); err != nil {
55 return fmt.Errorf("error removing old LAN Outage Detector: %w", err)
56 }
57
58 isLOM, err := afero.Exists(cfg.Fs, constants.LOMFlagFilepath)
59 if err != nil {
60 return fmt.Errorf("failed to check if node is in LAN Outage Mode: %w", err)
61 }
62
63 firstBoot, err := isFirstBoot(cfg.Fs)
64 if err != nil {
65 return fmt.Errorf("failed to check if LOD has restarted: %w", err)
66 }
67
68 if firstBoot {
69 if err := cfg.Fs.RemoveAll(etcdconstants.OperatorFilewallFilepath); err != nil {
70 return fmt.Errorf("failed to remove etcd operator firewall: %w", err)
71 }
72 }
73
74 log.Info("LAN Outage Detector running..")
75
76 lomReconciler := NewLOMReconciler(cfg, isLOM)
77 lanDetector, err := newLANDetector(lomReconciler)
78 if err != nil {
79 return fmt.Errorf("failed to initialize LAN outage detector: %w", err)
80 }
81
82 if firstBoot {
83 lomReconciler.enterDeadline = time.Now().Add(shortEnterTime)
84 lomReconciler.leaveDeadline = time.Now().Add(shortLeaveTime)
85 }
86
87 if isLOM && firstBoot {
88 err := lomReconciler.Enter(ctx)
89 if err != nil {
90 log.Error(err, "failed to completely re-enter LAN Outage Mode")
91 }
92 }
93
94
95 sh, err := NewSocketHandler(constants.SocketFilepath)
96 if err != nil {
97 return fmt.Errorf("failed to create the socket handler: %w", err)
98 }
99
100
101 go forceClientHandler(ctx, lomReconciler, sh)
102 return detectLoop(ctx, &lanDetector)
103 }
104
105
106
107
108
109
110 func detectLoop(ctx context.Context, detector *LANDetector) error {
111 log := fog.FromContext(ctx)
112
113 if err := detector.Detect(); err != nil {
114 log.Error(err, "failed to run initial cluster connectivity check")
115 }
116
117 log.Info("initial connection status", "connected", detector.canConnect)
118 wasConnected := detector.canConnect
119
120 for {
121 if err := interlock.UpdateInterlock(detector.canConnect, detector.isLOM); err != nil {
122 log.Error(err, "failed to update the Interlock API state")
123 }
124
125 time.Sleep(10 * time.Second)
126 if err := detector.Detect(); err != nil {
127 log.Error(err, "failed to run cluster connectivity check")
128 continue
129 }
130
131
132 if detector.canConnect != wasConnected {
133 log.Info("connection status changed", "connected", detector.canConnect)
134 wasConnected = detector.canConnect
135 detector.LOMReconciler.resetDeadline()
136 }
137
138 var actionTaken Action
139 var err error
140 if detector.canConnect {
141 actionTaken, err = onConnection(ctx, detector)
142 if err != nil {
143 log.Error(err, "failed to handle connected state")
144 }
145 } else {
146 actionTaken, err = noConnection(ctx, detector)
147 if err != nil {
148 log.Error(err, "failed to handle disconnected state")
149 }
150 }
151
152
153
154 if actionTaken != None {
155 detector.LOMReconciler.resetDeadline()
156 setFirstBoot(ctx, detector.GetFs())
157 }
158 }
159 }
160
161
162
163
164 func onConnection(ctx context.Context, detector *LANDetector) (Action, error) {
165 log := fog.FromContext(ctx)
166
167 if !detector.isLOM {
168 return Stay, nil
169 }
170
171 if detector.isPastLeaveDeadline() {
172 acquired, leaveErr := detector.WithLock(func() error {
173 return tryLeave(ctx, detector)
174 })
175
176 if !acquired {
177 log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
178 return None, nil
179 }
180
181
182 if errors.Is(leaveErr, errPrecondition) {
183 return None, leaveErr
184 }
185
186
187 if leaveErr != nil {
188 log.Error(leaveErr, "there was an error in the leave LAN Outage Mode process")
189 log.Info("applying cleanup for leave, now entering...")
190
191 return Enter, leaveCleanup(ctx, detector)
192 }
193
194 return Leave, nil
195 }
196 return None, nil
197 }
198
199
200 func tryLeave(ctx context.Context, detector *LANDetector) error {
201 required, err := detector.LeavePreconditionCheck(ctx)
202 if err != nil {
203 return fmt.Errorf("%w - failed to leave: %w", errPrecondition, err)
204 }
205
206 if required {
207 return detector.Leave(ctx)
208 }
209
210 return nil
211 }
212
213
214
215
216 func setFirstBoot(ctx context.Context, fs afero.Fs) {
217 log := fog.FromContext(ctx)
218 _, err := fs.Create(constants.FirstRunFlagFilepath)
219 if err != nil {
220 log.Error(err, "failed to create the first run flag file")
221 }
222 }
223
224 func leaveCleanup(ctx context.Context, detector *LANDetector) error {
225 log := fog.FromContext(ctx)
226 acquired, err := detector.WithLock(func() error {
227 return detector.Enter(ctx)
228 })
229
230 if !acquired {
231 log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
232 }
233
234 if err != nil {
235 return fmt.Errorf("failed to enter LAN Outage Mode in the leave fallback process: %w", err)
236 }
237 return nil
238 }
239
240
241
242
243 func noConnection(ctx context.Context, detector *LANDetector) (Action, error) {
244 log := fog.FromContext(ctx)
245
246 if detector.isLOM {
247 return Stay, nil
248 }
249
250 if detector.isPastEnterDeadline() {
251 acquired, enterErr := detector.WithLock(func() error {
252 return tryEnter(ctx, detector)
253 })
254
255 if !acquired {
256 log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
257 return None, nil
258 }
259
260
261 if errors.Is(enterErr, errPrecondition) {
262 return None, enterErr
263 }
264
265
266 if enterErr != nil {
267 log.Error(enterErr, "there was an error in the enter LAN Outage Mode process")
268 log.Info("applying cleanup for enter, now leaving...")
269
270 return Leave, enterCleanup(ctx, detector)
271 }
272
273 return Enter, nil
274 }
275 return None, nil
276 }
277
278
279
280 func tryEnter(ctx context.Context, detector *LANDetector) error {
281 required, err := detector.EnterPreconditionCheck(ctx)
282 if err != nil {
283 return fmt.Errorf("%w - failed to enter: %w", errPrecondition, err)
284 }
285 if !required {
286 return nil
287 }
288 return detector.Enter(ctx)
289 }
290
291
292 func removeOldLANOutageDetector(ctx context.Context) error {
293 log := fog.FromContext(ctx)
294 conn, err := systemd.NewConnection(ctx)
295 if err != nil {
296 return fmt.Errorf("failed to establish a connection to systemd: %w", err)
297 }
298 defer conn.Close()
299
300
301 active, err := serviceActive(ctx, conn, constants.LANOutageServiceName)
302 if err != nil || !active {
303 return err
304 }
305
306 log.Info("attempting to stop prexisting service", "service", constants.LANOutageServiceName)
307
308 if err := conn.Stop(ctx, constants.LANOutageServiceName, systemd.Replace, false); err != nil {
309 return err
310 }
311
312
313 log.Info("attempting to disable prexisting service", "service", constants.LANOutageServiceName)
314 return conn.Disable(ctx, []string{constants.LANOutageServiceName})
315 }
316
317
318 func serviceActive(ctx context.Context, conn systemd.StatusChecker, service string) (bool, error) {
319 log := fog.FromContext(ctx)
320
321 exists, err := conn.Exists(ctx, service)
322 if err != nil {
323 return false, fmt.Errorf("failed to check for prexisting service: %w", err)
324 }
325
326 if !exists {
327 return false, nil
328 }
329
330 state, err := conn.ActiveState(ctx, constants.LANOutageServiceName)
331 if err != nil {
332 return false, err
333 }
334
335 if state == "inactive" {
336 return false, nil
337 }
338 log.Info("found an active service", "service", service)
339
340 return true, nil
341 }
342
343
344
345 func enterCleanup(ctx context.Context, detector *LANDetector) error {
346 log := fog.FromContext(ctx)
347 acquired, err := detector.WithLock(func() error {
348 return detector.Leave(ctx)
349 })
350
351 if !acquired {
352 log.Error(errors.New(constants.FailedToAcquireLockMessage), "failed to acquire the reconciler lock")
353 }
354 if err != nil {
355 return fmt.Errorf("failed to leave LAN Outage Mode in the enter fallback process: %w", err)
356 }
357
358 return nil
359 }
360
361 func isFirstBoot(fs afero.Fs) (bool, error) {
362 exists, err := afero.Exists(fs, constants.FirstRunFlagFilepath)
363 if err != nil {
364 return false, err
365 }
366 return !exists, nil
367 }
368
369
370
371 func forceClientHandler(ctx context.Context, reconciler *LOMReconciler, sh *SocketHandler) {
372 log := fog.FromContext(ctx)
373 log.Info("starting force client handler routine...")
374
375 for {
376 conn, err := sh.ReceiveConnection()
377 if err != nil {
378 log.Error(err, "failed to read unix socket")
379 continue
380 }
381
382 go handleSocketConnection(ctx, conn, reconciler)
383 }
384 }
385
386
387
388 func handleSocketConnection(ctx context.Context, conn *Conn, reconciler *LOMReconciler) {
389 log := fog.FromContext(ctx)
390
391 message, err := conn.Receive()
392 if err != nil {
393 log.Error(err, "failed to read message from socket connection")
394 if err := conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToReceiveMessage); err != nil {
395 log.Error(err, "failed to reply to socket request")
396 }
397 return
398 }
399
400 switch message {
401 case constants.EnterCommand:
402 if err := enterHandler(ctx, reconciler, conn); err != nil {
403 log.Error(err, "failed to process enter LAN Outage Mode request")
404 }
405 case constants.LeaveCommand:
406 if err := leaveHandler(ctx, reconciler, conn); err != nil {
407 log.Error(err, "failed to process leave LAN Outage Mode request")
408 }
409 default:
410 log.Info(constants.UnexpectedSocketCommandMessage)
411
412 if err := conn.Respond(constants.FailedToProcessRequestMessage, constants.UnexpectedSocketCommandMessage); err != nil {
413 log.Error(err, "failed to reply to socket request")
414 }
415 }
416 }
417
418
419
420
421 func enterHandler(ctx context.Context, reconciler *LOMReconciler, conn *Conn) error {
422 acquired, err := reconciler.WithLock(func() error {
423 if err := conn.Respond(constants.EnteringMessage, ""); err != nil {
424 return err
425 }
426
427 return reconciler.Enter(ctx)
428 })
429
430 if !acquired {
431 return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToAcquireLockMessage)
432 }
433 if err != nil {
434 fog.FromContext(ctx).Error(err, constants.FailedToEnterMessage)
435 return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToEnterMessage)
436 }
437
438 return conn.Respond(constants.EnterSuccessMessage, "")
439 }
440
441
442
443
444 func leaveHandler(ctx context.Context, reconciler *LOMReconciler, conn *Conn) error {
445 acquired, err := reconciler.WithLock(func() error {
446 if err := conn.Respond(constants.LeavingMessage, ""); err != nil {
447 return err
448 }
449
450 return reconciler.Leave(ctx)
451 })
452
453 if !acquired {
454 return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToAcquireLockMessage)
455 }
456
457 if err != nil {
458 fog.FromContext(ctx).Error(err, constants.FailedToLeaveMessage)
459 return conn.Respond(constants.FailedToProcessRequestMessage, constants.FailedToLeaveMessage)
460 }
461
462 return conn.Respond(constants.LeaveSuccessMessage, "")
463 }
464
View as plain text