...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin/noderesources.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin

     1  /*
     2  Copyright 2024 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package plugin
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"sync"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/status"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	resourceapi "k8s.io/api/resource/v1alpha2"
    33  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    36  	"k8s.io/apimachinery/pkg/util/sets"
    37  	resourceinformers "k8s.io/client-go/informers/resource/v1alpha2"
    38  	"k8s.io/client-go/kubernetes"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/util/workqueue"
    41  	"k8s.io/klog/v2"
    42  	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
    43  	"k8s.io/utils/ptr"
    44  )
    45  
    46  const (
    47  	// resyncPeriod for informer
    48  	// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
    49  	resyncPeriod = time.Duration(10 * time.Minute)
    50  )
    51  
    52  // nodeResourcesController collects resource information from all registered
    53  // plugins and synchronizes that information with ResourceSlice objects.
    54  type nodeResourcesController struct {
    55  	ctx        context.Context
    56  	kubeClient kubernetes.Interface
    57  	getNode    func() (*v1.Node, error)
    58  	wg         sync.WaitGroup
    59  	queue      workqueue.RateLimitingInterface
    60  	sliceStore cache.Store
    61  
    62  	mutex         sync.RWMutex
    63  	activePlugins map[string]*activePlugin
    64  }
    65  
    66  // activePlugin holds the resource information about one plugin
    67  // and the gRPC stream that is used to retrieve that. The context
    68  // used by that stream can be canceled separately to stop
    69  // the monitoring.
    70  type activePlugin struct {
    71  	// cancel is the function which cancels the monitorPlugin goroutine
    72  	// for this plugin.
    73  	cancel func(reason error)
    74  
    75  	// resources is protected by the nodeResourcesController read/write lock.
    76  	// When receiving updates from the driver, the entire slice gets replaced,
    77  	// so it is okay to not do a deep copy of it. Only retrieving the slice
    78  	// must be protected by a read lock.
    79  	resources []*resourceapi.ResourceModel
    80  }
    81  
    82  // startNodeResourcesController constructs a new controller and starts it.
    83  //
    84  // If a kubeClient is provided, then it synchronizes ResourceSlices
    85  // with the resource information provided by plugins. Without it,
    86  // the controller is inactive. This can happen when kubelet is run stand-alone
    87  // without an apiserver. In that case we can't and don't need to publish
    88  // ResourceSlices.
    89  func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *nodeResourcesController {
    90  	if kubeClient == nil {
    91  		return nil
    92  	}
    93  
    94  	logger := klog.FromContext(ctx)
    95  	logger = klog.LoggerWithName(logger, "node resources controller")
    96  	ctx = klog.NewContext(ctx, logger)
    97  
    98  	c := &nodeResourcesController{
    99  		ctx:           ctx,
   100  		kubeClient:    kubeClient,
   101  		getNode:       getNode,
   102  		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"),
   103  		activePlugins: make(map[string]*activePlugin),
   104  	}
   105  
   106  	c.wg.Add(1)
   107  	go func() {
   108  		defer c.wg.Done()
   109  		c.run(ctx)
   110  	}()
   111  
   112  	return c
   113  }
   114  
   115  // waitForStop blocks until all background activity spawned by
   116  // the controller has stopped. The context passed to start must
   117  // be canceled for that to happen.
   118  //
   119  // Not needed at the moment, but if it was, this is what it would
   120  // look like...
   121  // func (c *nodeResourcesController) waitForStop() {
   122  // 	if c == nil {
   123  // 		return
   124  // 	}
   125  //
   126  // 	c.wg.Wait()
   127  // }
   128  
   129  // addPlugin is called whenever a plugin has been (re-)registered.
   130  func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) {
   131  	if c == nil {
   132  		return
   133  	}
   134  
   135  	klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName)
   136  	c.mutex.Lock()
   137  	defer c.mutex.Unlock()
   138  
   139  	if active := c.activePlugins[driverName]; active != nil {
   140  		active.cancel(errors.New("plugin has re-registered"))
   141  	}
   142  	active := &activePlugin{}
   143  	cancelCtx, cancel := context.WithCancelCause(c.ctx)
   144  	active.cancel = cancel
   145  	c.activePlugins[driverName] = active
   146  	c.queue.Add(driverName)
   147  
   148  	c.wg.Add(1)
   149  	go func() {
   150  		defer c.wg.Done()
   151  		c.monitorPlugin(cancelCtx, active, driverName, pluginInstance)
   152  	}()
   153  }
   154  
   155  // removePlugin is called whenever a plugin has been unregistered.
   156  func (c *nodeResourcesController) removePlugin(driverName string) {
   157  	if c == nil {
   158  		return
   159  	}
   160  
   161  	klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName)
   162  	c.mutex.Lock()
   163  	defer c.mutex.Unlock()
   164  	if active, ok := c.activePlugins[driverName]; ok {
   165  		active.cancel(errors.New("plugin has unregistered"))
   166  		delete(c.activePlugins, driverName)
   167  		c.queue.Add(driverName)
   168  	}
   169  }
   170  
   171  // monitorPlugin calls the plugin to retrieve resource information and caches
   172  // all responses that it gets for processing in the sync method. It keeps
   173  // retrying until an error or EOF response indicates that no further data is
   174  // going to be sent, then watch resources of the plugin stops until it
   175  // re-registers.
   176  func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) {
   177  	logger := klog.FromContext(ctx)
   178  	logger = klog.LoggerWithValues(logger, "driverName", driverName)
   179  	logger.Info("Starting to monitor node resources of the plugin")
   180  	defer func() {
   181  		r := recover()
   182  		logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r)
   183  	}()
   184  
   185  	// Keep trying until canceled.
   186  	for ctx.Err() == nil {
   187  		logger.V(5).Info("Calling NodeListAndWatchResources")
   188  		stream, err := pluginInstance.NodeListAndWatchResources(ctx, new(drapb.NodeListAndWatchResourcesRequest))
   189  		if err != nil {
   190  			switch {
   191  			case status.Convert(err).Code() == codes.Unimplemented:
   192  				// The plugin simply doesn't provide node resources.
   193  				active.cancel(errors.New("plugin does not support node resource reporting"))
   194  			default:
   195  				// This is a problem, report it and retry.
   196  				logger.Error(err, "Creating gRPC stream for node resources failed")
   197  				// TODO (https://github.com/kubernetes/kubernetes/issues/123689): expontential backoff?
   198  				select {
   199  				case <-time.After(5 * time.Second):
   200  				case <-ctx.Done():
   201  				}
   202  			}
   203  			continue
   204  		}
   205  		for {
   206  			response, err := stream.Recv()
   207  			if err != nil {
   208  				switch {
   209  				case errors.Is(err, io.EOF):
   210  					// This is okay. Some plugins might never change their
   211  					// resources after reporting them once.
   212  					active.cancel(errors.New("plugin has closed the stream"))
   213  				case status.Convert(err).Code() == codes.Unimplemented:
   214  					// The plugin has the method, does not really implement it.
   215  					active.cancel(errors.New("plugin does not support node resource reporting"))
   216  				case ctx.Err() == nil:
   217  					// This is a problem, report it and retry.
   218  					logger.Error(err, "Reading node resources from gRPC stream failed")
   219  					// TODO (https://github.com/kubernetes/kubernetes/issues/123689): expontential backoff?
   220  					select {
   221  					case <-time.After(5 * time.Second):
   222  					case <-ctx.Done():
   223  					}
   224  				}
   225  				break
   226  			}
   227  
   228  			if loggerV := logger.V(6); loggerV.Enabled() {
   229  				loggerV.Info("Driver resources updated", "resources", response.Resources)
   230  			} else {
   231  				logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources))
   232  			}
   233  
   234  			c.mutex.Lock()
   235  			active.resources = response.Resources
   236  			c.mutex.Unlock()
   237  			c.queue.Add(driverName)
   238  		}
   239  	}
   240  }
   241  
   242  // run is running in the background. It handles blocking initialization (like
   243  // syncing the informer) and then syncs the actual with the desired state.
   244  func (c *nodeResourcesController) run(ctx context.Context) {
   245  	logger := klog.FromContext(ctx)
   246  
   247  	// When kubelet starts, we have two choices:
   248  	// - Sync immediately, which in practice will delete all ResourceSlices
   249  	//   because no plugin has registered yet. We could do a DeleteCollection
   250  	//   to speed this up.
   251  	// - Wait a bit, then sync. If all plugins have re-registered in the meantime,
   252  	//   we might not need to change any ResourceSlice.
   253  	//
   254  	// For now syncing starts immediately, with no DeleteCollection. This
   255  	// can be reconsidered later.
   256  
   257  	// Wait until we're able to get a Node object.
   258  	// This means that the object is created on the API server,
   259  	// the kubeclient is functional and the node informer cache is populated with the node object.
   260  	// Without this it doesn't make sense to proceed further as we need a node name and
   261  	// a node UID for this controller to work.
   262  	var node *v1.Node
   263  	var err error
   264  	for {
   265  		node, err = c.getNode()
   266  		if err == nil {
   267  			break
   268  		}
   269  		logger.V(5).Info("Getting Node object failed, waiting", "err", err)
   270  		select {
   271  		case <-ctx.Done():
   272  			return
   273  		case <-time.After(time.Second):
   274  		}
   275  	}
   276  
   277  	// We could use an indexer on driver name, but that seems overkill.
   278  	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
   279  		options.FieldSelector = "nodeName=" + node.Name
   280  	})
   281  	c.sliceStore = informer.GetStore()
   282  	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
   283  		AddFunc: func(obj any) {
   284  			slice, ok := obj.(*resourceapi.ResourceSlice)
   285  			if !ok {
   286  				return
   287  			}
   288  			logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
   289  			c.queue.Add(slice.DriverName)
   290  		},
   291  		UpdateFunc: func(old, new any) {
   292  			oldSlice, ok := old.(*resourceapi.ResourceSlice)
   293  			if !ok {
   294  				return
   295  			}
   296  			newSlice, ok := new.(*resourceapi.ResourceSlice)
   297  			if !ok {
   298  				return
   299  			}
   300  			if loggerV := logger.V(6); loggerV.Enabled() {
   301  				loggerV.Info("ResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
   302  			} else {
   303  				logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
   304  			}
   305  			c.queue.Add(newSlice.DriverName)
   306  		},
   307  		DeleteFunc: func(obj any) {
   308  			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
   309  				obj = tombstone.Obj
   310  			}
   311  			slice, ok := obj.(*resourceapi.ResourceSlice)
   312  			if !ok {
   313  				return
   314  			}
   315  			logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
   316  			c.queue.Add(slice.DriverName)
   317  		},
   318  	})
   319  	if err != nil {
   320  		logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
   321  		return
   322  	}
   323  
   324  	// Start informer and wait for our cache to be populated.
   325  	c.wg.Add(1)
   326  	go func() {
   327  		defer c.wg.Done()
   328  		informer.Run(ctx.Done())
   329  	}()
   330  	for !handler.HasSynced() {
   331  		select {
   332  		case <-time.After(time.Second):
   333  		case <-ctx.Done():
   334  			return
   335  		}
   336  	}
   337  	logger.Info("ResourceSlice informer has synced")
   338  
   339  	for c.processNextWorkItem(ctx) {
   340  	}
   341  }
   342  
   343  func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool {
   344  	key, shutdown := c.queue.Get()
   345  	if shutdown {
   346  		return false
   347  	}
   348  	defer c.queue.Done(key)
   349  
   350  	driverName := key.(string)
   351  
   352  	// Panics are caught and treated like errors.
   353  	var err error
   354  	func() {
   355  		defer func() {
   356  			if r := recover(); r != nil {
   357  				err = fmt.Errorf("internal error: %v", r)
   358  			}
   359  		}()
   360  		err = c.sync(ctx, driverName)
   361  	}()
   362  
   363  	if err != nil {
   364  		// TODO (https://github.com/kubernetes/enhancements/issues/3077): contextual logging in utilruntime
   365  		utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err))
   366  		c.queue.AddRateLimited(key)
   367  
   368  		// Return without removing the work item from the queue.
   369  		// It will be retried.
   370  		return true
   371  	}
   372  
   373  	c.queue.Forget(key)
   374  	return true
   375  }
   376  
   377  func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error {
   378  	logger := klog.FromContext(ctx)
   379  
   380  	// Gather information about the actual and desired state.
   381  	slices := c.sliceStore.List()
   382  	var driverResources []*resourceapi.ResourceModel
   383  	c.mutex.RLock()
   384  	if active, ok := c.activePlugins[driverName]; ok {
   385  		// No need for a deep copy, the entire slice gets replaced on writes.
   386  		driverResources = active.resources
   387  	}
   388  	c.mutex.RUnlock()
   389  
   390  	// Resources that are not yet stored in any slice need to be published.
   391  	// Here we track the indices of any resources that are already stored.
   392  	storedResourceIndices := sets.New[int]()
   393  
   394  	// Slices that don't match any driver resource can either be updated (if there
   395  	// are new driver resources that need to be stored) or they need to be deleted.
   396  	obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))
   397  
   398  	// Match slices with resource information.
   399  	for _, obj := range slices {
   400  		slice := obj.(*resourceapi.ResourceSlice)
   401  		if slice.DriverName != driverName {
   402  			continue
   403  		}
   404  
   405  		index := indexOfModel(driverResources, &slice.ResourceModel)
   406  		if index >= 0 {
   407  			storedResourceIndices.Insert(index)
   408  			continue
   409  		}
   410  
   411  		obsoleteSlices = append(obsoleteSlices, slice)
   412  	}
   413  
   414  	if loggerV := logger.V(6); loggerV.Enabled() {
   415  		// Dump entire resource information.
   416  		loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources)
   417  	} else {
   418  		logger.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources))
   419  	}
   420  
   421  	// Update stale slices before removing what's left.
   422  	//
   423  	// We don't really know which of these slices might have
   424  	// been used for "the" driver resource because they don't
   425  	// have a unique ID. In practice, a driver is most likely
   426  	// to just give us one ResourceModel, in which case
   427  	// this isn't a problem at all. If we have more than one,
   428  	// then at least conceptually it currently doesn't matter
   429  	// where we publish it.
   430  	//
   431  	// The long-term goal is to move the handling of
   432  	// ResourceSlice objects into the driver, with kubelet
   433  	// just acting as a REST proxy. The advantage of that will
   434  	// be that kubelet won't need to support the same
   435  	// resource API version as the driver and the control plane.
   436  	// With that approach, the driver will be able to match
   437  	// up objects more intelligently.
   438  	numObsoleteSlices := len(obsoleteSlices)
   439  	for index, resource := range driverResources {
   440  		if storedResourceIndices.Has(index) {
   441  			// No need to do anything, it is already stored exactly
   442  			// like this in an existing slice.
   443  			continue
   444  		}
   445  
   446  		if numObsoleteSlices > 0 {
   447  			// Update one existing slice.
   448  			slice := obsoleteSlices[numObsoleteSlices-1]
   449  			numObsoleteSlices--
   450  			slice = slice.DeepCopy()
   451  			slice.ResourceModel = *resource
   452  			logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice))
   453  			if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
   454  				return fmt.Errorf("update node resource slice: %w", err)
   455  			}
   456  			continue
   457  		}
   458  
   459  		// Although node name and UID are unlikely to change
   460  		// we're getting updated node object just to be on the safe side.
   461  		// It's a cheap operation as it gets an object from the node informer cache.
   462  		node, err := c.getNode()
   463  		if err != nil {
   464  			return fmt.Errorf("retrieve node object: %w", err)
   465  		}
   466  
   467  		// Create a new slice.
   468  		slice := &resourceapi.ResourceSlice{
   469  			ObjectMeta: metav1.ObjectMeta{
   470  				GenerateName: node.Name + "-" + driverName + "-",
   471  				OwnerReferences: []metav1.OwnerReference{
   472  					{
   473  						APIVersion: v1.SchemeGroupVersion.WithKind("Node").Version,
   474  						Kind:       v1.SchemeGroupVersion.WithKind("Node").Kind,
   475  						Name:       node.Name,
   476  						UID:        node.UID,
   477  						Controller: ptr.To(true),
   478  					},
   479  				},
   480  			},
   481  			NodeName:      node.Name,
   482  			DriverName:    driverName,
   483  			ResourceModel: *resource,
   484  		}
   485  		logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice))
   486  		if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
   487  			return fmt.Errorf("create node resource slice: %w", err)
   488  		}
   489  	}
   490  
   491  	// All remaining slices are truly orphaned.
   492  	for i := 0; i < numObsoleteSlices; i++ {
   493  		slice := obsoleteSlices[i]
   494  		logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice))
   495  		if err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil {
   496  			return fmt.Errorf("delete node resource slice: %w", err)
   497  		}
   498  	}
   499  
   500  	return nil
   501  }
   502  
   503  func indexOfModel(models []*resourceapi.ResourceModel, model *resourceapi.ResourceModel) int {
   504  	for index, m := range models {
   505  		if apiequality.Semantic.DeepEqual(m, model) {
   506  			return index
   507  		}
   508  	}
   509  	return -1
   510  }
   511  

View as plain text