1
2
3 package config
4
5 import (
6 "context"
7 "errors"
8 "flag"
9 "fmt"
10 "net"
11 "os"
12 "path/filepath"
13 "strconv"
14 "time"
15
16 "github.com/peterbourgon/ff/v3"
17 "github.com/spf13/afero"
18 clientv3 "go.etcd.io/etcd/client/v3"
19 "k8s.io/apimachinery/pkg/runtime"
20 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
22 "k8s.io/client-go/tools/clientcmd"
23 ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
24
25 "edge-infra.dev/pkg/lib/fog"
26 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
27 "edge-infra.dev/pkg/sds/lib/containerd"
28 "edge-infra.dev/pkg/sds/lib/dbus/systemd"
29 "edge-infra.dev/pkg/sds/lib/etcd/client"
30 etcdretryclient "edge-infra.dev/pkg/sds/lib/etcd/client/retry"
31 kuberetryclient "edge-infra.dev/pkg/sds/lib/k8s/retryclient"
32 kubeclienttypes "edge-infra.dev/pkg/sds/lib/k8s/retryclient/types"
33 "edge-infra.dev/pkg/sds/lib/os/file"
34 )
35
// DefaultContainerdSocket is the containerd socket path used when
// --containerd-socket is not supplied.
const DefaultContainerdSocket = "/run/containerd/containerd.sock"

// General defaults for the run command.
const (
	// DefaultKubeconfig satisfies validate's requirement that the kubeconfig
	// lives directly in /etc/kubernetes.
	DefaultKubeconfig           = "/etc/kubernetes/admin.conf"
	DefaultMaxUnhealthyDuration = 10 * time.Minute
	DefaultLogLevel             = "INFO"
)

// Defaults for the control-plane etcd endpoint (host and port are kept as
// strings because they are joined with net.JoinHostPort).
const (
	DefaultControlplaneEtcdHost = "127.0.0.1"
	DefaultControlplaneEtcdPort = "2379"
)

// Defaults for the kube-client-* retry settings.
const (
	DefaultKubeClientRequestTimeout = 3 * time.Second
	DefaultKubeClientInitialBackoff = 2 * time.Second
	DefaultKubeClientBackoffFactor  = 1
	DefaultKubeClientMaxRetries     = 1
)

// Defaults for the blocking-kube-client-* retry settings.
const (
	DefaultBlockingKubeClientRequestTimeout = 5 * time.Second
	DefaultBlockingKubeClientInitialBackoff = 15 * time.Second
	DefaultBlockingKubeClientBackoffFactor  = 1
	DefaultBlockingKubeClientMaxRetries     = 20
)

