1
2
3 package main
4
5 import (
6 "context"
7 "flag"
8 "io"
9 "os"
10 "os/signal"
11 "path/filepath"
12
13 syscall "golang.org/x/sys/unix"
14
15 "github.com/containerd/containerd"
16 ff "github.com/peterbourgon/ff/v3"
17 "github.com/prometheus/client_golang/prometheus"
18 "google.golang.org/grpc"
19 "google.golang.org/grpc/credentials/insecure"
20 "k8s.io/apimachinery/pkg/runtime"
21 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
22 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
23 "k8s.io/client-go/tools/clientcmd"
24 "k8s.io/client-go/util/homedir"
25 criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "edge-infra.dev/pkg/k8s/runtime/sap"
29 "edge-infra.dev/pkg/sds/devices/agent/metrics"
30 dsv1 "edge-infra.dev/pkg/sds/devices/k8s/apis/v1"
31 "edge-infra.dev/pkg/sds/devices/logger"
32 v1 "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
33
34 "edge-infra.dev/pkg/lib/kernel/udev"
35 "edge-infra.dev/pkg/lib/kernel/udev/reader"
36 "edge-infra.dev/pkg/lib/pprof"
37 "edge-infra.dev/pkg/sds/devices/agent"
38 )
39
// Fixed endpoints and identifiers used by the device agent at startup.
var (
	// ctrSocketPath is the containerd socket. It is passed both to the
	// containerd client and, as a unix target, to the CRI gRPC connection.
	ctrSocketPath = "/run/containerd/containerd.sock"
	// ctrNamespace is the default namespace given to the containerd client.
	ctrNamespace = "k8s.io"

	// metricsAddr is the address handed to the device metrics server.
	metricsAddr = ":9093"
	// pprofAddr is the address handed to the pprof server when it is enabled.
	pprofAddr = ":8083"
)
47
48 func main() {
49 log := logger.New()
50 cfg := agent.Config{}
51 fs := flag.NewFlagSet("device-agent", flag.ExitOnError)
52 cfg.BindFlags(fs)
53 if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
54 log.Error("error parsing arguments", "error", err)
55 os.Exit(1)
56 }
57
58 if !cfg.StartServer {
59 log.Info("StartServer is false, shutting down device agent")
60 os.Exit(0)
61 }
62
63 log.Info("starting device agent with configuration", "config", cfg)
64
65 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
66 defer stop()
67
68 opts := []logger.Option{
69 logger.WithLevel(logger.ToLevel(cfg.LogLevel)),
70 }
71 log = logger.New(opts...)
72 ctx = logger.IntoContext(ctx, log)
73
74 config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
75 if err != nil {
76 log.Error("fail to get local config", "error", err)
77 os.Exit(1)
78 }
79
80 k8sClient, err := client.New(config, client.Options{Scheme: createScheme()})
81 if err != nil {
82 log.Error("error creating k8s client", "error", err)
83 os.Exit(1)
84 }
85
86 resourceManager, err := sap.NewResourceManagerFromConfig(config, client.Options{}, sap.Owner{Field: "device-agent", Group: dsv1.GroupVersion.Group})
87 if err != nil {
88 log.Error("error creating resource manager", "error", err)
89 os.Exit(1)
90 }
91
92 log.Info("serving uevent watcher (start)", "address", "netlink")
93 decoder, uEventReader, err := createDecoder("netlink")
94 if err != nil {
95 log.Error("error creating uevent decoder", "error", err)
96 os.Exit(1)
97 }
98 defer uEventReader.Close()
99
100 c, err := containerd.New(ctrSocketPath, containerd.WithDefaultNamespace(ctrNamespace))
101 if err != nil {
102 log.Error("error with containerd client connecting to containerd socket", "error", err)
103 os.Exit(1)
104 }
105 defer c.Close()
106
107 conn, err := grpc.NewClient(filepath.Join("unix://", ctrSocketPath), grpc.WithTransportCredentials(insecure.NewCredentials()))
108 if err != nil {
109 log.Error("failed to connect cri endpoint", "error", err)
110 os.Exit(1)
111 }
112 defer conn.Close()
113 rc := criruntime.NewRuntimeServiceClient(conn)
114
115 if cfg.EnablePprof {
116 go func() {
117 if err := pprof.Serve(pprofAddr); err != nil {
118 log.Error("error serving pprof server", "error", err)
119 }
120 }()
121 }
122
123 metricsServer := metrics.Server(metricsAddr)
124 defer metricsServer.Close()
125 go func() {
126 if err := metricsServer.ListenAndServe(); err != nil {
127 log.Error("error serving device metrics server")
128 }
129 }()
130
131 deviceAgent, err := agent.NewDeviceAgent(ctx, config, k8sClient, c, rc, cfg, decoder, resourceManager)
132 if err != nil {
133 log.Error("error instantiating device agent", "error", err)
134 os.Exit(1)
135 }
136
137 if err := deviceAgent.Start(ctx); err != nil {
138 log.Error("error running device agent", "error", err)
139 os.Exit(1)
140 }
141 }
142
143
144 func createDecoder(source string) (reader.Decoder, io.ReadCloser, error) {
145 uEventReader, fd, err := udev.NewUEventReader(source)
146 if err != nil {
147 return nil, nil, err
148 }
149 decoder := reader.NewSocketReader(fd)
150 return decoder, uEventReader, nil
151 }
152
153 func init() {
154 prometheus.MustRegister(
155 metrics.DeviceReconcileDurationMetric,
156 metrics.NodeDiskBytesMetric,
157 )
158 }
159
160
161
162 func createScheme() *runtime.Scheme {
163 scheme := runtime.NewScheme()
164 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
165 utilruntime.Must(v1.AddToScheme(scheme))
166 utilruntime.Must(dsv1.AddToScheme(scheme))
167 return scheme
168 }
169
View as plain text