1
16
17
18
19 package app
20
21 import (
22 "context"
23 "encoding/json"
24 "fmt"
25 "net"
26 "net/http"
27 "net/http/pprof"
28 "os"
29 "os/signal"
30 "path"
31 "path/filepath"
32 "strings"
33 "syscall"
34 "time"
35
36 "github.com/spf13/cobra"
37 "k8s.io/component-base/metrics"
38
39 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40 "k8s.io/client-go/kubernetes"
41 "k8s.io/client-go/rest"
42 "k8s.io/client-go/tools/clientcmd"
43 cliflag "k8s.io/component-base/cli/flag"
44 "k8s.io/component-base/featuregate"
45 "k8s.io/component-base/logs"
46 logsapi "k8s.io/component-base/logs/api/v1"
47 "k8s.io/component-base/metrics/legacyregistry"
48 "k8s.io/component-base/term"
49 "k8s.io/dynamic-resource-allocation/kubeletplugin"
50 "k8s.io/dynamic-resource-allocation/leaderelection"
51 "k8s.io/klog/v2"
52 )
53
54
55 func NewCommand() *cobra.Command {
56 o := logsapi.NewLoggingConfiguration()
57 var clientset kubernetes.Interface
58 var config *rest.Config
59 ctx := context.Background()
60 logger := klog.Background()
61
62 cmd := &cobra.Command{
63 Use: "cdi-test-driver",
64 Long: "cdi-test-driver implements a resource driver controller and kubelet plugin.",
65 }
66 sharedFlagSets := cliflag.NamedFlagSets{}
67 fs := sharedFlagSets.FlagSet("logging")
68 logsapi.AddFlags(o, fs)
69 logs.AddFlags(fs, logs.SkipLoggingConfigurationFlags())
70
71 fs = sharedFlagSets.FlagSet("Kubernetes client")
72 kubeconfig := fs.String("kubeconfig", "", "Absolute path to the kube.config file. Either this or KUBECONFIG need to be set if the driver is being run out of cluster.")
73 kubeAPIQPS := fs.Float32("kube-api-qps", 50, "QPS to use while communicating with the kubernetes apiserver.")
74 kubeAPIBurst := fs.Int("kube-api-burst", 100, "Burst to use while communicating with the kubernetes apiserver.")
75 workers := fs.Int("workers", 10, "Concurrency to process multiple claims")
76
77 fs = sharedFlagSets.FlagSet("http server")
78 httpEndpoint := fs.String("http-endpoint", "",
79 "The TCP network address where the HTTP server for diagnostics, including pprof, metrics and (if applicable) leader election health check, will listen (example: `:8080`). The default is the empty string, which means the server is disabled.")
80 metricsPath := fs.String("metrics-path", "/metrics", "The HTTP path where Prometheus metrics will be exposed, disabled if empty.")
81 profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
82
83 fs = sharedFlagSets.FlagSet("CDI")
84 driverNameFlagName := "drivername"
85 driverName := fs.String(driverNameFlagName, "test-driver.cdi.k8s.io", "Resource driver name.")
86 driverNameFlag := fs.Lookup(driverNameFlagName)
87
88 fs = sharedFlagSets.FlagSet("other")
89 featureGate := featuregate.NewFeatureGate()
90 utilruntime.Must(logsapi.AddFeatureGates(featureGate))
91 featureGate.AddFlag(fs)
92
93 fs = cmd.PersistentFlags()
94 for _, f := range sharedFlagSets.FlagSets {
95 fs.AddFlagSet(f)
96 }
97
98 mux := http.NewServeMux()
99
100 cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
101
102
103
104 if err := logsapi.ValidateAndApply(o, featureGate); err != nil {
105 return err
106 }
107
108
109 kubeconfigEnv := os.Getenv("KUBECONFIG")
110
111 if kubeconfigEnv != "" {
112 logger.Info("Found KUBECONFIG environment variable set, using that..")
113 *kubeconfig = kubeconfigEnv
114 }
115
116 var err error
117 if *kubeconfig == "" {
118 config, err = rest.InClusterConfig()
119 if err != nil {
120 return fmt.Errorf("create in-cluster client configuration: %w", err)
121 }
122 } else {
123 config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
124 if err != nil {
125 return fmt.Errorf("create out-of-cluster client configuration: %w", err)
126 }
127 }
128 config.QPS = *kubeAPIQPS
129 config.Burst = *kubeAPIBurst
130
131 clientset, err = kubernetes.NewForConfig(config)
132 if err != nil {
133 return fmt.Errorf("create client: %w", err)
134 }
135
136 if *httpEndpoint != "" {
137 if *metricsPath != "" {
138
139
140
141
142
143
144 gatherer := legacyregistry.DefaultGatherer
145 actualPath := path.Join("/", *metricsPath)
146 logger.Info("Starting metrics", "path", actualPath)
147 mux.Handle(actualPath,
148 metrics.HandlerFor(gatherer, metrics.HandlerOpts{}))
149 }
150
151 if *profilePath != "" {
152 actualPath := path.Join("/", *profilePath)
153 logger.Info("Starting profiling", "path", actualPath)
154 mux.HandleFunc(path.Join("/", *profilePath), pprof.Index)
155 mux.HandleFunc(path.Join("/", *profilePath, "cmdline"), pprof.Cmdline)
156 mux.HandleFunc(path.Join("/", *profilePath, "profile"), pprof.Profile)
157 mux.HandleFunc(path.Join("/", *profilePath, "symbol"), pprof.Symbol)
158 mux.HandleFunc(path.Join("/", *profilePath, "trace"), pprof.Trace)
159 }
160
161 listener, err := net.Listen("tcp", *httpEndpoint)
162 if err != nil {
163 return fmt.Errorf("listen on HTTP endpoint: %w", err)
164 }
165
166 go func() {
167 logger.Info("Starting HTTP server", "endpoint", *httpEndpoint)
168 err := http.Serve(listener, mux)
169 if err != nil {
170 logger.Error(err, "HTTP server failed")
171 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
172 }
173 }()
174 }
175
176 return nil
177 }
178
179 controller := &cobra.Command{
180 Use: "controller",
181 Short: "run as resource controller",
182 Long: "cdi-test-driver controller runs as a resource driver controller.",
183 Args: cobra.ExactArgs(0),
184 }
185 controllerFlagSets := cliflag.NamedFlagSets{}
186 fs = controllerFlagSets.FlagSet("leader election")
187 enableLeaderElection := fs.Bool("leader-election", false,
188 "Enables leader election. If leader election is enabled, additional RBAC rules are required.")
189 leaderElectionNamespace := fs.String("leader-election-namespace", "",
190 "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
191 leaderElectionLeaseDuration := fs.Duration("leader-election-lease-duration", 15*time.Second,
192 "Duration, in seconds, that non-leader candidates will wait to force acquire leadership.")
193 leaderElectionRenewDeadline := fs.Duration("leader-election-renew-deadline", 10*time.Second,
194 "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.")
195 leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second,
196 "Duration, in seconds, the LeaderElector clients should wait between tries of actions.")
197 fs = controllerFlagSets.FlagSet("controller")
198 resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.")
199 fs = controller.Flags()
200 for _, f := range controllerFlagSets.FlagSets {
201 fs.AddFlagSet(f)
202 }
203
204 controller.RunE = func(cmd *cobra.Command, args []string) error {
205 resources := Resources{}
206 if *resourceConfig != "" {
207 file, err := os.Open(*resourceConfig)
208 if err != nil {
209 return fmt.Errorf("open resource config: %w", err)
210 }
211 decoder := json.NewDecoder(file)
212 decoder.DisallowUnknownFields()
213 if err := decoder.Decode(&resources); err != nil {
214 return fmt.Errorf("parse resource config %q: %w", *resourceConfig, err)
215 }
216 }
217 if resources.DriverName == "" || driverNameFlag.Changed {
218 resources.DriverName = *driverName
219 }
220
221 run := func() {
222 controller := NewController(clientset, resources)
223 controller.Run(ctx, *workers)
224 }
225
226 if !*enableLeaderElection {
227 run()
228 return nil
229 }
230
231
232 lockName := *driverName
233
234
235
236
237 leClientset, err := kubernetes.NewForConfig(config)
238 if err != nil {
239 return fmt.Errorf("create leaderelection client: %w", err)
240 }
241
242 le := leaderelection.New(leClientset, lockName,
243 func(ctx context.Context) {
244 run()
245 },
246 leaderelection.LeaseDuration(*leaderElectionLeaseDuration),
247 leaderelection.RenewDeadline(*leaderElectionRenewDeadline),
248 leaderelection.RetryPeriod(*leaderElectionRetryPeriod),
249 leaderelection.Namespace(*leaderElectionNamespace),
250 )
251 if *httpEndpoint != "" {
252 le.PrepareHealthCheck(mux)
253 }
254 if err := le.Run(); err != nil {
255 return fmt.Errorf("leader election failed: %w", err)
256 }
257
258 return nil
259 }
260 cmd.AddCommand(controller)
261
262 kubeletPlugin := &cobra.Command{
263 Use: "kubelet-plugin",
264 Short: "run as kubelet plugin",
265 Long: "cdi-test-driver kubelet-plugin runs as a device plugin for kubelet that supports dynamic resource allocation.",
266 Args: cobra.ExactArgs(0),
267 }
268 kubeletPluginFlagSets := cliflag.NamedFlagSets{}
269 fs = kubeletPluginFlagSets.FlagSet("kubelet")
270 pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.")
271 endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.")
272 draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
273 fs = kubeletPluginFlagSets.FlagSet("CDI")
274 cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
275 fs = kubeletPlugin.Flags()
276 for _, f := range kubeletPluginFlagSets.FlagSets {
277 fs.AddFlagSet(f)
278 }
279 kubeletPlugin.RunE = func(cmd *cobra.Command, args []string) error {
280
281
282
283 if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil {
284 return fmt.Errorf("create CDI directory: %w", err)
285 }
286 if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil {
287 return fmt.Errorf("create socket directory: %w", err)
288 }
289
290 plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
291 kubeletplugin.PluginSocketPath(*endpoint),
292 kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
293 kubeletplugin.KubeletPluginSocketPath(*draAddress),
294 )
295 if err != nil {
296 return fmt.Errorf("start example plugin: %w", err)
297 }
298
299
300
301 sigc := make(chan os.Signal, 1)
302 signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
303 logger.Info("Waiting for signal.")
304 sig := <-sigc
305 logger.Info("Received signal, shutting down.", "signal", sig)
306 plugin.Stop()
307 return nil
308 }
309 cmd.AddCommand(kubeletPlugin)
310
311
312
313
314 cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
315 cliflag.SetUsageAndHelpFunc(cmd, sharedFlagSets, cols)
316 var children []string
317 for _, child := range cmd.Commands() {
318 children = append(children, child.Use)
319 }
320 cmd.Use += " [shared flags] " + strings.Join(children, "|")
321 cliflag.SetUsageAndHelpFunc(controller, controllerFlagSets, cols)
322 cliflag.SetUsageAndHelpFunc(kubeletPlugin, kubeletPluginFlagSets, cols)
323
324 return cmd
325 }
326
View as plain text