1 package manager
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "os/signal"
8 "sync"
9 "time"
10
11 "github.com/containerd/containerd/namespaces"
12 "github.com/spf13/afero"
13 "golang.org/x/sys/unix"
14 corev1 "k8s.io/api/core/v1"
15 "k8s.io/apimachinery/pkg/runtime"
16
17 "edge-infra.dev/pkg/lib/fog"
18 "edge-infra.dev/pkg/sds/etcd/manager/cluster"
19 "edge-infra.dev/pkg/sds/etcd/manager/internal/config"
20 "edge-infra.dev/pkg/sds/etcd/manager/internal/observability"
21 "edge-infra.dev/pkg/sds/etcd/manager/internal/socket"
22 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
23 "edge-infra.dev/pkg/sds/lib/etcd/server"
24 "edge-infra.dev/pkg/sds/lib/k8s/manifest"
25 )
26
const (
	// RecoverySocket is the filesystem path of the socket that Run opens and
	// serves together with the manager.
	RecoverySocket = "/run/etcd-manager/etcd-manager.sock"
	// etcdManifestPath is the path of the etcd pod manifest used when
	// resetting the cluster and when handling an exit signal.
	etcdManifestPath = "/etc/kubernetes/manifests/etcd.yaml"
	// containerdSocket is the path of the containerd socket.
	// NOTE(review): not referenced in the code visible here — presumably used
	// elsewhere in the package; confirm.
	containerdSocket = "/run/containerd/containerd.sock"
)
32
33
// etcdManager bundles the configuration, cluster state and logging helpers
// used to monitor etcd health and to reset the cluster.
type etcdManager struct {
	// cfg supplies the clients (etcd, kube, containerd, systemd), the
	// filesystem and settings such as the maximum unhealthy duration.
	cfg config.Config
	// cluster holds the cluster status that is refreshed on every health
	// check and consulted to decide whether a reset is required.
	cluster cluster.Cluster
	// stateLogger is handed the current health state on every health check.
	stateLogger observability.StateLogger
	// mutex is taken with TryLock in WithTryLock so that only one guarded
	// operation runs at a time.
	mutex *sync.Mutex
}
40
41 func newEtcdManager(cfg config.Config, cluster cluster.Cluster, stateLogger observability.StateLogger) *etcdManager {
42 return &etcdManager{
43 cfg,
44 cluster,
45 stateLogger,
46 &sync.Mutex{},
47 }
48 }
49
50
51 func Run() error {
52 cfg, err := config.New()
53 if err != nil {
54 return fmt.Errorf("config validation failed: %w", err)
55 }
56
57 log := fog.New(fog.WithLevel(cfg.LogLevel())).WithName("etcd-manager")
58 ctx := fog.IntoContext(context.Background(), log)
59
60 etcdManager := createEtcdManager(cfg)
61 _ = etcdManager.runExitHandler(ctx, exit)
62
63 server := observability.NewServer(9085)
64 go runMetricsServer(ctx, server)
65
66
67 sock := socket.NewSocket(etcdManager.cfg.Fs(), RecoverySocket)
68 if err := sock.Listen(); err != nil {
69 return fmt.Errorf("failed to listen on socket: %w", err)
70 }
71 go sock.Accept(ctx)
72 go sock.Handle(ctx, etcdManager)
73
74 return etcdManager.run(ctx)
75 }
76
77 func createEtcdManager(cfg config.Config) *etcdManager {
78 cluster := cluster.New(cfg.Endpoint(), cfg.MaxUnhealthy(), cluster.Status{})
79 cluster.InitializeStatus()
80 stateLogger := observability.NewStateLogger(cfg.LogLevel())
81 return newEtcdManager(cfg, cluster, stateLogger)
82 }
83
84 func runMetricsServer(ctx context.Context, server *observability.Server) {
85 var lastLoggedError time.Time
86 log := fog.FromContext(ctx)
87 for {
88 if err := server.Run(); err != nil {
89 if time.Since(lastLoggedError) > 5*time.Minute {
90 log.Error(err, "failure in metrics server")
91 lastLoggedError = time.Now()
92 }
93
94 time.Sleep(20 * time.Second)
95 }
96 }
97 }
98
99
100 func (e *etcdManager) run(ctx context.Context) error {
101 log := fog.FromContext(ctx).WithValues("routine", "main")
102 ctx = fog.IntoContext(ctx, log)
103
104 e.cluster.InitializeStatus()
105 for {
106 if err := e.monitorHealth(ctx); err != nil {
107 log.Error(err, "failed to monitor etcd health")
108 }
109 time.Sleep(30 * time.Second)
110 }
111 }
112
113 func (e *etcdManager) monitorHealth(ctx context.Context) error {
114 client, err := e.cfg.EtcdRetryClient()
115 if err != nil {
116 return err
117 }
118 defer client.Close()
119
120
121 e.cluster.UpdateStatus(ctx, client)
122
123 e.stateLogger.LogIfStateChanged(e.cluster.IsHealthy())
124 observability.ReportHealthMetrics(e.cluster.IsHealthy())
125
126
127 alarms := cluster.GetAlarms(ctx, client)
128 observability.ReportAlarmMetrics(alarms)
129
130
131
132 if len(alarms) == 0 && e.cluster.IsResetRequired() {
133 return resetCluster(ctx, e)
134 }
135
136 return nil
137 }
138
139 func resetCluster(ctx context.Context, etcdManager *etcdManager) error {
140 log := fog.FromContext(ctx)
141 log.V(0).Info("etcd cluster is unhealthy, resetting cluster...", "minutesUnhealthy", etcdManager.cfg.MaxUnhealthy().Minutes(), "emaudit", "")
142 acquired, err := etcdManager.WithTryLock(ctx, nil, etcdManager.ResetCluster)
143 if err != nil {
144 return err
145 }
146 if !acquired {
147 log.V(0).Info("cluster reset already in progress")
148 }
149 return nil
150 }
151
152
153
154
155
156 func (e *etcdManager) WithTryLock(ctx context.Context, locked chan<- bool, fn func(context.Context) error) (bool, error) {
157 acquired := e.mutex.TryLock()
158 if locked != nil {
159 locked <- !acquired
160 }
161 if !acquired {
162 return false, nil
163 }
164 defer e.mutex.Unlock()
165
166 return true, fn(ctx)
167 }
168
169
170
171
// ResetCluster resets the etcd cluster by putting etcd into new cluster mode
// and then deleting all EtcdMember resources.
//
// It resets the cluster's timer, connects to systemd and containerd, and
// creates a manifest handle for the etcd pod manifest. Leaving new cluster
// mode is deferred, so it is attempted on every return path after that
// point, including when entering new cluster mode fails. A failure to leave
// is only logged.
//
// It returns an error if a connection cannot be established, if entering new
// cluster mode fails, or if the EtcdMembers cannot be deleted.
func (e *etcdManager) ResetCluster(ctx context.Context) error {
	log := fog.FromContext(ctx)
	e.cluster.ResetTimer()

	conn, err := e.cfg.SystemdConnection(ctx)
	if err != nil {
		return fmt.Errorf("failed to establish a connection to systemd: %w", err)
	}
	defer conn.Close()

	containerdClient, err := e.cfg.ContainerdClient()
	if err != nil {
		return fmt.Errorf("failed to create containerd client: %w", err)
	}
	defer containerdClient.Close()
	// From here on ctx carries the "k8s.io" containerd namespace; the deferred
	// cleanup below captures this namespaced ctx as well.
	ctx = namespaces.WithNamespace(ctx, "k8s.io")

	m := manifest.New(e.cfg.Fs(), etcdManifestPath, &corev1.Pod{}, 0)

	// Deferred calls run in reverse order, so this cleanup executes before the
	// containerd client and the systemd connection are closed.
	defer func() {
		log.V(0).Info("taking etcd out of new cluster mode")
		if err := server.ExitNewClusterMode(ctx, &m, conn, containerdClient); err != nil {
			log.Error(err, "failed to cleanup reset")
		}
	}()

	log.V(0).Info("putting etcd into new cluster mode")

	if err := server.EnterNewClusterMode(ctx, &m, conn, containerdClient); err != nil {
		return fmt.Errorf("failed to reset cluster: %w", err)
	}

	// Listing the EtcdMembers goes through the API server; see
	// deleteEtcdMembersAfterAPIServerRecovery.
	if err := e.deleteEtcdMembersAfterAPIServerRecovery(ctx); err != nil {
		return fmt.Errorf("failed to delete all EtcdMembers: %w", err)
	}
	log.V(0).Info("cluster reset successfully", "emaudit", "")
	return nil
}
213
214
215 func (e *etcdManager) runExitHandler(ctx context.Context, exitFn func(context.Context, afero.Fs, func(int))) chan os.Signal {
216 log := fog.FromContext(ctx).WithValues("routine", "exit")
217 ctx = fog.IntoContext(ctx, log)
218 sigchnl := make(chan os.Signal, 1)
219
220 signal.Notify(sigchnl, unix.SIGTERM, unix.SIGINT)
221 go func() {
222 for {
223 <-sigchnl
224 log.V(0).Info("exit signal received - taking etcd out of new cluster mode")
225 exitFn(ctx, e.cfg.Fs(), os.Exit)
226 return
227 }
228 }()
229 return sigchnl
230 }
231
232
233
234
235
236 func exit(ctx context.Context, fs afero.Fs, exiter func(int)) {
237 log := fog.FromContext(ctx)
238 m := manifest.New(fs, etcdManifestPath, &corev1.Pod{}, 0)
239
240 if err := m.WithUpdate(func(obj runtime.Object) error {
241 pod, ok := obj.(*corev1.Pod)
242 if !ok {
243 return fmt.Errorf("current content of the manifest is not a valid pod")
244 }
245 return server.ClearFNCFlag(pod)
246 }); err != nil {
247 log.Error(err, "failed to remove force new cluster flag from the etcd manifest")
248 }
249 exiter(1)
250 }
251
252
253
254
255
256 func (e *etcdManager) deleteEtcdMembersAfterAPIServerRecovery(ctx context.Context) error {
257 etcdMembers, err := e.getEtcdMembers(ctx)
258 if err != nil {
259 return err
260 }
261
262 eclient, err := e.cfg.EtcdRetryClient()
263 if err != nil {
264 return fmt.Errorf("failed to retrieve etcd retry client: %w", err)
265 }
266 defer eclient.Close()
267
268 e.cluster.UpdateStatus(ctx, eclient)
269 return e.deleteEtcdMembers(ctx, etcdMembers)
270 }
271
272
273
274 func (e *etcdManager) getEtcdMembers(ctx context.Context) (*v1etcd.EtcdMemberList, error) {
275 log := fog.FromContext(ctx)
276 log.V(0).Info("waiting for API server to become available...")
277
278
279
280 kclient, err := e.cfg.BlockingKubeRetryClient()
281 if err != nil {
282 return nil, fmt.Errorf("failed to retrieve blocking kube retry client: %w", err)
283 }
284
285 etcdMembers := &v1etcd.EtcdMemberList{}
286
287
288 if err := kclient.SafeList(ctx, etcdMembers); err != nil {
289 return nil, fmt.Errorf("failed to list etcd members: %w", err)
290 }
291 log.V(0).Info("API server is available")
292 return etcdMembers, nil
293 }
294
295
296
297 func (e *etcdManager) deleteEtcdMembers(ctx context.Context, etcdMembers *v1etcd.EtcdMemberList) error {
298 log := fog.FromContext(ctx)
299 log.V(0).Info("deleting all EtcdMembers")
300 kclient, err := e.cfg.KubeRetryClient()
301 if err != nil {
302 return fmt.Errorf("failed to retrieve kube retry client: %w", err)
303 }
304
305
306 for i := range etcdMembers.Items {
307 if err := kclient.SafeDelete(ctx, &etcdMembers.Items[i]); err != nil {
308 return fmt.Errorf("failed to delete etcd member: %w", err)
309 }
310 }
311 return nil
312 }
313
View as plain text