package manager

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/containerd/containerd/namespaces"
	"github.com/spf13/afero"
	"golang.org/x/sys/unix"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"

	"edge-infra.dev/pkg/lib/fog"
	"edge-infra.dev/pkg/sds/etcd/manager/cluster"
	"edge-infra.dev/pkg/sds/etcd/manager/internal/config"
	"edge-infra.dev/pkg/sds/etcd/manager/internal/observability"
	"edge-infra.dev/pkg/sds/etcd/manager/internal/socket"
	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
	"edge-infra.dev/pkg/sds/lib/etcd/server"
	"edge-infra.dev/pkg/sds/lib/k8s/manifest"
)

const (
	RecoverySocket   = "/run/etcd-manager/etcd-manager.sock"
	etcdManifestPath = "/etc/kubernetes/manifests/etcd.yaml"
	containerdSocket = "/run/containerd/containerd.sock"
)

// etcdManager represents an instance of the etcd manager used to monitor an etcd cluster.
type etcdManager struct {
	cfg         config.Config
	cluster     cluster.Cluster
	stateLogger observability.StateLogger
	mutex       *sync.Mutex
}

func newEtcdManager(cfg config.Config, cluster cluster.Cluster, stateLogger observability.StateLogger) *etcdManager {
	return &etcdManager{
		cfg,
		cluster,
		stateLogger,
		&sync.Mutex{},
	}
}

// Run is the entry point for the etcd manager. It handles setup and initialization.
func Run() error {
	cfg, err := config.New()
	if err != nil {
		return fmt.Errorf("config validation failed: %w", err)
	}

	log := fog.New(fog.WithLevel(cfg.LogLevel())).WithName("etcd-manager")
	ctx := fog.IntoContext(context.Background(), log)

	etcdManager := createEtcdManager(cfg)
	_ = etcdManager.runExitHandler(ctx, exit)

	server := observability.NewServer(9085)
	go runMetricsServer(ctx, server)

	// create a Unix socket listener for the etcd manager to receive instant reset requests
	sock := socket.NewSocket(etcdManager.cfg.Fs(), RecoverySocket)
	if err := sock.Listen(); err != nil {
		return fmt.Errorf("failed to listen on socket: %w", err)
	}
	go sock.Accept(ctx)
	go sock.Handle(ctx, etcdManager)

	return etcdManager.run(ctx)
}

// createEtcdManager constructs an etcdManager from the provided config.
func createEtcdManager(cfg config.Config) *etcdManager {
	cluster := cluster.New(cfg.Endpoint(), cfg.MaxUnhealthy(), cluster.Status{})
	cluster.InitializeStatus()
	stateLogger := observability.NewStateLogger(cfg.LogLevel())
	return newEtcdManager(cfg, cluster, stateLogger)
}

// runMetricsServer runs the metrics server, retrying on failure and rate-limiting error logs.
func runMetricsServer(ctx context.Context, server *observability.Server) {
	var lastLoggedError time.Time
	log := fog.FromContext(ctx)
	for {
		if err := server.Run(); err != nil {
			if time.Since(lastLoggedError) > 5*time.Minute {
				log.Error(err, "failure in metrics server")
				lastLoggedError = time.Now()
			}
			// sleep for 20 seconds here to prevent continuous retries
			time.Sleep(20 * time.Second)
		}
	}
}

// run is the main run loop for the etcdManager
func (e *etcdManager) run(ctx context.Context) error {
	log := fog.FromContext(ctx).WithValues("routine", "main")
	ctx = fog.IntoContext(ctx, log)

	e.cluster.InitializeStatus()

	for {
		if err := e.monitorHealth(ctx); err != nil {
			log.Error(err, "failed to monitor etcd health")
		}
		time.Sleep(30 * time.Second)
	}
}

