...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin/client.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package plugin
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"google.golang.org/grpc"
    25  	grpccodes "google.golang.org/grpc/codes"
    26  	grpcstatus "google.golang.org/grpc/status"
    27  
    28  	"k8s.io/klog/v2"
    29  	drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
    30  	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
    31  )
    32  
    33  const PluginClientTimeout = 45 * time.Second
    34  
    35  type (
    36  	nodeResourceManager interface {
    37  		Prepare(context.Context, *grpc.ClientConn, *plugin, *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error)
    38  		Unprepare(context.Context, *grpc.ClientConn, *plugin, *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error)
    39  	}
    40  
    41  	v1alpha2NodeResourceManager struct{}
    42  	v1alpha3NodeResourceManager struct{}
    43  )
    44  
// nodeResourceManagers maps a plugin API version string to the
// nodeResourceManager used for that version. The keys v1alpha2Version and
// v1alpha3Version are declared outside this part of the file.
var nodeResourceManagers = map[string]nodeResourceManager{
	v1alpha2Version: v1alpha2NodeResourceManager{},
	v1alpha3Version: v1alpha3NodeResourceManager{},
}
    49  
    50  func (v1alpha2rm v1alpha2NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
    51  	nodeClient := drapbv1alpha2.NewNodeClient(conn)
    52  	response := &drapb.NodePrepareResourcesResponse{
    53  		Claims: make(map[string]*drapb.NodePrepareResourceResponse),
    54  	}
    55  
    56  	for _, claim := range req.Claims {
    57  		req := &drapbv1alpha2.NodePrepareResourceRequest{
    58  			Namespace:                claim.Namespace,
    59  			ClaimUid:                 claim.Uid,
    60  			ClaimName:                claim.Name,
    61  			ResourceHandle:           claim.ResourceHandle,
    62  			StructuredResourceHandle: claim.StructuredResourceHandle,
    63  		}
    64  		res, err := nodeClient.NodePrepareResource(ctx, req)
    65  		result := &drapb.NodePrepareResourceResponse{}
    66  		if err != nil {
    67  			result.Error = err.Error()
    68  		} else {
    69  			result.CDIDevices = res.CdiDevices
    70  		}
    71  		response.Claims[claim.Uid] = result
    72  	}
    73  
    74  	return response, nil
    75  }
    76  
    77  func (v1alpha2rm v1alpha2NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
    78  	nodeClient := drapbv1alpha2.NewNodeClient(conn)
    79  	response := &drapb.NodeUnprepareResourcesResponse{
    80  		Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
    81  	}
    82  
    83  	for _, claim := range req.Claims {
    84  		_, err := nodeClient.NodeUnprepareResource(ctx,
    85  			&drapbv1alpha2.NodeUnprepareResourceRequest{
    86  				Namespace:      claim.Namespace,
    87  				ClaimUid:       claim.Uid,
    88  				ClaimName:      claim.Name,
    89  				ResourceHandle: claim.ResourceHandle,
    90  			})
    91  		result := &drapb.NodeUnprepareResourceResponse{}
    92  		if err != nil {
    93  			result.Error = err.Error()
    94  		}
    95  		response.Claims[claim.Uid] = result
    96  	}
    97  
    98  	return response, nil
    99  }
   100  
   101  func (v1alpha3rm v1alpha3NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
   102  	nodeClient := drapb.NewNodeClient(conn)
   103  	response, err := nodeClient.NodePrepareResources(ctx, req)
   104  	if err != nil {
   105  		status, _ := grpcstatus.FromError(err)
   106  		if status.Code() == grpccodes.Unimplemented {
   107  			p.setVersion(v1alpha2Version)
   108  			return nodeResourceManagers[v1alpha2Version].Prepare(ctx, conn, p, req)
   109  		}
   110  		return nil, err
   111  	}
   112  
   113  	return response, nil
   114  }
   115  
   116  func (v1alpha3rm v1alpha3NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
   117  	nodeClient := drapb.NewNodeClient(conn)
   118  	response, err := nodeClient.NodeUnprepareResources(ctx, req)
   119  	if err != nil {
   120  		status, _ := grpcstatus.FromError(err)
   121  		if status.Code() == grpccodes.Unimplemented {
   122  			p.setVersion(v1alpha2Version)
   123  			return nodeResourceManagers[v1alpha2Version].Unprepare(ctx, conn, p, req)
   124  		}
   125  		return nil, err
   126  	}
   127  
   128  	return response, nil
   129  }
   130  
   131  func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
   132  	if pluginName == "" {
   133  		return nil, fmt.Errorf("plugin name is empty")
   134  	}
   135  
   136  	existingPlugin := draPlugins.get(pluginName)
   137  	if existingPlugin == nil {
   138  		return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName)
   139  	}
   140  
   141  	return existingPlugin, nil
   142  }
   143  
   144  func (p *plugin) NodePrepareResources(
   145  	ctx context.Context,
   146  	req *drapb.NodePrepareResourcesRequest,
   147  	opts ...grpc.CallOption,
   148  ) (*drapb.NodePrepareResourcesResponse, error) {
   149  	logger := klog.FromContext(ctx)
   150  	logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
   151  
   152  	conn, err := p.getOrCreateGRPCConn()
   153  	if err != nil {
   154  		return nil, err
   155  	}
   156  
   157  	ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
   158  	defer cancel()
   159  
   160  	version := p.getVersion()
   161  	resourceManager, exists := nodeResourceManagers[version]
   162  	if !exists {
   163  		err := fmt.Errorf("unsupported plugin version: %s", version)
   164  		logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", nil, "err", err)
   165  		return nil, err
   166  	}
   167  
   168  	response, err := resourceManager.Prepare(ctx, conn, p, req)
   169  	logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", response, "err", err)
   170  	return response, err
   171  }
   172  
   173  func (p *plugin) NodeUnprepareResources(
   174  	ctx context.Context,
   175  	req *drapb.NodeUnprepareResourcesRequest,
   176  	opts ...grpc.CallOption,
   177  ) (*drapb.NodeUnprepareResourcesResponse, error) {
   178  	logger := klog.FromContext(ctx)
   179  	logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
   180  
   181  	conn, err := p.getOrCreateGRPCConn()
   182  	if err != nil {
   183  		return nil, err
   184  	}
   185  
   186  	ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
   187  	defer cancel()
   188  
   189  	version := p.getVersion()
   190  	resourceManager, exists := nodeResourceManagers[version]
   191  	if !exists {
   192  		err := fmt.Errorf("unsupported plugin version: %s", version)
   193  		logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", nil, "err", err)
   194  		return nil, err
   195  	}
   196  
   197  	response, err := resourceManager.Unprepare(ctx, conn, p, req)
   198  	logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
   199  	return response, err
   200  }
   201  
   202  func (p *plugin) NodeListAndWatchResources(
   203  	ctx context.Context,
   204  	req *drapb.NodeListAndWatchResourcesRequest,
   205  	opts ...grpc.CallOption,
   206  ) (drapb.Node_NodeListAndWatchResourcesClient, error) {
   207  	logger := klog.FromContext(ctx)
   208  	logger.V(4).Info(log("calling NodeListAndWatchResources rpc"), "request", req)
   209  
   210  	conn, err := p.getOrCreateGRPCConn()
   211  	if err != nil {
   212  		return nil, err
   213  	}
   214  
   215  	nodeClient := drapb.NewNodeClient(conn)
   216  	return nodeClient.NodeListAndWatchResources(ctx, req, opts...)
   217  }
   218  

View as plain text