...

Source file src/edge-infra.dev/pkg/sds/etcd/manager/manager.go

Documentation: edge-infra.dev/pkg/sds/etcd/manager

     1  package manager
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"os/signal"
     8  	"sync"
     9  	"time"
    10  
    11  	"github.com/containerd/containerd/namespaces"
    12  	"github.com/spf13/afero"
    13  	"golang.org/x/sys/unix"
    14  	corev1 "k8s.io/api/core/v1"
    15  	"k8s.io/apimachinery/pkg/runtime"
    16  
    17  	"edge-infra.dev/pkg/lib/fog"
    18  	"edge-infra.dev/pkg/sds/etcd/manager/cluster"
    19  	"edge-infra.dev/pkg/sds/etcd/manager/internal/config"
    20  	"edge-infra.dev/pkg/sds/etcd/manager/internal/observability"
    21  	"edge-infra.dev/pkg/sds/etcd/manager/internal/socket"
    22  	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
    23  	"edge-infra.dev/pkg/sds/lib/etcd/server"
    24  	"edge-infra.dev/pkg/sds/lib/k8s/manifest"
    25  )
    26  
const (
	// RecoverySocket is the Unix domain socket path on which the manager
	// accepts instant cluster-reset requests (see socket.NewSocket in Run).
	RecoverySocket   = "/run/etcd-manager/etcd-manager.sock"
	// etcdManifestPath is the etcd static pod manifest; the FNC
	// (force-new-cluster) flag is toggled in this file during a reset.
	etcdManifestPath = "/etc/kubernetes/manifests/etcd.yaml"
	// containerdSocket is the containerd runtime socket path.
	// NOTE(review): not referenced in this file — presumably used elsewhere
	// in the package; confirm before removing.
	containerdSocket = "/run/containerd/containerd.sock"
)
    32  
// etcdManager represents an instance of the etcd manager that monitors an
// etcd cluster and resets it when required.
type etcdManager struct {
	cfg         config.Config             // runtime configuration and client factories
	cluster     cluster.Cluster           // tracked cluster health/status state
	stateLogger observability.StateLogger // logs health-state transitions
	mutex       *sync.Mutex               // serializes cluster resets (see WithTryLock)
}
    40  
    41  func newEtcdManager(cfg config.Config, cluster cluster.Cluster, stateLogger observability.StateLogger) *etcdManager {
    42  	return &etcdManager{
    43  		cfg,
    44  		cluster,
    45  		stateLogger,
    46  		&sync.Mutex{},
    47  	}
    48  }
    49  
    50  // Run is the entry point for the etcdmanager which handles setup and initialization
    51  func Run() error {
    52  	cfg, err := config.New()
    53  	if err != nil {
    54  		return fmt.Errorf("config validation failed: %w", err)
    55  	}
    56  
    57  	log := fog.New(fog.WithLevel(cfg.LogLevel())).WithName("etcd-manager")
    58  	ctx := fog.IntoContext(context.Background(), log)
    59  
    60  	etcdManager := createEtcdManager(cfg)
    61  	_ = etcdManager.runExitHandler(ctx, exit)
    62  
    63  	server := observability.NewServer(9085)
    64  	go runMetricsServer(ctx, server)
    65  
    66  	// create a Unix socket listener for the Etcd Manager for instant reset requests
    67  	sock := socket.NewSocket(etcdManager.cfg.Fs(), RecoverySocket)
    68  	if err := sock.Listen(); err != nil {
    69  		return fmt.Errorf("failed to listen on socket: %w", err)
    70  	}
    71  	go sock.Accept(ctx)
    72  	go sock.Handle(ctx, etcdManager)
    73  
    74  	return etcdManager.run(ctx)
    75  }
    76  
    77  func createEtcdManager(cfg config.Config) *etcdManager {
    78  	cluster := cluster.New(cfg.Endpoint(), cfg.MaxUnhealthy(), cluster.Status{})
    79  	cluster.InitializeStatus()
    80  	stateLogger := observability.NewStateLogger(cfg.LogLevel())
    81  	return newEtcdManager(cfg, cluster, stateLogger)
    82  }
    83  
    84  func runMetricsServer(ctx context.Context, server *observability.Server) {
    85  	var lastLoggedError time.Time
    86  	log := fog.FromContext(ctx)
    87  	for {
    88  		if err := server.Run(); err != nil {
    89  			if time.Since(lastLoggedError) > 5*time.Minute {
    90  				log.Error(err, "failure in metrics server")
    91  				lastLoggedError = time.Now()
    92  			}
    93  			// sleep for 20 seconds here to prevent continuous retries
    94  			time.Sleep(20 * time.Second)
    95  		}
    96  	}
    97  }
    98  
    99  // run is the main run loop for the etcdManager
   100  func (e *etcdManager) run(ctx context.Context) error {
   101  	log := fog.FromContext(ctx).WithValues("routine", "main")
   102  	ctx = fog.IntoContext(ctx, log)
   103  
   104  	e.cluster.InitializeStatus()
   105  	for {
   106  		if err := e.monitorHealth(ctx); err != nil {
   107  			log.Error(err, "failed to monitor etcd health")
   108  		}
   109  		time.Sleep(30 * time.Second)
   110  	}
   111  }
   112  
