...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pluginwatcher
    18  
    19  import (
    20  	"flag"
    21  	"fmt"
    22  	"os"
    23  	"path/filepath"
    24  	"sync"
    25  	"testing"
    26  	"time"
    27  
    28  	"github.com/stretchr/testify/require"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/klog/v2"
    31  	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    32  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    33  )
    34  
    35  var (
    36  	supportedVersions = []string{"v1beta1", "v1beta2"}
    37  )
    38  
    39  func init() {
    40  	var logLevel string
    41  
    42  	klog.InitFlags(flag.CommandLine)
    43  	flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
    44  	flag.StringVar(&logLevel, "logLevel", "6", "test")
    45  	flag.Lookup("v").Value.Set(logLevel)
    46  }
    47  
    48  func initTempDir(t *testing.T) string {
    49  	// Creating a different directory. os.RemoveAll is not atomic enough;
    50  	// os.MkdirAll can get into an "Access Denied" error on Windows.
    51  	d, err := os.MkdirTemp("", "plugin_test")
    52  	if err != nil {
    53  		t.Fatalf("Could not create a temp directory %s: %v", d, err)
    54  	}
    55  
    56  	return d
    57  }
    58  
    59  func waitForRegistration(
    60  	t *testing.T,
    61  	socketPath string,
    62  	dsw cache.DesiredStateOfWorld) {
    63  	err := retryWithExponentialBackOff(
    64  		time.Duration(500*time.Millisecond),
    65  		func() (bool, error) {
    66  			if dsw.PluginExists(socketPath) {
    67  				return true, nil
    68  			}
    69  			return false, nil
    70  		},
    71  	)
    72  	if err != nil {
    73  		t.Fatalf("Timed out waiting for plugin to be added to desired state of world cache:\n%s.", socketPath)
    74  	}
    75  }
    76  
    77  func waitForUnregistration(
    78  	t *testing.T,
    79  	socketPath string,
    80  	dsw cache.DesiredStateOfWorld) {
    81  	err := retryWithExponentialBackOff(
    82  		time.Duration(500*time.Millisecond),
    83  		func() (bool, error) {
    84  			if !dsw.PluginExists(socketPath) {
    85  				return true, nil
    86  			}
    87  			return false, nil
    88  		},
    89  	)
    90  
    91  	if err != nil {
    92  		t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
    93  	}
    94  }
    95  
    96  func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
    97  	backoff := wait.Backoff{
    98  		Duration: initialDuration,
    99  		Factor:   3,
   100  		Jitter:   0,
   101  		Steps:    6,
   102  	}
   103  	return wait.ExponentialBackoff(backoff, fn)
   104  }
   105  
   106  func TestPluginRegistration(t *testing.T) {
   107  	socketDir := initTempDir(t)
   108  	defer os.RemoveAll(socketDir)
   109  
   110  	dsw := cache.NewDesiredStateOfWorld()
   111  	newWatcher(t, socketDir, dsw, wait.NeverStop)
   112  
   113  	for i := 0; i < 10; i++ {
   114  		socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
   115  		pluginName := fmt.Sprintf("example-plugin-%d", i)
   116  
   117  		p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   118  		require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   119  
   120  		pluginInfo := GetPluginInfo(p)
   121  		waitForRegistration(t, pluginInfo.SocketPath, dsw)
   122  
   123  		// Check the desired state for plugins
   124  		dswPlugins := dsw.GetPluginsToRegister()
   125  		if len(dswPlugins) != 1 {
   126  			t.Fatalf("TestPluginRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
   127  		}
   128  
   129  		// Stop the plugin; the plugin should be removed from the desired state of world cache
   130  		require.NoError(t, p.Stop())
   131  		// The following doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
   132  		waitForUnregistration(t, pluginInfo.SocketPath, dsw)
   133  		dswPlugins = dsw.GetPluginsToRegister()
   134  		if len(dswPlugins) != 0 {
   135  			t.Fatalf("TestPluginRegistration: desired state of world length should be 0 but it's %d", len(dswPlugins))
   136  		}
   137  	}
   138  }
   139  
   140  func TestPluginRegistrationSameName(t *testing.T) {
   141  	socketDir := initTempDir(t)
   142  	defer os.RemoveAll(socketDir)
   143  
   144  	dsw := cache.NewDesiredStateOfWorld()
   145  	newWatcher(t, socketDir, dsw, wait.NeverStop)
   146  
   147  	// Make 10 plugins with the same name and same type but different socket path;
   148  	// all 10 should be in desired state of world cache
   149  	pluginName := "dep-example-plugin"
   150  	for i := 0; i < 10; i++ {
   151  		socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
   152  		p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   153  		require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   154  
   155  		pluginInfo := GetPluginInfo(p)
   156  		waitForRegistration(t, pluginInfo.SocketPath, dsw)
   157  
   158  		// Check the desired state for plugins
   159  		dswPlugins := dsw.GetPluginsToRegister()
   160  		if len(dswPlugins) != i+1 {
   161  			t.Fatalf("TestPluginRegistrationSameName: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
   162  		}
   163  	}
   164  }
   165  
   166  func TestPluginReRegistration(t *testing.T) {
   167  	socketDir := initTempDir(t)
   168  	defer os.RemoveAll(socketDir)
   169  
   170  	dsw := cache.NewDesiredStateOfWorld()
   171  	newWatcher(t, socketDir, dsw, wait.NeverStop)
   172  
   173  	// Create a plugin first, we are then going to remove the plugin, update the plugin with a different name
   174  	// and recreate it.
   175  	socketPath := filepath.Join(socketDir, "plugin-reregistration.sock")
   176  	pluginName := "reregister-plugin"
   177  	p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   178  	require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   179  	pluginInfo := GetPluginInfo(p)
   180  	lastTimestamp := time.Now()
   181  	waitForRegistration(t, pluginInfo.SocketPath, dsw)
   182  
   183  	// Remove this plugin, then recreate it again with a different name for 10 times
   184  	// The updated plugin should be in the desired state of world cache
   185  	for i := 0; i < 10; i++ {
   186  		// Stop the plugin; the plugin should be removed from the desired state of world cache
   187  		// The plugin removal doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
   188  		require.NoError(t, p.Stop())
   189  		waitForUnregistration(t, pluginInfo.SocketPath, dsw)
   190  
   191  		// Add the plugin again
   192  		pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
   193  		p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   194  		require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   195  		waitForRegistration(t, pluginInfo.SocketPath, dsw)
   196  
   197  		// Check the dsw cache. The updated plugin should be the only plugin in it
   198  		dswPlugins := dsw.GetPluginsToRegister()
   199  		if len(dswPlugins) != 1 {
   200  			t.Fatalf("TestPluginReRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
   201  		}
   202  		if !dswPlugins[0].Timestamp.After(lastTimestamp) {
   203  			t.Fatalf("TestPluginReRegistration: for plugin %s timestamp of plugin is not updated", pluginName)
   204  		}
   205  		lastTimestamp = dswPlugins[0].Timestamp
   206  	}
   207  }
   208  
   209  func TestPluginRegistrationAtKubeletStart(t *testing.T) {
   210  	socketDir := initTempDir(t)
   211  	defer os.RemoveAll(socketDir)
   212  
   213  	plugins := make([]*examplePlugin, 10)
   214  
   215  	for i := 0; i < len(plugins); i++ {
   216  		socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
   217  		pluginName := fmt.Sprintf("example-plugin-%d", i)
   218  
   219  		p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   220  		require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   221  		defer func(p *examplePlugin) {
   222  			require.NoError(t, p.Stop())
   223  		}(p)
   224  
   225  		plugins[i] = p
   226  	}
   227  
   228  	dsw := cache.NewDesiredStateOfWorld()
   229  	newWatcher(t, socketDir, dsw, wait.NeverStop)
   230  
   231  	var wg sync.WaitGroup
   232  	for i := 0; i < len(plugins); i++ {
   233  		wg.Add(1)
   234  		go func(p *examplePlugin) {
   235  			defer wg.Done()
   236  
   237  			pluginInfo := GetPluginInfo(p)
   238  			// Validate that the plugin is in the desired state cache
   239  			waitForRegistration(t, pluginInfo.SocketPath, dsw)
   240  		}(plugins[i])
   241  	}
   242  
   243  	c := make(chan struct{})
   244  	go func() {
   245  		defer close(c)
   246  		wg.Wait()
   247  	}()
   248  
   249  	select {
   250  	case <-c:
   251  		return
   252  	case <-time.After(wait.ForeverTestTimeout):
   253  		t.Fatalf("Timeout while waiting for the plugin registration status")
   254  	}
   255  }
   256  
   257  func newWatcher(t *testing.T, socketDir string, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
   258  	w := NewWatcher(socketDir, desiredStateOfWorldCache)
   259  	require.NoError(t, w.Start(stopCh))
   260  
   261  	return w
   262  }
   263  

View as plain text