// Package recovery provides a client for requesting an etcd cluster reset
// from the etcd manager over its recovery Unix socket.
package recovery

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"edge-infra.dev/pkg/lib/fog"
	"edge-infra.dev/pkg/sds/etcd/manager"
	"edge-infra.dev/pkg/sds/etcd/manager/internal/socket"
)

// Initiate sends a message to the etcd manager to request an etcd cluster
// reset.
//
// It dials manager.RecoverySocket, writes socket.ResetMessage and then waits
// for the manager's reply. It returns nil when the manager answers with
// socket.SuccessResponse or socket.LockedResponse, and an error when the
// connection cannot be set up, the write or read fails, or any other reply is
// received.
func Initiate(ctx context.Context) error {
	log := fog.FromContext(ctx)
	log.Info("initiating etcd cluster recovery", "eoaudit", "")

	conn, err := connWithDeadline(manager.RecoverySocket)
	if err != nil {
		return fmt.Errorf("failed to establish socket connection: %w", err)
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(socket.ResetMessage)); err != nil {
		// Wrap so the caller can tell a failed write apart from a failed
		// dial or read; the underlying error stays reachable via errors.Is/As.
		return fmt.Errorf("failed to write socket message: %w", err)
	}

	return handleResponse(ctx, conn)
}

// connWithDeadline sets up a connection to the socket with a 30 second read and
// write deadline.
//
// On any failure the returned connection is nil; if the deadline cannot be
// applied, the freshly dialed connection is closed before returning so that it
// is not leaked.
func connWithDeadline(socketPath string) (net.Conn, error) {
	const deadline = 30 * time.Second

	conn, err := net.Dial("unix", socketPath)
	if err != nil {
		return nil, fmt.Errorf("failed to dial the Unix socket: %w", err)
	}

	if err := conn.SetDeadline(time.Now().Add(deadline)); err != nil {
		// The caller only closes the connection on success, so it has to be
		// released here. The close error is deliberately dropped: the
		// deadline failure is the error worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to set socket deadline: %w", err)
	}

	return conn, nil
}

// handleResponse receives the response message and determines whether the
// recovery was successful.
//
// socket.SuccessResponse and socket.LockedResponse are both logged and treated
// as success; any other reply results in an error.
func handleResponse(ctx context.Context, conn net.Conn) error {
	log := fog.FromContext(ctx)

	response, err := receiveMessage(conn)
	if err != nil {
		return err
	}

	switch response {
	case socket.SuccessResponse:
		log.Info("initiated etcd cluster recovery successfully", "eoaudit", "")
	case socket.LockedResponse:
		log.Info("etcd cluster recovery already initiated", "eoaudit", "")
	default:
		return errors.New("failed to initiate etcd cluster recovery")
	}

	return nil
}

// receiveMessage reads the connection and returns the message.
//
// A single Read into a 1024-byte buffer is performed, so at most 1024 bytes of
// the reply are returned.
func receiveMessage(conn net.Conn) (string, error) {
	buf := make([]byte, 1024)

	n, err := conn.Read(buf)
	if err != nil {
		return "", fmt.Errorf("failed to read socket message: %w", err)
	}

	return string(buf[:n]), nil
}