// monitorHealth checks the health of the etcd cluster, reports metrics, and
// resets the cluster if required.
func (e *etcdManager) monitorHealth(ctx context.Context) error {
	client, err := e.cfg.EtcdRetryClient()
	if err != nil {
		return err
	}
	defer client.Close()

	// update the local state for cluster health
	e.cluster.UpdateStatus(ctx, client)

	// log the new state if it has changed
	e.stateLogger.LogIfStateChanged(e.cluster.IsHealthy())
	observability.ReportHealthMetrics(e.cluster.IsHealthy())

	// get the cluster alarms and report them to metrics
	alarms := cluster.GetAlarms(ctx, client)
	observability.ReportAlarmMetrics(alarms)

	// if the cluster is unhealthy, has no alarms set, and the max unhealthy
	// threshold has been reached, reset the cluster
	if len(alarms) == 0 && e.cluster.IsResetRequired() {
		return resetCluster(ctx, e)
	}
	return nil
}

// resetCluster attempts to reset the etcd cluster, skipping the reset if one is
// already in progress.
func resetCluster(ctx context.Context, etcdManager *etcdManager) error {
	log := fog.FromContext(ctx)
	log.V(0).Info("etcd cluster is unhealthy, resetting cluster...", "minutesUnhealthy", etcdManager.cfg.MaxUnhealthy().Minutes(), "emaudit", "")

	acquired, err := etcdManager.WithTryLock(ctx, nil, etcdManager.ResetCluster)
	if err != nil {
		return err
	}
	if !acquired {
		log.V(0).Info("cluster reset already in progress")
	}
	return nil
}

// WithTryLock attempts to acquire the mutex lock before running the provided function.
// If locked is not nil, the channel is sent true when the mutex was already locked and
// false otherwise. If the lock could not be acquired, the method returns false, nil.
func (e *etcdManager) WithTryLock(ctx context.Context, locked chan<- bool, fn func(context.Context) error) (bool, error) {
	acquired := e.mutex.TryLock()
	if locked != nil {
		locked <- !acquired
	}
	if !acquired {
		return false, nil
	}
	defer e.mutex.Unlock()
	return true, fn(ctx)
}

// ResetCluster resets the etcd cluster. It does this by setting the FNC flag in the etcd manifest
// and deleting all EtcdMember custom resources. This triggers the reconfiguration process on
// all worker nodes.
func (e *etcdManager) ResetCluster(ctx context.Context) error {
	log := fog.FromContext(ctx)

	e.cluster.ResetTimer()

	conn, err := e.cfg.SystemdConnection(ctx)
	if err != nil {
		return fmt.Errorf("failed to establish a connection to systemd: %w", err)
	}
	defer conn.Close()

	containerdClient, err := e.cfg.ContainerdClient()
	if err != nil {
		return fmt.Errorf("failed to create containerd client: %w", err)
	}
	defer containerdClient.Close()

	ctx = namespaces.WithNamespace(ctx, "k8s.io")

	m := manifest.New(e.cfg.Fs(), etcdManifestPath, &corev1.Pod{}, 0)

	// defer the removal of the FNC flag from the etcd manifest, which will set
	// the cluster back to normal operation
	defer func() {
		log.V(0).Info("taking etcd out of new cluster mode")
		if err := server.ExitNewClusterMode(ctx, &m, conn, containerdClient); err != nil {
			log.Error(err, "failed to cleanup reset")
		}
	}()

	log.V(0).Info("putting etcd into new cluster mode")

	// set the FNC flag in the etcd manifest to force a new cluster
	if err := server.EnterNewClusterMode(ctx, &m, conn, containerdClient); err != nil {
		return fmt.Errorf("failed to reset cluster: %w", err)
	}

	// delete all EtcdMember custom resources to trigger the reconfiguration process
	// on all worker nodes
	if err := e.deleteEtcdMembersAfterAPIServerRecovery(ctx); err != nil {
		return fmt.Errorf("failed to delete all EtcdMembers: %w", err)
	}

	log.V(0).Info("cluster reset successfully", "emaudit", "")
	return nil
}

