...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/endpoint_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/devicemanager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package devicemanager
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/stretchr/testify/require"
    28  
    29  	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    30  	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
    31  )
    32  
    33  // monitorCallback is the function called when a device's health state changes,
    34  // or new devices are reported, or old devices are deleted.
    35  // devices holds the device list being reported for resourceName. In this file
        // it is invoked by the mock manager's ListAndWatch receiver installed in
        // esetup, with value copies of the devices carried by the response.
    36  type monitorCallback func(resourceName string, devices []pluginapi.Device)
    37  
    38  func newMockPluginManager() *mockPluginManager {
    39  	return &mockPluginManager{
    40  		func(string) error { return nil },
    41  		func(string, plugin.DevicePlugin) error { return nil },
    42  		func(string) {},
    43  		func(string, *pluginapi.ListAndWatchResponse) {},
    44  	}
    45  }
    46  
        // mockPluginManager is a test double whose exported methods each delegate to
        // the function field of the matching (lower-cased) name, so a test can replace
        // any single behavior by assigning that field.
        //
        // The field order matters: newMockPluginManager fills the struct with a
        // positional (unkeyed) composite literal.
    47  type mockPluginManager struct {
    48  	cleanupPluginDirectory     func(string) error                        // backs CleanupPluginDirectory
    49  	pluginConnected            func(string, plugin.DevicePlugin) error   // backs PluginConnected
    50  	pluginDisconnected         func(string)                              // backs PluginDisconnected
    51  	pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse) // backs PluginListAndWatchReceiver
    52  }
    53  
        // CleanupPluginDirectory forwards r to the cleanupPluginDirectory hook and
        // returns the hook's error unchanged. The hook must be non-nil.
    54  func (m *mockPluginManager) CleanupPluginDirectory(r string) error {
    55  	return m.cleanupPluginDirectory(r)
    56  }
    57  
        // PluginConnected forwards the resource name r and the device plugin p to the
        // pluginConnected hook and returns the hook's error unchanged. The hook must
        // be non-nil.
    58  func (m *mockPluginManager) PluginConnected(r string, p plugin.DevicePlugin) error {
    59  	return m.pluginConnected(r, p)
    60  }
    61  
        // PluginDisconnected forwards the resource name r to the pluginDisconnected
        // hook. The hook must be non-nil.
    62  func (m *mockPluginManager) PluginDisconnected(r string) {
    63  	m.pluginDisconnected(r)
    64  }
    65  
        // PluginListAndWatchReceiver forwards the resource name r and the
        // ListAndWatch response lr to the pluginListAndWatchReceiver hook. The hook
        // must be non-nil.
    66  func (m *mockPluginManager) PluginListAndWatchReceiver(r string, lr *pluginapi.ListAndWatchResponse) {
    67  	m.pluginListAndWatchReceiver(r, lr)
    68  }
    69  
    70  func esocketName() string {
    71  	return fmt.Sprintf("mock%d.sock", time.Now().UnixNano())
    72  }
    73  
    74  func TestNewEndpoint(t *testing.T) {
    75  	socket := filepath.Join(os.TempDir(), esocketName())
    76  
    77  	devs := []*pluginapi.Device{
    78  		{ID: "ADeviceId", Health: pluginapi.Healthy},
    79  	}
    80  
    81  	p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {})
    82  	defer ecleanup(t, p, e)
    83  }
    84  
    85  func TestRun(t *testing.T) {
    86  	socket := filepath.Join(os.TempDir(), esocketName())
    87  
    88  	devs := []*pluginapi.Device{
    89  		{ID: "ADeviceId", Health: pluginapi.Healthy},
    90  		{ID: "AnotherDeviceId", Health: pluginapi.Healthy},
    91  		{ID: "AThirdDeviceId", Health: pluginapi.Unhealthy},
    92  	}
    93  
    94  	updated := []*pluginapi.Device{
    95  		{ID: "ADeviceId", Health: pluginapi.Unhealthy},
    96  		{ID: "AThirdDeviceId", Health: pluginapi.Healthy},
    97  		{ID: "AFourthDeviceId", Health: pluginapi.Healthy},
    98  	}
    99  
   100  	callbackCount := 0
   101  	callbackChan := make(chan int)
   102  	callback := func(n string, devices []pluginapi.Device) {
   103  		// Should be called twice:
   104  		// one for plugin registration, one for plugin update.
   105  		if callbackCount > 2 {
   106  			t.FailNow()
   107  		}
   108  
   109  		// Check plugin registration
   110  		if callbackCount == 0 {
   111  			require.Len(t, devices, 3)
   112  			require.Equal(t, devices[0].ID, devs[0].ID)
   113  			require.Equal(t, devices[1].ID, devs[1].ID)
   114  			require.Equal(t, devices[2].ID, devs[2].ID)
   115  			require.Equal(t, devices[0].Health, devs[0].Health)
   116  			require.Equal(t, devices[1].Health, devs[1].Health)
   117  			require.Equal(t, devices[2].Health, devs[2].Health)
   118  		}
   119  
   120  		// Check plugin update
   121  		if callbackCount == 1 {
   122  			require.Len(t, devices, 3)
   123  			require.Equal(t, devices[0].ID, updated[0].ID)
   124  			require.Equal(t, devices[1].ID, updated[1].ID)
   125  			require.Equal(t, devices[2].ID, updated[2].ID)
   126  			require.Equal(t, devices[0].Health, updated[0].Health)
   127  			require.Equal(t, devices[1].Health, updated[1].Health)
   128  			require.Equal(t, devices[2].Health, updated[2].Health)
   129  		}
   130  
   131  		callbackCount++
   132  		callbackChan <- callbackCount
   133  	}
   134  
   135  	p, e := esetup(t, devs, socket, "mock", callback)
   136  	defer ecleanup(t, p, e)
   137  
   138  	go e.client.Run()
   139  	// Wait for the first callback to be issued.
   140  	<-callbackChan
   141  
   142  	p.Update(updated)
   143  
   144  	// Wait for the second callback to be issued.
   145  	<-callbackChan
   146  
   147  	require.Equal(t, callbackCount, 2)
   148  }
   149  
   150  func TestAllocate(t *testing.T) {
   151  	socket := filepath.Join(os.TempDir(), esocketName())
   152  	devs := []*pluginapi.Device{
   153  		{ID: "ADeviceId", Health: pluginapi.Healthy},
   154  	}
   155  	callbackCount := 0
   156  	callbackChan := make(chan int)
   157  	p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {
   158  		callbackCount++
   159  		callbackChan <- callbackCount
   160  	})
   161  	defer ecleanup(t, p, e)
   162  
   163  	resp := new(pluginapi.AllocateResponse)
   164  	contResp := new(pluginapi.ContainerAllocateResponse)
   165  	contResp.Devices = append(contResp.Devices, &pluginapi.DeviceSpec{
   166  		ContainerPath: "/dev/aaa",
   167  		HostPath:      "/dev/aaa",
   168  		Permissions:   "mrw",
   169  	})
   170  
   171  	contResp.Devices = append(contResp.Devices, &pluginapi.DeviceSpec{
   172  		ContainerPath: "/dev/bbb",
   173  		HostPath:      "/dev/bbb",
   174  		Permissions:   "mrw",
   175  	})
   176  
   177  	contResp.Mounts = append(contResp.Mounts, &pluginapi.Mount{
   178  		ContainerPath: "/container_dir1/file1",
   179  		HostPath:      "host_dir1/file1",
   180  		ReadOnly:      true,
   181  	})
   182  
   183  	resp.ContainerResponses = append(resp.ContainerResponses, contResp)
   184  
   185  	p.SetAllocFunc(func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) {
   186  		return resp, nil
   187  	})
   188  
   189  	go e.client.Run()
   190  	// Wait for the callback to be issued.
   191  	select {
   192  	case <-callbackChan:
   193  		break
   194  	case <-time.After(time.Second):
   195  		t.FailNow()
   196  	}
   197  
   198  	respOut, err := e.allocate([]string{"ADeviceId"})
   199  	require.NoError(t, err)
   200  	require.Equal(t, resp, respOut)
   201  }
   202  
   203  func TestGetPreferredAllocation(t *testing.T) {
   204  	socket := filepath.Join(os.TempDir(), esocketName())
   205  	callbackCount := 0
   206  	callbackChan := make(chan int)
   207  	p, e := esetup(t, []*pluginapi.Device{}, socket, "mock", func(n string, d []pluginapi.Device) {
   208  		callbackCount++
   209  		callbackChan <- callbackCount
   210  	})
   211  	defer ecleanup(t, p, e)
   212  
   213  	resp := &pluginapi.PreferredAllocationResponse{
   214  		ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
   215  			{DeviceIDs: []string{"device0", "device1", "device2"}},
   216  		},
   217  	}
   218  
   219  	p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
   220  		return resp, nil
   221  	})
   222  
   223  	go e.client.Run()
   224  	// Wait for the callback to be issued.
   225  	select {
   226  	case <-callbackChan:
   227  		break
   228  	case <-time.After(time.Second):
   229  		t.FailNow()
   230  	}
   231  
   232  	respOut, err := e.getPreferredAllocation([]string{}, []string{}, -1)
   233  	require.NoError(t, err)
   234  	require.Equal(t, resp, respOut)
   235  }
   236  
   237  func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*plugin.Stub, *endpointImpl) {
   238  	m := newMockPluginManager()
   239  
   240  	m.pluginListAndWatchReceiver = func(r string, resp *pluginapi.ListAndWatchResponse) {
   241  		var newDevs []pluginapi.Device
   242  		for _, d := range resp.Devices {
   243  			newDevs = append(newDevs, *d)
   244  		}
   245  		callback(resourceName, newDevs)
   246  	}
   247  
   248  	var dp plugin.DevicePlugin
   249  	var wg sync.WaitGroup
   250  	wg.Add(1)
   251  	m.pluginConnected = func(r string, c plugin.DevicePlugin) error {
   252  		dp = c
   253  		wg.Done()
   254  		return nil
   255  	}
   256  
   257  	p := plugin.NewDevicePluginStub(devs, socket, resourceName, false, false)
   258  	err := p.Start()
   259  	require.NoError(t, err)
   260  
   261  	c := plugin.NewPluginClient(resourceName, socket, m)
   262  	err = c.Connect()
   263  	require.NoError(t, err)
   264  
   265  	wg.Wait()
   266  
   267  	e := newEndpointImpl(dp)
   268  	e.client = c
   269  
   270  	m.pluginDisconnected = func(r string) {
   271  		e.setStopTime(time.Now())
   272  	}
   273  
   274  	return p, e
   275  }
   276  
        // ecleanup releases what esetup created: it stops the stub device plugin p
        // first and then disconnects the endpoint's plugin client. Return values of
        // Stop and Disconnect, if any, are discarded, and t is currently unused.
   277  func ecleanup(t *testing.T, p *plugin.Stub, e *endpointImpl) {
   278  	p.Stop()
   279  	e.client.Disconnect()
   280  }
   281  

View as plain text