...

Source file src/k8s.io/kubernetes/test/e2e/dra/test-driver/app/server.go

Documentation: k8s.io/kubernetes/test/e2e/dra/test-driver/app

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app implements the command line of cdi-test-driver, which can run
    18  // either as a resource driver controller or as a kubelet plugin.
    19  package app
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"fmt"
    25  	"net"
    26  	"net/http"
    27  	"net/http/pprof"
    28  	"os"
    29  	"os/signal"
    30  	"path"
    31  	"path/filepath"
    32  	"strings"
    33  	"syscall"
    34  	"time"
    35  
    36  	"github.com/spf13/cobra"
    37  	"k8s.io/component-base/metrics"
    38  
    39  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    40  	"k8s.io/client-go/kubernetes"
    41  	"k8s.io/client-go/rest"
    42  	"k8s.io/client-go/tools/clientcmd"
    43  	cliflag "k8s.io/component-base/cli/flag"
    44  	"k8s.io/component-base/featuregate"
    45  	"k8s.io/component-base/logs"
    46  	logsapi "k8s.io/component-base/logs/api/v1"
    47  	"k8s.io/component-base/metrics/legacyregistry"
    48  	"k8s.io/component-base/term"
    49  	"k8s.io/dynamic-resource-allocation/kubeletplugin"
    50  	"k8s.io/dynamic-resource-allocation/leaderelection"
    51  	"k8s.io/klog/v2"
    52  )
    53  
    54  // NewCommand creates a *cobra.Command object with default parameters.
    55  func NewCommand() *cobra.Command {
    56  	o := logsapi.NewLoggingConfiguration()
    57  	var clientset kubernetes.Interface
    58  	var config *rest.Config
    59  	ctx := context.Background()
    60  	logger := klog.Background()
    61  
    62  	cmd := &cobra.Command{
    63  		Use:  "cdi-test-driver",
    64  		Long: "cdi-test-driver implements a resource driver controller and kubelet plugin.",
    65  	}
    66  	sharedFlagSets := cliflag.NamedFlagSets{}
    67  	fs := sharedFlagSets.FlagSet("logging")
    68  	logsapi.AddFlags(o, fs)
    69  	logs.AddFlags(fs, logs.SkipLoggingConfigurationFlags())
    70  
    71  	fs = sharedFlagSets.FlagSet("Kubernetes client")
    72  	kubeconfig := fs.String("kubeconfig", "", "Absolute path to the kube.config file. Either this or KUBECONFIG need to be set if the driver is being run out of cluster.")
    73  	kubeAPIQPS := fs.Float32("kube-api-qps", 50, "QPS to use while communicating with the kubernetes apiserver.")
    74  	kubeAPIBurst := fs.Int("kube-api-burst", 100, "Burst to use while communicating with the kubernetes apiserver.")
    75  	workers := fs.Int("workers", 10, "Concurrency to process multiple claims")
    76  
    77  	fs = sharedFlagSets.FlagSet("http server")
    78  	httpEndpoint := fs.String("http-endpoint", "",
    79  		"The TCP network address where the HTTP server for diagnostics, including pprof, metrics and (if applicable) leader election health check, will listen (example: `:8080`). The default is the empty string, which means the server is disabled.")
    80  	metricsPath := fs.String("metrics-path", "/metrics", "The HTTP path where Prometheus metrics will be exposed, disabled if empty.")
    81  	profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
    82  
    83  	fs = sharedFlagSets.FlagSet("CDI")
    84  	driverNameFlagName := "drivername"
    85  	driverName := fs.String(driverNameFlagName, "test-driver.cdi.k8s.io", "Resource driver name.")
    86  	driverNameFlag := fs.Lookup(driverNameFlagName)
    87  
    88  	fs = sharedFlagSets.FlagSet("other")
    89  	featureGate := featuregate.NewFeatureGate()
    90  	utilruntime.Must(logsapi.AddFeatureGates(featureGate))
    91  	featureGate.AddFlag(fs)
    92  
    93  	fs = cmd.PersistentFlags()
    94  	for _, f := range sharedFlagSets.FlagSets {
    95  		fs.AddFlagSet(f)
    96  	}
    97  
    98  	mux := http.NewServeMux()
    99  
   100  	cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
   101  		// Activate logging as soon as possible, after that
   102  		// show flags with the final logging configuration.
   103  
   104  		if err := logsapi.ValidateAndApply(o, featureGate); err != nil {
   105  			return err
   106  		}
   107  
   108  		// get the KUBECONFIG from env if specified (useful for local/debug cluster)
   109  		kubeconfigEnv := os.Getenv("KUBECONFIG")
   110  
   111  		if kubeconfigEnv != "" {
   112  			logger.Info("Found KUBECONFIG environment variable set, using that..")
   113  			*kubeconfig = kubeconfigEnv
   114  		}
   115  
   116  		var err error
   117  		if *kubeconfig == "" {
   118  			config, err = rest.InClusterConfig()
   119  			if err != nil {
   120  				return fmt.Errorf("create in-cluster client configuration: %w", err)
   121  			}
   122  		} else {
   123  			config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
   124  			if err != nil {
   125  				return fmt.Errorf("create out-of-cluster client configuration: %w", err)
   126  			}
   127  		}
   128  		config.QPS = *kubeAPIQPS
   129  		config.Burst = *kubeAPIBurst
   130  
   131  		clientset, err = kubernetes.NewForConfig(config)
   132  		if err != nil {
   133  			return fmt.Errorf("create client: %w", err)
   134  		}
   135  
   136  		if *httpEndpoint != "" {
   137  			if *metricsPath != "" {
   138  				// For workqueue and leader election metrics, set up via the anonymous imports of:
   139  				// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
   140  				// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
   141  				//
   142  				// Also to happens to include Go runtime and process metrics:
   143  				// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
   144  				gatherer := legacyregistry.DefaultGatherer
   145  				actualPath := path.Join("/", *metricsPath)
   146  				logger.Info("Starting metrics", "path", actualPath)
   147  				mux.Handle(actualPath,
   148  					metrics.HandlerFor(gatherer, metrics.HandlerOpts{}))
   149  			}
   150  
   151  			if *profilePath != "" {
   152  				actualPath := path.Join("/", *profilePath)
   153  				logger.Info("Starting profiling", "path", actualPath)
   154  				mux.HandleFunc(path.Join("/", *profilePath), pprof.Index)
   155  				mux.HandleFunc(path.Join("/", *profilePath, "cmdline"), pprof.Cmdline)
   156  				mux.HandleFunc(path.Join("/", *profilePath, "profile"), pprof.Profile)
   157  				mux.HandleFunc(path.Join("/", *profilePath, "symbol"), pprof.Symbol)
   158  				mux.HandleFunc(path.Join("/", *profilePath, "trace"), pprof.Trace)
   159  			}
   160  
   161  			listener, err := net.Listen("tcp", *httpEndpoint)
   162  			if err != nil {
   163  				return fmt.Errorf("listen on HTTP endpoint: %w", err)
   164  			}
   165  
   166  			go func() {
   167  				logger.Info("Starting HTTP server", "endpoint", *httpEndpoint)
   168  				err := http.Serve(listener, mux)
   169  				if err != nil {
   170  					logger.Error(err, "HTTP server failed")
   171  					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   172  				}
   173  			}()
   174  		}
   175  
   176  		return nil
   177  	}
   178  
   179  	controller := &cobra.Command{
   180  		Use:   "controller",
   181  		Short: "run as resource controller",
   182  		Long:  "cdi-test-driver controller runs as a resource driver controller.",
   183  		Args:  cobra.ExactArgs(0),
   184  	}
   185  	controllerFlagSets := cliflag.NamedFlagSets{}
   186  	fs = controllerFlagSets.FlagSet("leader election")
   187  	enableLeaderElection := fs.Bool("leader-election", false,
   188  		"Enables leader election. If leader election is enabled, additional RBAC rules are required.")
   189  	leaderElectionNamespace := fs.String("leader-election-namespace", "",
   190  		"Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
   191  	leaderElectionLeaseDuration := fs.Duration("leader-election-lease-duration", 15*time.Second,
   192  		"Duration, in seconds, that non-leader candidates will wait to force acquire leadership.")
   193  	leaderElectionRenewDeadline := fs.Duration("leader-election-renew-deadline", 10*time.Second,
   194  		"Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.")
   195  	leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second,
   196  		"Duration, in seconds, the LeaderElector clients should wait between tries of actions.")
   197  	fs = controllerFlagSets.FlagSet("controller")
   198  	resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.")
   199  	fs = controller.Flags()
   200  	for _, f := range controllerFlagSets.FlagSets {
   201  		fs.AddFlagSet(f)
   202  	}
   203  
   204  	controller.RunE = func(cmd *cobra.Command, args []string) error {
   205  		resources := Resources{}
   206  		if *resourceConfig != "" {
   207  			file, err := os.Open(*resourceConfig)
   208  			if err != nil {
   209  				return fmt.Errorf("open resource config: %w", err)
   210  			}
   211  			decoder := json.NewDecoder(file)
   212  			decoder.DisallowUnknownFields()
   213  			if err := decoder.Decode(&resources); err != nil {
   214  				return fmt.Errorf("parse resource config %q: %w", *resourceConfig, err)
   215  			}
   216  		}
   217  		if resources.DriverName == "" || driverNameFlag.Changed {
   218  			resources.DriverName = *driverName
   219  		}
   220  
   221  		run := func() {
   222  			controller := NewController(clientset, resources)
   223  			controller.Run(ctx, *workers)
   224  		}
   225  
   226  		if !*enableLeaderElection {
   227  			run()
   228  			return nil
   229  		}
   230  
   231  		// This must not change between releases.
   232  		lockName := *driverName
   233  
   234  		// Create a new clientset for leader election
   235  		// to avoid starving it when the normal traffic
   236  		// exceeds the QPS+burst limits.
   237  		leClientset, err := kubernetes.NewForConfig(config)
   238  		if err != nil {
   239  			return fmt.Errorf("create leaderelection client: %w", err)
   240  		}
   241  
   242  		le := leaderelection.New(leClientset, lockName,
   243  			func(ctx context.Context) {
   244  				run()
   245  			},
   246  			leaderelection.LeaseDuration(*leaderElectionLeaseDuration),
   247  			leaderelection.RenewDeadline(*leaderElectionRenewDeadline),
   248  			leaderelection.RetryPeriod(*leaderElectionRetryPeriod),
   249  			leaderelection.Namespace(*leaderElectionNamespace),
   250  		)
   251  		if *httpEndpoint != "" {
   252  			le.PrepareHealthCheck(mux)
   253  		}
   254  		if err := le.Run(); err != nil {
   255  			return fmt.Errorf("leader election failed: %w", err)
   256  		}
   257  
   258  		return nil
   259  	}
   260  	cmd.AddCommand(controller)
   261  
   262  	kubeletPlugin := &cobra.Command{
   263  		Use:   "kubelet-plugin",
   264  		Short: "run as kubelet plugin",
   265  		Long:  "cdi-test-driver kubelet-plugin runs as a device plugin for kubelet that supports dynamic resource allocation.",
   266  		Args:  cobra.ExactArgs(0),
   267  	}
   268  	kubeletPluginFlagSets := cliflag.NamedFlagSets{}
   269  	fs = kubeletPluginFlagSets.FlagSet("kubelet")
   270  	pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.")
   271  	endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.")
   272  	draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
   273  	fs = kubeletPluginFlagSets.FlagSet("CDI")
   274  	cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
   275  	fs = kubeletPlugin.Flags()
   276  	for _, f := range kubeletPluginFlagSets.FlagSets {
   277  		fs.AddFlagSet(f)
   278  	}
   279  	kubeletPlugin.RunE = func(cmd *cobra.Command, args []string) error {
   280  		// Ensure that directories exist, creating them if necessary. We want
   281  		// to know early if there is a setup problem that would prevent
   282  		// creating those directories.
   283  		if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil {
   284  			return fmt.Errorf("create CDI directory: %w", err)
   285  		}
   286  		if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil {
   287  			return fmt.Errorf("create socket directory: %w", err)
   288  		}
   289  
   290  		plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
   291  			kubeletplugin.PluginSocketPath(*endpoint),
   292  			kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
   293  			kubeletplugin.KubeletPluginSocketPath(*draAddress),
   294  		)
   295  		if err != nil {
   296  			return fmt.Errorf("start example plugin: %w", err)
   297  		}
   298  
   299  		// Handle graceful shutdown. We need to delete Unix domain
   300  		// sockets.
   301  		sigc := make(chan os.Signal, 1)
   302  		signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
   303  		logger.Info("Waiting for signal.")
   304  		sig := <-sigc
   305  		logger.Info("Received signal, shutting down.", "signal", sig)
   306  		plugin.Stop()
   307  		return nil
   308  	}
   309  	cmd.AddCommand(kubeletPlugin)
   310  
   311  	// SetUsageAndHelpFunc takes care of flag grouping. However,
   312  	// it doesn't support listing child commands. We add those
   313  	// to cmd.Use.
   314  	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
   315  	cliflag.SetUsageAndHelpFunc(cmd, sharedFlagSets, cols)
   316  	var children []string
   317  	for _, child := range cmd.Commands() {
   318  		children = append(children, child.Use)
   319  	}
   320  	cmd.Use += " [shared flags] " + strings.Join(children, "|")
   321  	cliflag.SetUsageAndHelpFunc(controller, controllerFlagSets, cols)
   322  	cliflag.SetUsageAndHelpFunc(kubeletPlugin, kubeletPluginFlagSets, cols)
   323  
   324  	return cmd
   325  }
   326  

View as plain text