...

Source file src/edge-infra.dev/cmd/sds/devices/device-agent/deviceagent.go

Documentation: edge-infra.dev/cmd/sds/devices/device-agent

     1  //go:build linux
     2  
     3  package main
     4  
     5  import (
     6  	"context"
     7  	"flag"
     8  	"io"
     9  	"os"
    10  	"os/signal"
    11  	"path/filepath"
    12  
    13  	syscall "golang.org/x/sys/unix"
    14  
    15  	"github.com/containerd/containerd"
    16  	ff "github.com/peterbourgon/ff/v3"
    17  	"github.com/prometheus/client_golang/prometheus"
    18  	"google.golang.org/grpc"
    19  	"google.golang.org/grpc/credentials/insecure"
    20  	"k8s.io/apimachinery/pkg/runtime"
    21  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    22  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    23  	"k8s.io/client-go/tools/clientcmd"
    24  	"k8s.io/client-go/util/homedir"
    25  	criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"edge-infra.dev/pkg/k8s/runtime/sap"
    29  	"edge-infra.dev/pkg/sds/devices/agent/metrics"
    30  	dsv1 "edge-infra.dev/pkg/sds/devices/k8s/apis/v1"
    31  	"edge-infra.dev/pkg/sds/devices/logger"
    32  	v1 "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
    33  
    34  	"edge-infra.dev/pkg/lib/kernel/udev"
    35  	"edge-infra.dev/pkg/lib/kernel/udev/reader"
    36  	"edge-infra.dev/pkg/lib/pprof"
    37  	"edge-infra.dev/pkg/sds/devices/agent"
    38  )
    39  
    40  var (
    41  	ctrSocketPath = "/run/containerd/containerd.sock"
    42  	ctrNamespace  = "k8s.io"
    43  	pprofAddr     = ":8083"
    44  	// metricsAddr is port metrics are served on
    45  	metricsAddr = ":9093"
    46  )
    47  
    48  func main() {
    49  	log := logger.New()
    50  	cfg := agent.Config{}
    51  	fs := flag.NewFlagSet("device-agent", flag.ExitOnError)
    52  	cfg.BindFlags(fs)
    53  	if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix(), ff.WithIgnoreUndefined(true)); err != nil {
    54  		log.Error("error parsing arguments", "error", err)
    55  		os.Exit(1)
    56  	}
    57  
    58  	if !cfg.StartServer {
    59  		log.Info("StartServer is false, shutting down device agent")
    60  		os.Exit(0)
    61  	}
    62  
    63  	log.Info("starting device agent with configuration", "config", cfg)
    64  
    65  	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    66  	defer stop()
    67  
    68  	opts := []logger.Option{
    69  		logger.WithLevel(logger.ToLevel(cfg.LogLevel)),
    70  	}
    71  	log = logger.New(opts...)
    72  	ctx = logger.IntoContext(ctx, log)
    73  
    74  	config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
    75  	if err != nil {
    76  		log.Error("fail to get local config", "error", err)
    77  		os.Exit(1)
    78  	}
    79  
    80  	k8sClient, err := client.New(config, client.Options{Scheme: createScheme()})
    81  	if err != nil {
    82  		log.Error("error creating k8s client", "error", err)
    83  		os.Exit(1)
    84  	}
    85  
    86  	resourceManager, err := sap.NewResourceManagerFromConfig(config, client.Options{}, sap.Owner{Field: "device-agent", Group: dsv1.GroupVersion.Group})
    87  	if err != nil {
    88  		log.Error("error creating resource manager", "error", err)
    89  		os.Exit(1)
    90  	}
    91  
    92  	log.Info("serving uevent watcher (start)", "address", "netlink")
    93  	decoder, uEventReader, err := createDecoder("netlink")
    94  	if err != nil {
    95  		log.Error("error creating uevent decoder", "error", err)
    96  		os.Exit(1)
    97  	}
    98  	defer uEventReader.Close()
    99  
   100  	c, err := containerd.New(ctrSocketPath, containerd.WithDefaultNamespace(ctrNamespace))
   101  	if err != nil {
   102  		log.Error("error with containerd client connecting to containerd socket", "error", err)
   103  		os.Exit(1)
   104  	}
   105  	defer c.Close()
   106  
   107  	conn, err := grpc.NewClient(filepath.Join("unix://", ctrSocketPath), grpc.WithTransportCredentials(insecure.NewCredentials()))
   108  	if err != nil {
   109  		log.Error("failed to connect cri endpoint", "error", err)
   110  		os.Exit(1)
   111  	}
   112  	defer conn.Close()
   113  	rc := criruntime.NewRuntimeServiceClient(conn)
   114  
   115  	if cfg.EnablePprof {
   116  		go func() {
   117  			if err := pprof.Serve(pprofAddr); err != nil {
   118  				log.Error("error serving pprof server", "error", err)
   119  			}
   120  		}()
   121  	}
   122  
   123  	metricsServer := metrics.Server(metricsAddr)
   124  	defer metricsServer.Close()
   125  	go func() {
   126  		if err := metricsServer.ListenAndServe(); err != nil {
   127  			log.Error("error serving device metrics server")
   128  		}
   129  	}()
   130  
   131  	deviceAgent, err := agent.NewDeviceAgent(ctx, config, k8sClient, c, rc, cfg, decoder, resourceManager)
   132  	if err != nil {
   133  		log.Error("error instantiating device agent", "error", err)
   134  		os.Exit(1)
   135  	}
   136  
   137  	if err := deviceAgent.Start(ctx); err != nil {
   138  		log.Error("error running device agent", "error", err)
   139  		os.Exit(1)
   140  	}
   141  }
   142  
   143  // createDecoder instantiates a new uvent decoder to read from netlink
   144  func createDecoder(source string) (reader.Decoder, io.ReadCloser, error) {
   145  	uEventReader, fd, err := udev.NewUEventReader(source)
   146  	if err != nil {
   147  		return nil, nil, err
   148  	}
   149  	decoder := reader.NewSocketReader(fd)
   150  	return decoder, uEventReader, nil
   151  }
   152  
   153  func init() {
   154  	prometheus.MustRegister(
   155  		metrics.DeviceReconcileDurationMetric,
   156  		metrics.NodeDiskBytesMetric,
   157  	)
   158  }
   159  
   160  // createScheme returns a runtime schema registered
   161  // with the corev1 and device-system scheme's.
   162  func createScheme() *runtime.Scheme {
   163  	scheme := runtime.NewScheme()
   164  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   165  	utilruntime.Must(v1.AddToScheme(scheme))
   166  	utilruntime.Must(dsv1.AddToScheme(scheme))
   167  	return scheme
   168  }
   169  

View as plain text