package server import ( "context" "fmt" "slices" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "edge-infra.dev/pkg/sds/lib/containerd" "edge-infra.dev/pkg/sds/lib/dbus/systemd" "edge-infra.dev/pkg/sds/lib/k8s/manifest" ) var ( FNCFlag = "--force-new-cluster=true" ) // EnterNewClusterMode creates a new etcd cluster with the current node as the only member. // Always run ExitNewClusterMode after you have finished configuring your new cluster // after using EnterNewClusterMode. Failure to do so will result in the cluster being reset // every time the etcd container restarts. func EnterNewClusterMode(ctx context.Context, m *manifest.Manifest, restarter systemd.Restarter, terminator containerd.Terminator) error { return WithRestart(ctx, restarter, terminator, func() error { return m.WithUpdate(func(obj runtime.Object) error { pod, ok := obj.(*corev1.Pod) if !ok { return fmt.Errorf("current content of the etcd manifest is not a valid pod") } return SetFNCFlag(pod) }) }) } // ExitNewClusterMode prevents the etcd cluster from being restarted when the etcd // container restarts. func ExitNewClusterMode(ctx context.Context, m *manifest.Manifest, restarter systemd.Restarter, terminator containerd.Terminator) error { return WithRestart(ctx, restarter, terminator, func() error { return m.WithUpdate(func(obj runtime.Object) error { pod, ok := obj.(*corev1.Pod) if !ok { return fmt.Errorf("current content of the etcd manifest is not a valid pod") } return ClearFNCFlag(pod) }) }) } // WithRestart takes a function and adds functionality to restart the etcd // container and kubelet service. func WithRestart(ctx context.Context, restarter systemd.Restarter, terminator containerd.Terminator, fn func() error) error { labelFilters := containerd.NewLabelFiltersBuilder().WithContainerName("etcd").WithPodNamespace(metav1.NamespaceSystem).Build() err := terminator.WithTerminate(ctx, *labelFilters, func() error { return fn() }) if err != nil { return err } return restarter.Restart(ctx, "kubelet.service", systemd.Replace, false) } // SetFNCFlag adds the --force-new-cluster=true flag to the etcd container command if // it does not already exist. func SetFNCFlag(pod *corev1.Pod) error { if len(pod.Spec.Containers) == 0 { return fmt.Errorf("etcd container not found in etcd manifest") } if !slices.Contains(pod.Spec.Containers[0].Command, FNCFlag) { pod.Spec.Containers[0].Command = append(pod.Spec.Containers[0].Command, FNCFlag) } return nil } // ClearFNCFlag removes the --force-new-cluster=true flag from the etcd container command // if it exists. func ClearFNCFlag(pod *corev1.Pod) error { if len(pod.Spec.Containers) == 0 { return fmt.Errorf("etcd container not found in etcd manifest") } updatedCommand := make([]string, 0, len(pod.Spec.Containers[0].Command)) for _, value := range pod.Spec.Containers[0].Command { if value != FNCFlag { // Value doesn't match the forceNewClusterFlag, keep it in the updatedCommand array updatedCommand = append(updatedCommand, value) } } pod.Spec.Containers[0].Command = updatedCommand return nil }