...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin/client_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package plugin
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"os"
    24  	"path/filepath"
    25  	"sync"
    26  	"testing"
    27  
    28  	"github.com/stretchr/testify/assert"
    29  	"google.golang.org/grpc"
    30  	drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
    31  	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
    32  )
    33  
    34  type fakeV1alpha3GRPCServer struct {
    35  	drapbv1alpha3.UnimplementedNodeServer
    36  }
    37  
    38  func (f *fakeV1alpha3GRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
    39  	return &drapbv1alpha3.NodePrepareResourcesResponse{Claims: map[string]*drapbv1alpha3.NodePrepareResourceResponse{"dummy": {CDIDevices: []string{"dummy"}}}}, nil
    40  }
    41  
    42  func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1alpha3.NodeUnprepareResourcesRequest) (*drapbv1alpha3.NodeUnprepareResourcesResponse, error) {
    43  	return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
    44  }
    45  
    46  func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, srv drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
    47  	if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
    48  		return err
    49  	}
    50  	if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
    51  		return err
    52  	}
    53  	return nil
    54  }
    55  
    56  type fakeV1alpha2GRPCServer struct {
    57  	drapbv1alpha2.UnimplementedNodeServer
    58  }
    59  
    60  func (f *fakeV1alpha2GRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1alpha2.NodePrepareResourceRequest) (*drapbv1alpha2.NodePrepareResourceResponse, error) {
    61  	return &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{"dummy"}}, nil
    62  }
    63  
    64  func (f *fakeV1alpha2GRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1alpha2.NodeUnprepareResourceRequest) (*drapbv1alpha2.NodeUnprepareResourceResponse, error) {
    65  	return &drapbv1alpha2.NodeUnprepareResourceResponse{}, nil
    66  }
    67  
    68  type tearDown func()
    69  
    70  func setupFakeGRPCServer(version string) (string, tearDown, error) {
    71  	p, err := os.MkdirTemp("", "dra_plugin")
    72  	if err != nil {
    73  		return "", nil, err
    74  	}
    75  
    76  	closeCh := make(chan struct{})
    77  	addr := filepath.Join(p, "server.sock")
    78  	teardown := func() {
    79  		close(closeCh)
    80  		os.RemoveAll(addr)
    81  	}
    82  
    83  	listener, err := net.Listen("unix", addr)
    84  	if err != nil {
    85  		teardown()
    86  		return "", nil, err
    87  	}
    88  
    89  	s := grpc.NewServer()
    90  	switch version {
    91  	case v1alpha2Version:
    92  		fakeGRPCServer := &fakeV1alpha2GRPCServer{}
    93  		drapbv1alpha2.RegisterNodeServer(s, fakeGRPCServer)
    94  	case v1alpha3Version:
    95  		fakeGRPCServer := &fakeV1alpha3GRPCServer{}
    96  		drapbv1alpha3.RegisterNodeServer(s, fakeGRPCServer)
    97  	default:
    98  		return "", nil, fmt.Errorf("unsupported version: %s", version)
    99  	}
   100  
   101  	go func() {
   102  		go s.Serve(listener)
   103  		<-closeCh
   104  		s.GracefulStop()
   105  	}()
   106  
   107  	return addr, teardown, nil
   108  }
   109  
   110  func TestGRPCConnIsReused(t *testing.T) {
   111  	addr, teardown, err := setupFakeGRPCServer(v1alpha3Version)
   112  	if err != nil {
   113  		t.Fatal(err)
   114  	}
   115  	defer teardown()
   116  
   117  	reusedConns := make(map[*grpc.ClientConn]int)
   118  	wg := sync.WaitGroup{}
   119  	m := sync.Mutex{}
   120  
   121  	p := &plugin{
   122  		endpoint: addr,
   123  		version:  v1alpha3Version,
   124  	}
   125  
   126  	conn, err := p.getOrCreateGRPCConn()
   127  	defer func() {
   128  		err := conn.Close()
   129  		if err != nil {
   130  			t.Error(err)
   131  		}
   132  	}()
   133  	if err != nil {
   134  		t.Fatal(err)
   135  	}
   136  
   137  	// ensure the plugin we are using is registered
   138  	draPlugins.add("dummy-plugin", p)
   139  	defer draPlugins.delete("dummy-plugin")
   140  
   141  	// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
   142  	for i := 0; i < 2; i++ {
   143  		wg.Add(1)
   144  		go func() {
   145  			defer wg.Done()
   146  			client, err := NewDRAPluginClient("dummy-plugin")
   147  			if err != nil {
   148  				t.Error(err)
   149  				return
   150  			}
   151  
   152  			req := &drapbv1alpha3.NodePrepareResourcesRequest{
   153  				Claims: []*drapbv1alpha3.Claim{
   154  					{
   155  						Namespace:      "dummy-namespace",
   156  						Uid:            "dummy-uid",
   157  						Name:           "dummy-claim",
   158  						ResourceHandle: "dummy-resource",
   159  					},
   160  				},
   161  			}
   162  			client.NodePrepareResources(context.TODO(), req)
   163  
   164  			client.(*plugin).Lock()
   165  			conn := client.(*plugin).conn
   166  			client.(*plugin).Unlock()
   167  
   168  			m.Lock()
   169  			defer m.Unlock()
   170  			reusedConns[conn]++
   171  		}()
   172  	}
   173  
   174  	wg.Wait()
   175  	// We should have only one entry otherwise it means another gRPC connection has been created
   176  	if len(reusedConns) != 1 {
   177  		t.Errorf("expected length to be 1 but got %d", len(reusedConns))
   178  	}
   179  	if counter, ok := reusedConns[conn]; ok && counter != 2 {
   180  		t.Errorf("expected counter to be 2 but got %d", counter)
   181  	}
   182  }
   183  
   184  func TestNewDRAPluginClient(t *testing.T) {
   185  	for _, test := range []struct {
   186  		description string
   187  		setup       func(string) tearDown
   188  		pluginName  string
   189  		shouldError bool
   190  	}{
   191  		{
   192  			description: "plugin name is empty",
   193  			setup: func(_ string) tearDown {
   194  				return func() {}
   195  			},
   196  			pluginName:  "",
   197  			shouldError: true,
   198  		},
   199  		{
   200  			description: "plugin name not found in the list",
   201  			setup: func(_ string) tearDown {
   202  				return func() {}
   203  			},
   204  			pluginName:  "plugin-name-not-found-in-the-list",
   205  			shouldError: true,
   206  		},
   207  		{
   208  			description: "plugin exists",
   209  			setup: func(name string) tearDown {
   210  				draPlugins.add(name, &plugin{})
   211  				return func() {
   212  					draPlugins.delete(name)
   213  				}
   214  			},
   215  			pluginName: "dummy-plugin",
   216  		},
   217  	} {
   218  		t.Run(test.description, func(t *testing.T) {
   219  			teardown := test.setup(test.pluginName)
   220  			defer teardown()
   221  
   222  			client, err := NewDRAPluginClient(test.pluginName)
   223  			if test.shouldError {
   224  				assert.Nil(t, client)
   225  				assert.Error(t, err)
   226  			} else {
   227  				assert.NotNil(t, client)
   228  				assert.Nil(t, err)
   229  			}
   230  		})
   231  	}
   232  }
   233  
   234  func TestNodeUnprepareResource(t *testing.T) {
   235  	for _, test := range []struct {
   236  		description   string
   237  		serverSetup   func(string) (string, tearDown, error)
   238  		serverVersion string
   239  		request       *drapbv1alpha3.NodeUnprepareResourcesRequest
   240  	}{
   241  		{
   242  			description:   "server supports v1alpha3",
   243  			serverSetup:   setupFakeGRPCServer,
   244  			serverVersion: v1alpha3Version,
   245  			request:       &drapbv1alpha3.NodeUnprepareResourcesRequest{},
   246  		},
   247  		{
   248  			description:   "server supports v1alpha2, plugin client should fallback",
   249  			serverSetup:   setupFakeGRPCServer,
   250  			serverVersion: v1alpha2Version,
   251  			request: &drapbv1alpha3.NodeUnprepareResourcesRequest{
   252  				Claims: []*drapbv1alpha3.Claim{
   253  					{
   254  						Namespace:      "dummy-namespace",
   255  						Uid:            "dummy-uid",
   256  						Name:           "dummy-claim",
   257  						ResourceHandle: "dummy-resource",
   258  					},
   259  				},
   260  			},
   261  		},
   262  	} {
   263  		t.Run(test.description, func(t *testing.T) {
   264  			addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
   265  			if err != nil {
   266  				t.Fatal(err)
   267  			}
   268  			defer teardown()
   269  
   270  			p := &plugin{
   271  				endpoint:      addr,
   272  				version:       v1alpha3Version,
   273  				clientTimeout: PluginClientTimeout,
   274  			}
   275  
   276  			conn, err := p.getOrCreateGRPCConn()
   277  			defer func() {
   278  				err := conn.Close()
   279  				if err != nil {
   280  					t.Error(err)
   281  				}
   282  			}()
   283  			if err != nil {
   284  				t.Fatal(err)
   285  			}
   286  
   287  			draPlugins.add("dummy-plugin", p)
   288  			defer draPlugins.delete("dummy-plugin")
   289  
   290  			client, err := NewDRAPluginClient("dummy-plugin")
   291  			if err != nil {
   292  				t.Fatal(err)
   293  			}
   294  
   295  			_, err = client.NodeUnprepareResources(context.TODO(), test.request)
   296  			if err != nil {
   297  				t.Fatal(err)
   298  			}
   299  		})
   300  	}
   301  }
   302  
   303  func TestListAndWatchResources(t *testing.T) {
   304  	for _, test := range []struct {
   305  		description   string
   306  		serverSetup   func(string) (string, tearDown, error)
   307  		serverVersion string
   308  		request       *drapbv1alpha3.NodeListAndWatchResourcesRequest
   309  		responses     []*drapbv1alpha3.NodeListAndWatchResourcesResponse
   310  		expectError   string
   311  	}{
   312  		{
   313  			description:   "server supports NodeResources API",
   314  			serverSetup:   setupFakeGRPCServer,
   315  			serverVersion: v1alpha3Version,
   316  			request:       &drapbv1alpha3.NodeListAndWatchResourcesRequest{},
   317  			responses: []*drapbv1alpha3.NodeListAndWatchResourcesResponse{
   318  				{},
   319  				{},
   320  			},
   321  			expectError: "EOF",
   322  		},
   323  		{
   324  			description:   "server doesn't support NodeResources API",
   325  			serverSetup:   setupFakeGRPCServer,
   326  			serverVersion: v1alpha2Version,
   327  			request:       new(drapbv1alpha3.NodeListAndWatchResourcesRequest),
   328  			expectError:   "Unimplemented",
   329  		},
   330  	} {
   331  		t.Run(test.description, func(t *testing.T) {
   332  			addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
   333  			if err != nil {
   334  				t.Fatal(err)
   335  			}
   336  			defer teardown()
   337  
   338  			p := &plugin{
   339  				endpoint: addr,
   340  				version:  v1alpha3Version,
   341  			}
   342  
   343  			conn, err := p.getOrCreateGRPCConn()
   344  			defer func() {
   345  				err := conn.Close()
   346  				if err != nil {
   347  					t.Error(err)
   348  				}
   349  			}()
   350  			if err != nil {
   351  				t.Fatal(err)
   352  			}
   353  
   354  			draPlugins.add("dummy-plugin", p)
   355  			defer draPlugins.delete("dummy-plugin")
   356  
   357  			client, err := NewDRAPluginClient("dummy-plugin")
   358  			if err != nil {
   359  				t.Fatal(err)
   360  			}
   361  
   362  			stream, err := client.NodeListAndWatchResources(context.Background(), test.request)
   363  			if err != nil {
   364  				t.Fatal(err)
   365  			}
   366  			var actualResponses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
   367  			var actualErr error
   368  			for {
   369  				resp, err := stream.Recv()
   370  				if err != nil {
   371  					actualErr = err
   372  					break
   373  				}
   374  				actualResponses = append(actualResponses, resp)
   375  			}
   376  			assert.Equal(t, test.responses, actualResponses)
   377  			assert.Contains(t, actualErr.Error(), test.expectError)
   378  		})
   379  	}
   380  }
   381  

View as plain text