...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/plugin_manager_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pluginmanager
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"reflect"
    24  	"strconv"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/stretchr/testify/require"
    30  	"k8s.io/apimachinery/pkg/util/sets"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	"k8s.io/client-go/tools/record"
    33  	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    34  
    35  	"k8s.io/kubernetes/pkg/kubelet/config"
    36  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
    37  )
    38  
    39  var (
    40  	socketDir         string
    41  	supportedVersions = []string{"v1beta1", "v1beta2"}
    42  )
    43  
    44  type fakePluginHandler struct {
    45  	events []string
    46  	sync.RWMutex
    47  }
    48  
    49  func newFakePluginHandler() *fakePluginHandler {
    50  	return &fakePluginHandler{}
    51  }
    52  
    53  // ValidatePlugin is a fake method
    54  func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
    55  	f.Lock()
    56  	defer f.Unlock()
    57  	f.events = append(f.events, "validate "+pluginName)
    58  	return nil
    59  }
    60  
    61  // RegisterPlugin is a fake method
    62  func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
    63  	f.Lock()
    64  	defer f.Unlock()
    65  	f.events = append(f.events, "register "+pluginName)
    66  	return nil
    67  }
    68  
    69  // DeRegisterPlugin is a fake method
    70  func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
    71  	f.Lock()
    72  	defer f.Unlock()
    73  	f.events = append(f.events, "deregister "+pluginName)
    74  }
    75  
    76  func (f *fakePluginHandler) Reset() {
    77  	f.Lock()
    78  	defer f.Unlock()
    79  	f.events = nil
    80  }
    81  
    82  func init() {
    83  	d, err := os.MkdirTemp("", "plugin_manager_test")
    84  	if err != nil {
    85  		panic(fmt.Sprintf("Could not create a temp directory: %s", d))
    86  	}
    87  
    88  	socketDir = d
    89  }
    90  
    91  func cleanup(t *testing.T) {
    92  	require.NoError(t, os.RemoveAll(socketDir))
    93  	os.MkdirAll(socketDir, 0755)
    94  }
    95  
    96  func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
    97  	expected := []string{"validate " + pluginName, "register " + pluginName}
    98  	err := retryWithExponentialBackOff(
    99  		100*time.Millisecond,
   100  		func() (bool, error) {
   101  			fakePluginHandler.Lock()
   102  			defer fakePluginHandler.Unlock()
   103  			if reflect.DeepEqual(fakePluginHandler.events, expected) {
   104  				return true, nil
   105  			}
   106  			t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
   107  			return false, nil
   108  		},
   109  	)
   110  	if err != nil {
   111  		t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
   112  	}
   113  }
   114  
   115  func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
   116  	backoff := wait.Backoff{
   117  		Duration: initialDuration,
   118  		Factor:   3,
   119  		Jitter:   0,
   120  		Steps:    6,
   121  	}
   122  	return wait.ExponentialBackoff(backoff, fn)
   123  }
   124  
   125  func TestPluginRegistration(t *testing.T) {
   126  	defer cleanup(t)
   127  
   128  	pluginManager := newTestPluginManager(socketDir)
   129  
   130  	// Start the plugin manager
   131  	stopChan := make(chan struct{})
   132  	defer close(stopChan)
   133  	go func() {
   134  		sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
   135  		pluginManager.Run(sourcesReady, stopChan)
   136  	}()
   137  
   138  	// Add handler for device plugin
   139  	fakeHandler := newFakePluginHandler()
   140  	pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
   141  
   142  	const maxDepth = 3
   143  	// Make sure the plugin manager is aware of the socket in subdirectories
   144  	for i := 0; i < maxDepth; i++ {
   145  		fakeHandler.Reset()
   146  		pluginDir := socketDir
   147  
   148  		for j := 0; j < i; j++ {
   149  			pluginDir = filepath.Join(pluginDir, strconv.Itoa(j))
   150  		}
   151  		require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm))
   152  		socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i))
   153  
   154  		// Add a new plugin
   155  		pluginName := fmt.Sprintf("example-plugin-%d", i)
   156  		p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   157  		require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   158  
   159  		// Verify that the plugin is registered
   160  		waitForRegistration(t, fakeHandler, pluginName)
   161  	}
   162  }
   163  
   164  func newTestPluginManager(sockDir string) PluginManager {
   165  	pm := NewPluginManager(
   166  		sockDir,
   167  		&record.FakeRecorder{},
   168  	)
   169  	return pm
   170  }
   171  

View as plain text