...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin/plugin.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package plugin
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"net"
    24  	"strings"
    25  	"sync"
    26  	"time"
    27  
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	v1 "k8s.io/api/core/v1"
    32  	utilversion "k8s.io/apimachinery/pkg/util/version"
    33  	"k8s.io/client-go/kubernetes"
    34  	"k8s.io/klog/v2"
    35  )
    36  
const (
	// DRAPluginName is the name of the in-tree DRA Plugin.
	// The log helper in this file prefixes every message with it.
	DRAPluginName   = "kubernetes.io/dra"
	v1alpha3Version = "v1alpha3" // version RegisterPlugin records for a newly registered plugin
	v1alpha2Version = "v1alpha2" // not referenced in the code shown here
)
    43  
// plugin is a description of a DRA Plugin, defined by an endpoint
// and the highest DRA version supported.
//
// The embedded mutex is taken by getOrCreateGRPCConn, getVersion and
// setVersion, i.e. it guards conn and version.
type plugin struct {
	sync.Mutex
	conn                    *grpc.ClientConn     // cached connection; nil until getOrCreateGRPCConn succeeds
	endpoint                string               // address that getOrCreateGRPCConn dials over a unix socket
	version                 string               // accessed through getVersion and setVersion
	highestSupportedVersion *utilversion.Version // compared in validateVersions when the same name registers again
	clientTimeout           time.Duration        // chosen by RegisterPlugin: caller's value or PluginClientTimeout
}
    54  
    55  func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
    56  	p.Lock()
    57  	defer p.Unlock()
    58  
    59  	if p.conn != nil {
    60  		return p.conn, nil
    61  	}
    62  
    63  	network := "unix"
    64  	klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint)
    65  	conn, err := grpc.Dial(
    66  		p.endpoint,
    67  		grpc.WithTransportCredentials(insecure.NewCredentials()),
    68  		grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
    69  			return (&net.Dialer{}).DialContext(ctx, network, target)
    70  		}),
    71  	)
    72  	if err != nil {
    73  		return nil, err
    74  	}
    75  
    76  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    77  	defer cancel()
    78  
    79  	if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
    80  		return nil, errors.New("timed out waiting for gRPC connection to be ready")
    81  	}
    82  
    83  	p.conn = conn
    84  	return p.conn, nil
    85  }
    86  
    87  func (p *plugin) getVersion() string {
    88  	p.Lock()
    89  	defer p.Unlock()
    90  	return p.version
    91  }
    92  
    93  func (p *plugin) setVersion(version string) {
    94  	p.Lock()
    95  	p.version = version
    96  	p.Unlock()
    97  }
    98  
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
	// controller is started by NewRegistrationHandler and is told about
	// every plugin that RegisterPlugin adds or DeRegisterPlugin removes.
	controller *nodeResourcesController
}
   103  
   104  // NewPluginHandler returns new registration handler.
   105  //
   106  // Must only be called once per process because it manages global state.
   107  // If a kubeClient is provided, then it synchronizes ResourceSlices
   108  // with the resource information provided by plugins.
   109  func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
   110  	handler := &RegistrationHandler{}
   111  
   112  	// If kubelet ever gets an API for stopping registration handlers, then
   113  	// that would need to be hooked up with stopping the controller.
   114  	handler.controller = startNodeResourcesController(context.TODO(), kubeClient, getNode)
   115  
   116  	return handler
   117  }
   118  
   119  // RegisterPlugin is called when a plugin can be registered.
   120  func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
   121  	klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
   122  
   123  	highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
   124  	if err != nil {
   125  		return err
   126  	}
   127  
   128  	var timeout time.Duration
   129  	if pluginClientTimeout == nil {
   130  		timeout = PluginClientTimeout
   131  	} else {
   132  		timeout = *pluginClientTimeout
   133  	}
   134  
   135  	pluginInstance := &plugin{
   136  		conn:                    nil,
   137  		endpoint:                endpoint,
   138  		version:                 v1alpha3Version,
   139  		highestSupportedVersion: highestSupportedVersion,
   140  		clientTimeout:           timeout,
   141  	}
   142  
   143  	// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
   144  	// all other DRA components will be able to get the actual socket of DRA plugins by its name.
   145  	// By default we assume the supported plugin version is v1alpha3
   146  	draPlugins.add(pluginName, pluginInstance)
   147  	h.controller.addPlugin(pluginName, pluginInstance)
   148  
   149  	return nil
   150  }
   151  
   152  func (h *RegistrationHandler) validateVersions(
   153  	callerName string,
   154  	pluginName string,
   155  	versions []string,
   156  ) (*utilversion.Version, error) {
   157  	if len(versions) == 0 {
   158  		return nil, errors.New(
   159  			log(
   160  				"%s for DRA plugin %q failed. Plugin returned an empty list for supported versions",
   161  				callerName,
   162  				pluginName,
   163  			),
   164  		)
   165  	}
   166  
   167  	// Validate version
   168  	newPluginHighestVersion, err := utilversion.HighestSupportedVersion(versions)
   169  	if err != nil {
   170  		return nil, errors.New(
   171  			log(
   172  				"%s for DRA plugin %q failed. None of the versions specified %q are supported. err=%v",
   173  				callerName,
   174  				pluginName,
   175  				versions,
   176  				err,
   177  			),
   178  		)
   179  	}
   180  
   181  	existingPlugin := draPlugins.get(pluginName)
   182  	if existingPlugin == nil {
   183  		return newPluginHighestVersion, nil
   184  	}
   185  	if existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) {
   186  		return newPluginHighestVersion, nil
   187  	}
   188  	return nil, errors.New(
   189  		log(
   190  			"%s for DRA plugin %q failed. Another plugin with the same name is already registered with a higher supported version: %q",
   191  			callerName,
   192  			pluginName,
   193  			existingPlugin.highestSupportedVersion,
   194  		),
   195  	)
   196  }
   197  
// deregisterPlugin deletes the entry for pluginName from draPlugins.
// It does not inform any RegistrationHandler's controller.
func deregisterPlugin(pluginName string) {
	draPlugins.delete(pluginName)
}
   201  
// DeRegisterPlugin is called when a plugin has removed its socket,
// signaling it is no longer available.
//
// The plugin is deleted from draPlugins first and then removed from the
// handler's controller.
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
	klog.InfoS("DeRegister DRA plugin", "name", pluginName)
	deregisterPlugin(pluginName)
	h.controller.removePlugin(pluginName)
}
   209  
   210  // ValidatePlugin is called by kubelet's plugin watcher upon detection
   211  // of a new registration socket opened by DRA plugin.
   212  func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
   213  	klog.InfoS("Validate DRA plugin", "name", pluginName, "endpoint", endpoint, "versions", strings.Join(versions, ","))
   214  
   215  	_, err := h.validateVersions("ValidatePlugin", pluginName, versions)
   216  	if err != nil {
   217  		return fmt.Errorf("validation failed for DRA plugin %s at endpoint %s: %+v", pluginName, endpoint, err)
   218  	}
   219  
   220  	return err
   221  }
   222  
   223  // log prepends log string with `kubernetes.io/dra`.
   224  func log(msg string, parts ...interface{}) string {
   225  	return fmt.Sprintf(fmt.Sprintf("%s: %s", DRAPluginName, msg), parts...)
   226  }
   227  

View as plain text