//go:build linux package main import ( "context" "flag" "io" "os" "os/signal" "path/filepath" syscall "golang.org/x/sys/unix" "github.com/containerd/containerd" ff "github.com/peterbourgon/ff/v3" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" criruntime "k8s.io/cri-api/pkg/apis/runtime/v1" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/k8s/runtime/sap" "edge-infra.dev/pkg/sds/devices/agent/metrics" dsv1 "edge-infra.dev/pkg/sds/devices/k8s/apis/v1" "edge-infra.dev/pkg/sds/devices/logger" v1 "edge-infra.dev/pkg/sds/ien/k8s/apis/v1" "edge-infra.dev/pkg/lib/kernel/udev" "edge-infra.dev/pkg/lib/kernel/udev/reader" "edge-infra.dev/pkg/lib/pprof" "edge-infra.dev/pkg/sds/devices/agent" ) var ( ctrSocketPath = "/run/containerd/containerd.sock" ctrNamespace = "k8s.io" pprofAddr = ":8083" // metricsAddr is port metrics are served on metricsAddr = ":9093" ) func main() { log := logger.New() cfg := agent.Config{} fs := flag.NewFlagSet("device-agent", flag.ExitOnError) cfg.BindFlags(fs) if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil { log.Error("error parsing arguments", "error", err) os.Exit(1) } if !cfg.StartServer { log.Info("StartServer is false, shutting down device agent") os.Exit(0) } log.Info("starting device agent with configuration", "config", cfg) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() opts := []logger.Option{ logger.WithLevel(logger.ToLevel(cfg.LogLevel)), } log = logger.New(opts...) ctx = logger.IntoContext(ctx, log) config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config")) if err != nil { log.Error("fail to get local config", "error", err) os.Exit(1) } k8sClient, err := client.New(config, client.Options{Scheme: createScheme()}) if err != nil { log.Error("error creating k8s client", "error", err) os.Exit(1) } resourceManager, err := sap.NewResourceManagerFromConfig(config, client.Options{}, sap.Owner{Field: "device-agent", Group: dsv1.GroupVersion.Group}) if err != nil { log.Error("error creating resource manager", "error", err) os.Exit(1) } log.Info("serving uevent watcher (start)", "address", "netlink") decoder, uEventReader, err := createDecoder("netlink") if err != nil { log.Error("error creating uevent decoder", "error", err) os.Exit(1) } defer uEventReader.Close() c, err := containerd.New(ctrSocketPath, containerd.WithDefaultNamespace(ctrNamespace)) if err != nil { log.Error("error with containerd client connecting to containerd socket", "error", err) os.Exit(1) } defer c.Close() conn, err := grpc.NewClient(filepath.Join("unix://", ctrSocketPath), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Error("failed to connect cri endpoint", "error", err) os.Exit(1) } defer conn.Close() rc := criruntime.NewRuntimeServiceClient(conn) if cfg.EnablePprof { go func() { if err := pprof.Serve(pprofAddr); err != nil { log.Error("error serving pprof server", "error", err) } }() } metricsServer := metrics.Server(metricsAddr) defer metricsServer.Close() go func() { if err := metricsServer.ListenAndServe(); err != nil { log.Error("error serving device metrics server") } }() deviceAgent, err := agent.NewDeviceAgent(ctx, config, k8sClient, c, rc, cfg, decoder, resourceManager) if err != nil { log.Error("error instantiating device agent", "error", err) os.Exit(1) } if err := deviceAgent.Start(ctx); err != nil { log.Error("error running device agent", "error", err) os.Exit(1) } } // createDecoder instantiates a new uvent decoder to read from netlink func createDecoder(source string) (reader.Decoder, io.ReadCloser, error) { uEventReader, fd, err := udev.NewUEventReader(source) if err != nil { return nil, nil, err } decoder := reader.NewSocketReader(fd) return decoder, uEventReader, nil } func init() { prometheus.MustRegister( metrics.DeviceReconcileDurationMetric, metrics.NodeDiskBytesMetric, ) } // createScheme returns a runtime schema registered // with the corev1 and device-system scheme's. func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1.AddToScheme(scheme)) utilruntime.Must(dsv1.AddToScheme(scheme)) return scheme }