// monitorHealth performs one health-check pass: it refreshes the tracked
// cluster status from etcd, logs and reports the health state, reports any
// cluster alarms, and — when the cluster has no alarms but has been unhealthy
// long enough that a reset is required — triggers a cluster reset.
func (e *etcdManager) monitorHealth(ctx context.Context) error {
	client, err := e.cfg.EtcdRetryClient()
	if err != nil {
		return err
	}
	defer client.Close()

	// update the local state for cluster health
	e.cluster.UpdateStatus(ctx, client)
	// log the new found state if it has changed
	e.stateLogger.LogIfStateChanged(e.cluster.IsHealthy())
	observability.ReportHealthMetrics(e.cluster.IsHealthy())

	// get the cluster alarms and report them to metrics
	alarms := cluster.GetAlarms(ctx, client)
	observability.ReportAlarmMetrics(alarms)

	// if the cluster is unhealthy, has no alarms set,
	// and the max unhealthy threshold has been reached, reset the cluster.
	// A reset is deliberately skipped while alarms are active.
	if len(alarms) == 0 && e.cluster.IsResetRequired() {
		return resetCluster(ctx, e)
	}

	return nil
}
   138  
   139  func resetCluster(ctx context.Context, etcdManager *etcdManager) error {
   140  	log := fog.FromContext(ctx)
   141  	log.V(0).Info("etcd cluster is unhealthy, resetting cluster...", "minutesUnhealthy", etcdManager.cfg.MaxUnhealthy().Minutes(), "emaudit", "")
   142  	acquired, err := etcdManager.WithTryLock(ctx, nil, etcdManager.ResetCluster)
   143  	if err != nil {
   144  		return err
   145  	}
   146  	if !acquired {
   147  		log.V(0).Info("cluster reset already in progress")
   148  	}
   149  	return nil
   150  }
   151  
// WithTryLock will attempt to acquire the mutex lock before running the provided function.
// If locked is not nil, this channel is sent a value reporting the lock state: true means
// the lock was already held (i.e. it could NOT be acquired), false means it was acquired.
// If the lock could not be acquired, fn is not run and the method returns false, nil.
// On success it returns true along with fn's error.
func (e *etcdManager) WithTryLock(ctx context.Context, locked chan<- bool, fn func(context.Context) error) (bool, error) {
	acquired := e.mutex.TryLock()
	if locked != nil {
		// Note the inversion: the channel receives true when the lock was busy.
		locked <- !acquired
	}
	if !acquired {
		return false, nil
	}
	defer e.mutex.Unlock()

	return true, fn(ctx)
}
   168  
