...
1 package recovery
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net"
8 "time"
9
10 "edge-infra.dev/pkg/lib/fog"
11 "edge-infra.dev/pkg/sds/etcd/manager"
12 "edge-infra.dev/pkg/sds/etcd/manager/internal/socket"
13 )
14
15
16
17 func Initiate(ctx context.Context) error {
18 log := fog.FromContext(ctx)
19 log.Info("initiating etcd cluster recovery", "eoaudit", "")
20
21 conn, err := connWithDeadline(manager.RecoverySocket)
22 if err != nil {
23 return fmt.Errorf("failed to establish socket connection: %w", err)
24 }
25 defer conn.Close()
26
27 if _, err := conn.Write([]byte(socket.ResetMessage)); err != nil {
28 return err
29 }
30 return handleResponse(ctx, conn)
31 }
32
33
34
// connWithDeadline dials the Unix socket at socketPath and sets a 30-second
// deadline that applies to all subsequent reads and writes on the connection.
//
// On any failure the returned connection is nil. If the deadline cannot be
// set, the freshly dialed connection is closed before returning, so a caller
// that bails out on a non-nil error does not leak the socket.
func connWithDeadline(socketPath string) (net.Conn, error) {
	conn, err := net.Dial("unix", socketPath)
	if err != nil {
		return nil, fmt.Errorf("failed to dial the Unix socket: %w", err)
	}

	if err := conn.SetDeadline(time.Now().Add(30 * time.Second)); err != nil {
		// The caller never receives this connection, so release it here.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to set socket deadline: %w", err)
	}
	return conn, nil
}
44
45
46
47 func handleResponse(ctx context.Context, conn net.Conn) error {
48 log := fog.FromContext(ctx)
49 response, err := receiveMessage(conn)
50 if err != nil {
51 return err
52 }
53
54 switch response {
55 case socket.SuccessResponse:
56 log.Info("initiated etcd cluster recovery successfully", "eoaudit", "")
57 case socket.LockedResponse:
58 log.Info("etcd cluster recovery already initiated", "eoaudit", "")
59 default:
60 return errors.New("failed to initiate etcd cluster recovery")
61 }
62 return nil
63 }
64
65
// receiveMessage performs a single Read of up to 1024 bytes from conn and
// returns the bytes received as a string. A read error is wrapped and
// returned together with an empty string.
func receiveMessage(conn net.Conn) (string, error) {
	var scratch [1024]byte

	count, readErr := conn.Read(scratch[:])
	if readErr != nil {
		return "", fmt.Errorf("failed to read socket message: %w", readErr)
	}
	return string(scratch[:count]), nil
}
75
View as plain text