// Defaults for the etcd-client-* retry settings.
const (
	DefaultEtcdClientRequestTimeout = 3 * time.Second
	DefaultEtcdClientInitialBackoff = 2 * time.Second
	DefaultEtcdClientBackoffFactor  = 1
	DefaultEtcdClientMaxRetries     = 1
)
61
62 var (
63
64 logMap = map[string]int{
65 "DEBUG": fog.DEBUG,
66 "INFO": fog.INFO,
67 "WARN": fog.WARN,
68 "ERROR": fog.ERROR,
69 "DPANIC": fog.DPANIC,
70 "PANIC": fog.PANIC,
71 "FATAL": fog.FATAL,
72 }
73 )
74
75
76 type Config interface {
77 BlockingKubeRetryClient() (kubeclienttypes.Retrier, error)
78 KubeRetryClient() (kubeclienttypes.Retrier, error)
79 EtcdRetryClient() (etcdretryclient.Retrier, error)
80 ContainerdClient() (containerd.Client, error)
81 SystemdConnection(context.Context) (systemd.Connection, error)
82 Fs() afero.Fs
83 Endpoint() string
84 MaxUnhealthy() time.Duration
85 LogLevel() int
86 }
87
88
89 type config struct {
90 fs afero.Fs
91 settings
92 clients
93 }
94
95
96 type settings struct {
97 kubeconfig string
98 containerdSocket string
99 controlplaneEtcd address
100 kubeClientSettings clientConfig
101 blockingKubeClientSettings clientConfig
102 etcdClientSettings clientConfig
103 maxUnhealthy time.Duration
104 logLevel string
105 }
106
107
108 type clientConfig struct {
109 requestTimeout time.Duration
110 initialBackoff time.Duration
111 backoffFactor float64
112 maxRetries int
113 }
114
115
116 type address struct {
117 host string
118 port string
119 }
120
121
122 type clients struct {
123 kube kubeclienttypes.Retrier
124 blockingKube kubeclienttypes.Retrier
125 }
126
127
128 func New() (Config, error) {
129 cfg := &config{}
130 if err := cfg.Initialize(); err != nil {
131 return nil, err
132 }
133
134 return cfg, nil
135 }
136
137
138 func (c *config) Initialize() error {
139 fs := flag.NewFlagSet("run", flag.ExitOnError)
140 c.bindFlags(fs)
141
142 if err := ff.Parse(fs, os.Args[2:], ff.WithEnvVarNoPrefix()); err != nil {
143 return fmt.Errorf("failed to parse command line arguments and environment variables: %w", err)
144 }
145
146 if err := c.validate(); err != nil {
147 return fmt.Errorf("invalid command line arguments or environment variables: %w", err)
148 }
149 return nil
150 }
151
152
153 func (c *config) bindFlags(fs *flag.FlagSet) {
154 fs.StringVar(&c.settings.kubeconfig,
155 "kubeconfig",
156 DefaultKubeconfig,
157 "path to kubeconfig file (must be in /etc/kubernetes)")
158 fs.StringVar(&c.settings.containerdSocket,
159 "containerd-socket",
160 DefaultContainerdSocket,
161 "path to the containerd socket")
162 fs.DurationVar(&c.settings.maxUnhealthy,
163 "max-unhealthy-duration",
164 DefaultMaxUnhealthyDuration,
165 "maximum duration an etcd cluster can be unhealthy before a fix is attempted")
166 fs.StringVar(&c.settings.logLevel,
167 "log-level",
168 DefaultLogLevel,
169 "must be one of DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL")
170
171 c.bindControlPlaneEtcdFlags(fs)
172 c.bindKubeClientFlags(fs)
173 c.bindBlockingKubeClientFlags(fs)
174 c.bindEtcdClientFlags(fs)
175 }
176
177
178 func (c *config) bindControlPlaneEtcdFlags(fs *flag.FlagSet) {
179 fs.StringVar(&c.settings.controlplaneEtcd.host,
180 "controlplane-etcd-host",
181 DefaultControlplaneEtcdHost,
182 "controlplane etcd host")
183 fs.StringVar(&c.settings.controlplaneEtcd.port,
184 "controlplane-etcd-port",
185 DefaultControlplaneEtcdPort,
186 "controlplane etcd port")
187 }
188
189
190 func (c *config) bindKubeClientFlags(fs *flag.FlagSet) {
191 fs.DurationVar(&c.settings.kubeClientSettings.requestTimeout,
192 "kube-client-request-timeout",
193 DefaultKubeClientRequestTimeout,
194 "timeout for Kubernetes client requests")
195 fs.DurationVar(&c.settings.kubeClientSettings.initialBackoff,
196 "kube-client-initial-backoff",
197 DefaultKubeClientInitialBackoff,
198 "initial backoff period between Kubernetes client request attempts")
199 fs.Float64Var(&c.settings.kubeClientSettings.backoffFactor,
200 "kube-client-backoff-factor",
201 DefaultKubeClientBackoffFactor,
202 "factor by which the backoff period increases after each retry")
203 fs.IntVar(&c.settings.kubeClientSettings.maxRetries,
204 "kube-client-max-retries",
205 DefaultKubeClientMaxRetries,
206 "max amount of times to retry a failing Kubernetes client request")
207 }
208
209
210 func (c *config) bindBlockingKubeClientFlags(fs *flag.FlagSet) {
211 fs.DurationVar(&c.settings.blockingKubeClientSettings.requestTimeout,
212 "blocking-kube-client-request-timeout",
213 DefaultBlockingKubeClientRequestTimeout,
214 "timeout for blocking Kubernetes client requests")
215 fs.DurationVar(&c.settings.blockingKubeClientSettings.initialBackoff,
216 "blocking-kube-client-initial-backoff",
217 DefaultBlockingKubeClientInitialBackoff,
218 "initial backoff period between blocking Kubernetes client request attempts")
219 fs.Float64Var(&c.settings.blockingKubeClientSettings.backoffFactor,
220 "blocking-kube-client-backoff-factor",
221 DefaultBlockingKubeClientBackoffFactor,
222 "factor by which the backoff period increases after each retry")
223 fs.IntVar(&c.settings.blockingKubeClientSettings.maxRetries,
224 "blocking-kube-client-max-retries",
225 DefaultBlockingKubeClientMaxRetries,
226 "max amount of times to retry a failing blocking Kubernetes client request")
227 }
228
229
230 func (c *config) bindEtcdClientFlags(fs *flag.FlagSet) {
231 fs.DurationVar(&c.settings.etcdClientSettings.requestTimeout,
232 "etcd-client-request-timeout",
233 DefaultEtcdClientRequestTimeout,
234 "timeout for etcd client requests")
235 fs.DurationVar(&c.settings.etcdClientSettings.initialBackoff,
236 "etcd-client-initial-backoff",
237 DefaultEtcdClientInitialBackoff,
238 "initial backoff period between etcd client request attempts")
239 fs.Float64Var(&c.settings.etcdClientSettings.backoffFactor,
240 "etcd-client-backoff-factor",
241 DefaultEtcdClientBackoffFactor,
242 "factor by which the backoff period increases after each retry")
243 fs.IntVar(&c.settings.etcdClientSettings.maxRetries,
244 "etcd-client-max-retries",
245 DefaultEtcdClientMaxRetries,
246 "max amount of times to retry a failing etcd client request")
247 }
248
249
250 func (c *config) validate() error {
251 if dir := filepath.Dir(c.settings.kubeconfig); dir != "/etc/kubernetes" {
252 return fmt.Errorf("invalid kubeconfig path: %s", dir)
253 }
254
255 if _, ok := logMap[c.settings.logLevel]; !ok {
256 return fmt.Errorf("invalid log level: %s", c.settings.logLevel)
257 }
258
259 if _, err := net.LookupIP(c.settings.controlplaneEtcd.host); err != nil {
260 return fmt.Errorf("invalid etcd hostname: %v", err)
261 }
262
263 port, err := strconv.Atoi(c.settings.controlplaneEtcd.port)
264 if err != nil {
265 return fmt.Errorf("invalid etcd port: %v", err)
266 }
267 if port < 1 || port > 65535 {
268 return errors.New("invalid etcd port: port value must be between 1 and 65535")
269 }
270
271 return nil
272 }
273
274
275 func (c *config) EtcdRetryClient() (etcdretryclient.Retrier, error) {
276 client, err := newEtcdClient(c.settings.controlplaneEtcd, c.settings.etcdClientSettings.requestTimeout)
277 if err != nil {
278 return nil, err
279 }
280 config := etcdretryclient.Config{
281 RequestTimeout: c.settings.etcdClientSettings.requestTimeout,
282 InitialBackoff: c.settings.etcdClientSettings.initialBackoff,
283 BackoffFactor: c.settings.etcdClientSettings.backoffFactor,
284 MaxRetries: c.settings.etcdClientSettings.maxRetries,
285 }
286
287 return etcdretryclient.New(*client, config), nil
288 }
289
290
291 func newEtcdClient(addr address, timeout time.Duration) (*clientv3.Client, error) {
292 tlsConfig, err := client.NewTLSConfig(file.New())
293 if err != nil {
294 return nil, err
295 }
296
297 clientURL := net.JoinHostPort(addr.host, addr.port)
298
299 client, err := client.New(tlsConfig, timeout, clientURL)
300 if err != nil {
301 return nil, fmt.Errorf("failed to create etcd client: %w", err)
302 }
303
304 return client, nil
305 }
306
307
308
309 func (c *config) KubeRetryClient() (kubeclienttypes.Retrier, error) {
310 if c.clients.kube == nil {
311 client, err := newKubeClient(c.settings.kubeconfig)
312 if err != nil {
313 return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
314 }
315
316 config := kuberetryclient.Config{
317 RequestTimeout: c.settings.kubeClientSettings.requestTimeout,
318 InitialBackoff: c.settings.kubeClientSettings.initialBackoff,
319 BackoffFactor: c.settings.kubeClientSettings.backoffFactor,
320 MaxRetries: c.settings.kubeClientSettings.maxRetries,
321 }
322 c.clients.kube = kuberetryclient.New(client, client, config)
323 }
324 return c.clients.kube, nil
325 }
326
327
328
329 func (c *config) BlockingKubeRetryClient() (kubeclienttypes.Retrier, error) {
330 if c.clients.blockingKube == nil {
331 var (
332 startTime = time.Now()
333 client ctrlclient.Client
334 err error
335 )
336 for {
337 if client, err = newKubeClient(c.settings.kubeconfig); err == nil || time.Since(startTime) > 5*time.Minute {
338 break
339 }
340 time.Sleep(10 * time.Second)
341 }
342 if err != nil {
343 return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
344 }
345
346 config := kuberetryclient.Config{
347 RequestTimeout: c.settings.blockingKubeClientSettings.requestTimeout,
348 InitialBackoff: c.settings.blockingKubeClientSettings.initialBackoff,
349 BackoffFactor: c.settings.blockingKubeClientSettings.backoffFactor,
350 MaxRetries: c.settings.blockingKubeClientSettings.maxRetries,
351 }
352 c.clients.blockingKube = kuberetryclient.New(client, client, config)
353 }
354 return c.clients.blockingKube, nil
355 }
356
357
358
359 func newKubeClient(kubeconfig string) (ctrlclient.Client, error) {
360 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
361 if err != nil {
362 return nil, fmt.Errorf("failed to create Kubernetes config: %w", err)
363 }
364
365 opts := ctrlclient.Options{
366 Scheme: createScheme(),
367 }
368 return ctrlclient.New(config, opts)
369 }
370
371
372 func createScheme() *runtime.Scheme {
373 scheme := runtime.NewScheme()
374 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
375 utilruntime.Must(v1etcd.AddToScheme(scheme))
376 return scheme
377 }
378
379
380 func (c *config) ContainerdClient() (containerd.Client, error) {
381 return containerd.NewClient(c.settings.containerdSocket)
382 }
383
384
385 func (c *config) SystemdConnection(ctx context.Context) (systemd.Connection, error) {
386 return systemd.NewConnection(ctx)
387 }
388
389
390 func (c *config) Fs() afero.Fs {
391 if c.fs == nil {
392 c.fs = afero.NewOsFs()
393 }
394
395 return c.fs
396 }
397
398
399
400
401 func (c *config) Endpoint() string {
402 return net.JoinHostPort(c.settings.controlplaneEtcd.host, c.settings.controlplaneEtcd.port)
403 }
404
405
406 func (c *config) MaxUnhealthy() time.Duration {
407 return c.settings.maxUnhealthy
408 }
409
410
411 func (c *config) LogLevel() int {
412 return logMap[c.settings.logLevel]
413 }
414
View as plain text