// ResetCluster resets the etcd cluster. It does this by setting the FNC flag in the etcd manifest
// and deleting all EtcdMember custom resources. This will trigger the reconfiguration process on
// all worker nodes.
//
// Deferred cleanup runs LIFO: the manifest is taken out of new-cluster mode
// before the containerd client and systemd connection are closed, so both are
// still usable inside the cleanup closure.
func (e *etcdManager) ResetCluster(ctx context.Context) error {
	log := fog.FromContext(ctx)
	// reset the cluster status timer up front (presumably the unhealthy-duration
	// tracker — confirm in cluster package) so this reset is not re-triggered
	e.cluster.ResetTimer()

	conn, err := e.cfg.SystemdConnection(ctx)
	if err != nil {
		return fmt.Errorf("failed to establish a connection to systemd: %w", err)
	}
	defer conn.Close()

	containerdClient, err := e.cfg.ContainerdClient()
	if err != nil {
		return fmt.Errorf("failed to create containerd client: %w", err)
	}
	defer containerdClient.Close()
	// containerd calls below must be scoped to the kubernetes namespace
	ctx = namespaces.WithNamespace(ctx, "k8s.io")

	m := manifest.New(e.cfg.Fs(), etcdManifestPath, &corev1.Pod{}, 0)
	// defer the removal of the FNC flag from the etcd manifest, which will set
	// the cluster back to normal operation
	defer func() {
		log.V(0).Info("taking etcd out of new cluster mode")
		if err := server.ExitNewClusterMode(ctx, &m, conn, containerdClient); err != nil {
			log.Error(err, "failed to cleanup reset")
		}
	}()

	log.V(0).Info("putting etcd into new cluster mode")
	// set the FNC flag in the etcd manifest to force a new cluster
	if err := server.EnterNewClusterMode(ctx, &m, conn, containerdClient); err != nil {
		return fmt.Errorf("failed to reset cluster: %w", err)
	}

	// delete all EtcdMember custom resources to trigger the reconfiguration process
	// on all worker nodes
	if err := e.deleteEtcdMembersAfterAPIServerRecovery(ctx); err != nil {
		return fmt.Errorf("failed to delete all EtcdMembers: %w", err)
	}
	log.V(0).Info("cluster reset successfully", "emaudit", "")
	return nil
}
   213  
   214  // runExitHandler runs the exit handler for the etcdManager to ensure the FNC flag is removed
   215  func (e *etcdManager) runExitHandler(ctx context.Context, exitFn func(context.Context, afero.Fs, func(int))) chan os.Signal {
   216  	log := fog.FromContext(ctx).WithValues("routine", "exit")
   217  	ctx = fog.IntoContext(ctx, log)
   218  	sigchnl := make(chan os.Signal, 1)
   219  	// catch SIGTERM and SIGINT signals
   220  	signal.Notify(sigchnl, unix.SIGTERM, unix.SIGINT)
   221  	go func() {
   222  		for {
   223  			<-sigchnl
   224  			log.V(0).Info("exit signal received - taking etcd out of new cluster mode")
   225  			exitFn(ctx, e.cfg.Fs(), os.Exit)
   226  			return
   227  		}
   228  	}()
   229  	return sigchnl
   230  }
   231  
   232  // exit ensures the FNC flag is removed after program exit. This is necessary to
   233  // ensure the cluster is not left with the force new cluster flag set. If the
   234  // force new cluster flag is set, every time the controlplane etcd pod is
   235  // restarted, it will create a new cluster, causing panics and quorum loss.
   236  func exit(ctx context.Context, fs afero.Fs, exiter func(int)) {
   237  	log := fog.FromContext(ctx)
   238  	m := manifest.New(fs, etcdManifestPath, &corev1.Pod{}, 0)
   239  
   240  	if err := m.WithUpdate(func(obj runtime.Object) error {
   241  		pod, ok := obj.(*corev1.Pod)
   242  		if !ok {
   243  			return fmt.Errorf("current content of the manifest is not a valid pod")
   244  		}
   245  		return server.ClearFNCFlag(pod)
   246  	}); err != nil {
   247  		log.Error(err, "failed to remove force new cluster flag from the etcd manifest")
   248  	}
   249  	exiter(1)
   250  }
   251  
   252  // deleteEtcdMembersAfterAPIServerRecovery will wait for the time specified in
   253  // the kube blocking retry client retry duration. If the API server returns
   254  // during this period, all EtcdMember custom resources are deleted to trigger
   255  // the reconfiguration process on all worker nodes.
   256  func (e *etcdManager) deleteEtcdMembersAfterAPIServerRecovery(ctx context.Context) error {
   257  	etcdMembers, err := e.getEtcdMembers(ctx)
   258  	if err != nil {
   259  		return err
   260  	}
   261  
   262  	eclient, err := e.cfg.EtcdRetryClient()
   263  	if err != nil {
   264  		return fmt.Errorf("failed to retrieve etcd retry client: %w", err)
   265  	}
   266  	defer eclient.Close()
   267  
   268  	e.cluster.UpdateStatus(ctx, eclient)
   269  	return e.deleteEtcdMembers(ctx, etcdMembers)
   270  }
   271  
   272  // getEtcdMembers will wait for the API server to become healthy again and then
   273  // return a list of all EtcdMembers.
   274  func (e *etcdManager) getEtcdMembers(ctx context.Context) (*v1etcd.EtcdMemberList, error) {
   275  	log := fog.FromContext(ctx)
   276  	log.V(0).Info("waiting for API server to become available...")
   277  	// retrieve the blocking kube retry client. If one has not yet been configured,
   278  	// then one will be created. The blocking kube retry client retry duration will
   279  	// be respected during this creation process
   280  	kclient, err := e.cfg.BlockingKubeRetryClient()
   281  	if err != nil {
   282  		return nil, fmt.Errorf("failed to retrieve blocking kube retry client: %w", err)
   283  	}
   284  
   285  	etcdMembers := &v1etcd.EtcdMemberList{}
   286  	// the blocking kube retry client will block here until the API server
   287  	// is available or the retry duration has been reached
   288  	if err := kclient.SafeList(ctx, etcdMembers); err != nil {
   289  		return nil, fmt.Errorf("failed to list etcd members: %w", err)
   290  	}
   291  	log.V(0).Info("API server is available")
   292  	return etcdMembers, nil
   293  }
   294  
   295  // deleteEtcdMembers takes a list of EtcdMembers and will loop through and
   296  // delete all of them.
   297  func (e *etcdManager) deleteEtcdMembers(ctx context.Context, etcdMembers *v1etcd.EtcdMemberList) error {
   298  	log := fog.FromContext(ctx)
   299  	log.V(0).Info("deleting all EtcdMembers")
   300  	kclient, err := e.cfg.KubeRetryClient()
   301  	if err != nil {
   302  		return fmt.Errorf("failed to retrieve kube retry client: %w", err)
   303  	}
   304  	// loop through all EtcdMember custom resources and delete them to trigger
   305  	// the reconfiguration process on all worker nodes
   306  	for i := range etcdMembers.Items {
   307  		if err := kclient.SafeDelete(ctx, &etcdMembers.Items[i]); err != nil {
   308  			return fmt.Errorf("failed to delete etcd member: %w", err)
   309  		}
   310  	}
   311  	return nil
   312  }
   313  

View as plain text