// Package app does all of the work necessary to configure and run the
// cdi-test-driver process, either as a resource driver controller or as a
// kubelet plugin.
    19  package app
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"fmt"
    25  	"net"
    26  	"net/http"
    27  	"net/http/pprof"
    28  	"os"
    29  	"os/signal"
    30  	"path"
    31  	"path/filepath"
    32  	"strings"
    33  	"syscall"
    34  	"time"
    36  	"github.com/spf13/cobra"
    37  	"k8s.io/component-base/metrics"
    39  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    40  	"k8s.io/client-go/kubernetes"
    41  	"k8s.io/client-go/rest"
    42  	"k8s.io/client-go/tools/clientcmd"
    43  	cliflag "k8s.io/component-base/cli/flag"
    44  	"k8s.io/component-base/featuregate"
    45  	"k8s.io/component-base/logs"
    46  	logsapi "k8s.io/component-base/logs/api/v1"
    47  	"k8s.io/component-base/metrics/legacyregistry"
    48  	"k8s.io/component-base/term"
    49  	"k8s.io/dynamic-resource-allocation/kubeletplugin"
    50  	"k8s.io/dynamic-resource-allocation/leaderelection"
    51  	"k8s.io/klog/v2"
    52  )
    54  // NewCommand creates a *cobra.Command object with default parameters.
    55  func NewCommand() *cobra.Command {
    56  	o := logsapi.NewLoggingConfiguration()
    57  	var clientset kubernetes.Interface
    58  	var config *rest.Config
    59  	ctx := context.Background()
    60  	logger := klog.Background()
    62  	cmd := &cobra.Command{
    63  		Use:  "cdi-test-driver",
    64  		Long: "cdi-test-driver implements a resource driver controller and kubelet plugin.",
    65  	}
    66  	sharedFlagSets := cliflag.NamedFlagSets{}
    67  	fs := sharedFlagSets.FlagSet("logging")
    68  	logsapi.AddFlags(o, fs)
    69  	logs.AddFlags(fs, logs.SkipLoggingConfigurationFlags())
    71  	fs = sharedFlagSets.FlagSet("Kubernetes client")
    72  	kubeconfig := fs.String("kubeconfig", "", "Absolute path to the kube.config file. Either this or KUBECONFIG need to be set if the driver is being run out of cluster.")
    73  	kubeAPIQPS := fs.Float32("kube-api-qps", 50, "QPS to use while communicating with the kubernetes apiserver.")
    74  	kubeAPIBurst := fs.Int("kube-api-burst", 100, "Burst to use while communicating with the kubernetes apiserver.")
    75  	workers := fs.Int("workers", 10, "Concurrency to process multiple claims")
    77  	fs = sharedFlagSets.FlagSet("http server")
    78  	httpEndpoint := fs.String("http-endpoint", "",
    79  		"The TCP network address where the HTTP server for diagnostics, including pprof, metrics and (if applicable) leader election health check, will listen (example: `:8080`). The default is the empty string, which means the server is disabled.")
    80  	metricsPath := fs.String("metrics-path", "/metrics", "The HTTP path where Prometheus metrics will be exposed, disabled if empty.")
    81  	profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
    83  	fs = sharedFlagSets.FlagSet("CDI")
    84  	driverNameFlagName := "drivername"
    85  	driverName := fs.String(driverNameFlagName, "test-driver.cdi.k8s.io", "Resource driver name.")
    86  	driverNameFlag := fs.Lookup(driverNameFlagName)
    88  	fs = sharedFlagSets.FlagSet("other")
    89  	featureGate := featuregate.NewFeatureGate()
    90  	utilruntime.Must(logsapi.AddFeatureGates(featureGate))
    91  	featureGate.AddFlag(fs)
    93  	fs = cmd.PersistentFlags()
    94  	for _, f := range sharedFlagSets.FlagSets {
    95  		fs.AddFlagSet(f)
    96  	}
    98  	mux := http.NewServeMux()
   100  	cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
   101  		// Activate logging as soon as possible, after that
   102  		// show flags with the final logging configuration.
   104  		if err := logsapi.ValidateAndApply(o, featureGate); err != nil {
   105  			return err
   106  		}
   108  		// get the KUBECONFIG from env if specified (useful for local/debug cluster)
   109  		kubeconfigEnv := os.Getenv("KUBECONFIG")
   111  		if kubeconfigEnv != "" {
   112  			logger.Info("Found KUBECONFIG environment variable set, using that..")
   113  			*kubeconfig = kubeconfigEnv
   114  		}
   116  		var err error
   117  		if *kubeconfig == "" {
   118  			config, err = rest.InClusterConfig()
   119  			if err != nil {
   120  				return fmt.Errorf("create in-cluster client configuration: %w", err)
   121  			}
   122  		} else {
   123  			config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
   124  			if err != nil {
   125  				return fmt.Errorf("create out-of-cluster client configuration: %w", err)
   126  			}
   127  		}
   128  		config.QPS = *kubeAPIQPS
   129  		config.Burst = *kubeAPIBurst
   131  		clientset, err = kubernetes.NewForConfig(config)
   132  		if err != nil {
   133  			return fmt.Errorf("create client: %w", err)
   134  		}
   136  		if *httpEndpoint != "" {
   137  			if *metricsPath != "" {
   138  				// For workqueue and leader election metrics, set up via the anonymous imports of:
   139  				// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
   140  				// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
   141  				//
   142  				// Also to happens to include Go runtime and process metrics:
   143  				// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
   144  				gatherer := legacyregistry.DefaultGatherer
   145  				actualPath := path.Join("/", *metricsPath)
   146  				logger.Info("Starting metrics", "path", actualPath)
   147  				mux.Handle(actualPath,
   148  					metrics.HandlerFor(gatherer, metrics.HandlerOpts{}))
   149  			}
   151  			if *profilePath != "" {
   152  				actualPath := path.Join("/", *profilePath)
   153  				logger.Info("Starting profiling", "path", actualPath)
   154  				mux.HandleFunc(path.Join("/", *profilePath), pprof.Index)
   155  				mux.HandleFunc(path.Join("/", *profilePath, "cmdline"), pprof.Cmdline)
   156  				mux.HandleFunc(path.Join("/", *profilePath, "profile"), pprof.Profile)
   157  				mux.HandleFunc(path.Join("/", *profilePath, "symbol"), pprof.Symbol)
   158  				mux.HandleFunc(path.Join("/", *profilePath, "trace"), pprof.Trace)
   159  			}
   161  			listener, err := net.Listen("tcp", *httpEndpoint)
   162  			if err != nil {
   163  				return fmt.Errorf("listen on HTTP endpoint: %w", err)
   164  			}
   166  			go func() {
   167  				logger.Info("Starting HTTP server", "endpoint", *httpEndpoint)
   168  				err := http.Serve(listener, mux)
   169  				if err != nil {
   170  					logger.Error(err, "HTTP server failed")
   171  					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   172  				}
   173  			}()
   174  		}
   176  		return nil
   177  	}
   179  	controller := &cobra.Command{
   180  		Use:   "controller",
   181  		Short: "run as resource controller",
   182  		Long:  "cdi-test-driver controller runs as a resource driver controller.",
   183  		Args:  cobra.ExactArgs(0),
   184  	}
   185  	controllerFlagSets := cliflag.NamedFlagSets{}
   186  	fs = controllerFlagSets.FlagSet("leader election")
   187  	enableLeaderElection := fs.Bool("leader-election", false,
   188  		"Enables leader election. If leader election is enabled, additional RBAC rules are required.")
   189  	leaderElectionNamespace := fs.String("leader-election-namespace", "",
   190  		"Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
   191  	leaderElectionLeaseDuration := fs.Duration("leader-election-lease-duration", 15*time.Second,
   192  		"Duration, in seconds, that non-leader candidates will wait to force acquire leadership.")
   193  	leaderElectionRenewDeadline := fs.Duration("leader-election-renew-deadline", 10*time.Second,
   194  		"Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.")
   195  	leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second,
   196  		"Duration, in seconds, the LeaderElector clients should wait between tries of actions.")
   197  	fs = controllerFlagSets.FlagSet("controller")
   198  	resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.")
   199  	fs = controller.Flags()
   200  	for _, f := range controllerFlagSets.FlagSets {
   201  		fs.AddFlagSet(f)
   202  	}
   204  	controller.RunE = func(cmd *cobra.Command, args []string) error {
   205  		resources := Resources{}
   206  		if *resourceConfig != "" {
   207  			file, err := os.Open(*resourceConfig)
   208  			if err != nil {
   209  				return fmt.Errorf("open resource config: %w", err)
   210  			}
   211  			decoder := json.NewDecoder(file)
   212  			decoder.DisallowUnknownFields()
   213  			if err := decoder.Decode(&resources); err != nil {
   214  				return fmt.Errorf("parse resource config %q: %w", *resourceConfig, err)
   215  			}
   216  		}
   217  		if resources.DriverName == "" || driverNameFlag.Changed {
   218  			resources.DriverName = *driverName
   219  		}
   221  		run := func() {
   222  			controller := NewController(clientset, resources)
   223  			controller.Run(ctx, *workers)
   224  		}
   226  		if !*enableLeaderElection {
   227  			run()
   228  			return nil
   229  		}
   231  		// This must not change between releases.
   232  		lockName := *driverName
   234  		// Create a new clientset for leader election
   235  		// to avoid starving it when the normal traffic
   236  		// exceeds the QPS+burst limits.
   237  		leClientset, err := kubernetes.NewForConfig(config)
   238  		if err != nil {
   239  			return fmt.Errorf("create leaderelection client: %w", err)
   240  		}
   242  		le := leaderelection.New(leClientset, lockName,
   243  			func(ctx context.Context) {
   244  				run()
   245  			},
   246  			leaderelection.LeaseDuration(*leaderElectionLeaseDuration),
   247  			leaderelection.RenewDeadline(*leaderElectionRenewDeadline),
   248  			leaderelection.RetryPeriod(*leaderElectionRetryPeriod),
   249  			leaderelection.Namespace(*leaderElectionNamespace),
   250  		)
   251  		if *httpEndpoint != "" {
   252  			le.PrepareHealthCheck(mux)
   253  		}
   254  		if err := le.Run(); err != nil {
   255  			return fmt.Errorf("leader election failed: %w", err)
   256  		}
   258  		return nil
   259  	}
   260  	cmd.AddCommand(controller)
   262  	kubeletPlugin := &cobra.Command{
   263  		Use:   "kubelet-plugin",
   264  		Short: "run as kubelet plugin",
   265  		Long:  "cdi-test-driver kubelet-plugin runs as a device plugin for kubelet that supports dynamic resource allocation.",
   266  		Args:  cobra.ExactArgs(0),
   267  	}
   268  	kubeletPluginFlagSets := cliflag.NamedFlagSets{}
   269  	fs = kubeletPluginFlagSets.FlagSet("kubelet")
   270  	pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.")
   271  	endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.")
   272  	draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
   273  	fs = kubeletPluginFlagSets.FlagSet("CDI")
   274  	cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
   275  	fs = kubeletPlugin.Flags()
   276  	for _, f := range kubeletPluginFlagSets.FlagSets {
   277  		fs.AddFlagSet(f)
   278  	}
   279  	kubeletPlugin.RunE = func(cmd *cobra.Command, args []string) error {
   280  		// Ensure that directories exist, creating them if necessary. We want
   281  		// to know early if there is a setup problem that would prevent
   282  		// creating those directories.
   283  		if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil {
   284  			return fmt.Errorf("create CDI directory: %w", err)
   285  		}
   286  		if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil {
   287  			return fmt.Errorf("create socket directory: %w", err)
   288  		}
   290  		plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
   291  			kubeletplugin.PluginSocketPath(*endpoint),
   292  			kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
   293  			kubeletplugin.KubeletPluginSocketPath(*draAddress),
   294  		)
   295  		if err != nil {
   296  			return fmt.Errorf("start example plugin: %w", err)
   297  		}
   299  		// Handle graceful shutdown. We need to delete Unix domain
   300  		// sockets.
   301  		sigc := make(chan os.Signal, 1)
   302  		signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
   303  		logger.Info("Waiting for signal.")
   304  		sig := <-sigc
   305  		logger.Info("Received signal, shutting down.", "signal", sig)
   306  		plugin.Stop()
   307  		return nil
   308  	}
   309  	cmd.AddCommand(kubeletPlugin)
   311  	// SetUsageAndHelpFunc takes care of flag grouping. However,
   312  	// it doesn't support listing child commands. We add those
   313  	// to cmd.Use.
   314  	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
   315  	cliflag.SetUsageAndHelpFunc(cmd, sharedFlagSets, cols)
   316  	var children []string
   317  	for _, child := range cmd.Commands() {
   318  		children = append(children, child.Use)
   319  	}
   320  	cmd.Use += " [shared flags] " + strings.Join(children, "|")
   321  	cliflag.SetUsageAndHelpFunc(controller, controllerFlagSets, cols)
   322  	cliflag.SetUsageAndHelpFunc(kubeletPlugin, kubeletPluginFlagSets, cols)
   324  	return cmd
   325  }