// runExitHandler runs the exit handler for the etcdManager to ensure the FNC flag is removed
func (e *etcdManager) runExitHandler(ctx context.Context, exitFn func(context.Context, afero.Fs, func(int))) chan os.Signal {
	log := fog.FromContext(ctx).WithValues("routine", "exit")
	ctx = fog.IntoContext(ctx, log)

	sigchnl := make(chan os.Signal, 1)
	// catch SIGTERM and SIGINT signals
	signal.Notify(sigchnl, unix.SIGTERM, unix.SIGINT)

	go func() {
		for {
			<-sigchnl
			log.V(0).Info("exit signal received - taking etcd out of new cluster mode")
			exitFn(ctx, e.cfg.Fs(), os.Exit)
			return
		}
	}()

	return sigchnl
}

// exit ensures the FNC flag is removed on program exit. This is necessary to
// ensure the cluster is not left with the force new cluster flag set. If the
// force new cluster flag is set, every time the control plane etcd pod is
// restarted, it will create a new cluster, causing panics and quorum loss.
func exit(ctx context.Context, fs afero.Fs, exiter func(int)) {
	log := fog.FromContext(ctx)

	m := manifest.New(fs, etcdManifestPath, &corev1.Pod{}, 0)
	if err := m.WithUpdate(func(obj runtime.Object) error {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return fmt.Errorf("current content of the manifest is not a valid pod")
		}
		return server.ClearFNCFlag(pod)
	}); err != nil {
		log.Error(err, "failed to remove force new cluster flag from the etcd manifest")
	}

	exiter(1)
}

// deleteEtcdMembersAfterAPIServerRecovery waits for up to the blocking kube retry
// client's retry duration. If the API server returns during this period, all
// EtcdMember custom resources are deleted to trigger the reconfiguration process
// on all worker nodes.
func (e *etcdManager) deleteEtcdMembersAfterAPIServerRecovery(ctx context.Context) error {
	etcdMembers, err := e.getEtcdMembers(ctx)
	if err != nil {
		return err
	}

	eclient, err := e.cfg.EtcdRetryClient()
	if err != nil {
		return fmt.Errorf("failed to retrieve etcd retry client: %w", err)
	}
	defer eclient.Close()

	e.cluster.UpdateStatus(ctx, eclient)

	return e.deleteEtcdMembers(ctx, etcdMembers)
}

// getEtcdMembers waits for the API server to become healthy again and then
// returns a list of all EtcdMembers.
func (e *etcdManager) getEtcdMembers(ctx context.Context) (*v1etcd.EtcdMemberList, error) {
	log := fog.FromContext(ctx)
	log.V(0).Info("waiting for API server to become available...")

	// retrieve the blocking kube retry client. If one has not yet been configured,
	// then one will be created. The blocking kube retry client retry duration will
	// be respected during this creation process
	kclient, err := e.cfg.BlockingKubeRetryClient()
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve blocking kube retry client: %w", err)
	}

	etcdMembers := &v1etcd.EtcdMemberList{}

	// the blocking kube retry client will block here until the API server
	// is available or the retry duration has been reached
	if err := kclient.SafeList(ctx, etcdMembers); err != nil {
		return nil, fmt.Errorf("failed to list etcd members: %w", err)
	}

	log.V(0).Info("API server is available")
	return etcdMembers, nil
}

// deleteEtcdMembers takes a list of EtcdMembers and deletes each of them.
func (e *etcdManager) deleteEtcdMembers(ctx context.Context, etcdMembers *v1etcd.EtcdMemberList) error {
	log := fog.FromContext(ctx)
	log.V(0).Info("deleting all EtcdMembers")

	kclient, err := e.cfg.KubeRetryClient()
	if err != nil {
		return fmt.Errorf("failed to retrieve kube retry client: %w", err)
	}

	// loop through all EtcdMember custom resources and delete them to trigger
	// the reconfiguration process on all worker nodes
	for i := range etcdMembers.Items {
		if err := kclient.SafeDelete(ctx, &etcdMembers.Items[i]); err != nil {
			return fmt.Errorf("failed to delete etcd member: %w", err)
		}
	}
	return nil
}