...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package reconciler
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"runtime"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/stretchr/testify/require"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	"k8s.io/client-go/tools/record"
    30  	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    31  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    32  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
    33  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
    34  )
    35  
    36  const (
    37  	// reconcilerLoopSleepDuration is the amount of time the reconciler loop
    38  	// waits between successive executions
    39  	reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
    40  )
    41  
    42  var (
    43  	socketDir         string
    44  	supportedVersions = []string{"v1beta1", "v1beta2"}
    45  )
    46  
    47  func init() {
    48  	d, err := os.MkdirTemp("", "reconciler_test")
    49  	if err != nil {
    50  		panic(fmt.Sprintf("Could not create a temp directory: %s", d))
    51  	}
    52  	socketDir = d
    53  }
    54  
    55  func cleanup(t *testing.T) {
    56  	require.NoError(t, os.RemoveAll(socketDir))
    57  	os.MkdirAll(socketDir, 0755)
    58  }
    59  
    60  func runReconciler(reconciler Reconciler) {
    61  	go reconciler.Run(wait.NeverStop)
    62  }
    63  
    64  func waitForRegistration(
    65  	t *testing.T,
    66  	socketPath string,
    67  	previousTimestamp time.Time,
    68  	asw cache.ActualStateOfWorld) {
    69  	err := retryWithExponentialBackOff(
    70  		time.Duration(500*time.Millisecond),
    71  		func() (bool, error) {
    72  			registeredPlugins := asw.GetRegisteredPlugins()
    73  			for _, plugin := range registeredPlugins {
    74  				if plugin.SocketPath == socketPath && plugin.Timestamp.After(previousTimestamp) {
    75  					return true, nil
    76  				}
    77  			}
    78  			return false, nil
    79  		},
    80  	)
    81  	if err != nil {
    82  		t.Fatalf("Timed out waiting for plugin to be registered:\n%s.", socketPath)
    83  	}
    84  }
    85  
    86  func waitForUnregistration(
    87  	t *testing.T,
    88  	socketPath string,
    89  	asw cache.ActualStateOfWorld) {
    90  	err := retryWithExponentialBackOff(
    91  		time.Duration(500*time.Millisecond),
    92  		func() (bool, error) {
    93  			registeredPlugins := asw.GetRegisteredPlugins()
    94  			for _, plugin := range registeredPlugins {
    95  				if plugin.SocketPath == socketPath {
    96  					return false, nil
    97  				}
    98  			}
    99  			return true, nil
   100  		},
   101  	)
   102  
   103  	if err != nil {
   104  		t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
   105  	}
   106  }
   107  
   108  func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
   109  	backoff := wait.Backoff{
   110  		Duration: initialDuration,
   111  		Factor:   3,
   112  		Jitter:   0,
   113  		Steps:    6,
   114  	}
   115  	return wait.ExponentialBackoff(backoff, fn)
   116  }
   117  
   118  type DummyImpl struct{}
   119  
   120  func NewDummyImpl() *DummyImpl {
   121  	return &DummyImpl{}
   122  }
   123  
   124  // ValidatePlugin is a dummy implementation
   125  func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
   126  	return nil
   127  }
   128  
   129  // RegisterPlugin is a dummy implementation
   130  func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
   131  	return nil
   132  }
   133  
   134  // DeRegisterPlugin is a dummy implementation
   135  func (d *DummyImpl) DeRegisterPlugin(pluginName string) {
   136  }
   137  
   138  // Calls Run()
   139  // Verifies that asw and dsw have no plugins
   140  func Test_Run_Positive_DoNothing(t *testing.T) {
   141  	defer cleanup(t)
   142  
   143  	dsw := cache.NewDesiredStateOfWorld()
   144  	asw := cache.NewActualStateOfWorld()
   145  	fakeRecorder := &record.FakeRecorder{}
   146  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   147  		fakeRecorder,
   148  	))
   149  	reconciler := NewReconciler(
   150  		oex,
   151  		reconcilerLoopSleepDuration,
   152  		dsw,
   153  		asw,
   154  	)
   155  	// Act
   156  	runReconciler(reconciler)
   157  
   158  	// Get dsw and asw plugins; they should both be empty
   159  	if len(asw.GetRegisteredPlugins()) != 0 {
   160  		t.Fatalf("Test_Run_Positive_DoNothing: actual state of world should be empty but it's not")
   161  	}
   162  	if len(dsw.GetPluginsToRegister()) != 0 {
   163  		t.Fatalf("Test_Run_Positive_DoNothing: desired state of world should be empty but it's not")
   164  	}
   165  }
   166  
   167  // Populates desiredStateOfWorld cache with one plugin.
   168  // Calls Run()
   169  // Verifies the actual state of world contains that plugin
   170  func Test_Run_Positive_Register(t *testing.T) {
   171  	// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
   172  	if runtime.GOOS == "windows" {
   173  		t.Skip("Skipping test that fails on Windows")
   174  	}
   175  
   176  	defer cleanup(t)
   177  
   178  	dsw := cache.NewDesiredStateOfWorld()
   179  	asw := cache.NewActualStateOfWorld()
   180  	di := NewDummyImpl()
   181  	fakeRecorder := &record.FakeRecorder{}
   182  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   183  		fakeRecorder,
   184  	))
   185  	reconciler := NewReconciler(
   186  		oex,
   187  		reconcilerLoopSleepDuration,
   188  		dsw,
   189  		asw,
   190  	)
   191  	reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
   192  
   193  	// Start the reconciler to fill ASW.
   194  	stopChan := make(chan struct{})
   195  	defer close(stopChan)
   196  	go reconciler.Run(stopChan)
   197  	socketPath := filepath.Join(socketDir, "plugin.sock")
   198  	pluginName := fmt.Sprintf("example-plugin")
   199  	p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   200  	require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   201  	defer func() {
   202  		require.NoError(t, p.Stop())
   203  	}()
   204  	timestampBeforeRegistration := time.Now()
   205  	dsw.AddOrUpdatePlugin(socketPath)
   206  	waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
   207  
   208  	// Get asw plugins; it should contain the added plugin
   209  	aswPlugins := asw.GetRegisteredPlugins()
   210  	if len(aswPlugins) != 1 {
   211  		t.Fatalf("Test_Run_Positive_Register: actual state of world length should be one but it's %d", len(aswPlugins))
   212  	}
   213  	if aswPlugins[0].SocketPath != socketPath {
   214  		t.Fatalf("Test_Run_Positive_Register: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
   215  	}
   216  }
   217  
   218  // Populates desiredStateOfWorld cache with one plugin
   219  // Calls Run()
   220  // Verifies there is one plugin now in actual state of world.
   221  // Deletes plugin from desired state of world.
   222  // Verifies that plugin no longer exists in actual state of world.
   223  func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
   224  	// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
   225  	if runtime.GOOS == "windows" {
   226  		t.Skip("Skipping test that fails on Windows")
   227  	}
   228  
   229  	defer cleanup(t)
   230  
   231  	dsw := cache.NewDesiredStateOfWorld()
   232  	asw := cache.NewActualStateOfWorld()
   233  	di := NewDummyImpl()
   234  	fakeRecorder := &record.FakeRecorder{}
   235  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   236  		fakeRecorder,
   237  	))
   238  	reconciler := NewReconciler(
   239  		oex,
   240  		reconcilerLoopSleepDuration,
   241  		dsw,
   242  		asw,
   243  	)
   244  	reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
   245  
   246  	// Start the reconciler to fill ASW.
   247  	stopChan := make(chan struct{})
   248  	defer close(stopChan)
   249  	go reconciler.Run(stopChan)
   250  
   251  	socketPath := filepath.Join(socketDir, "plugin.sock")
   252  	pluginName := fmt.Sprintf("example-plugin")
   253  	p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   254  	require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   255  	timestampBeforeRegistration := time.Now()
   256  	dsw.AddOrUpdatePlugin(socketPath)
   257  	waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
   258  
   259  	// Get asw plugins; it should contain the added plugin
   260  	aswPlugins := asw.GetRegisteredPlugins()
   261  	if len(aswPlugins) != 1 {
   262  		t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
   263  	}
   264  	if aswPlugins[0].SocketPath != socketPath {
   265  		t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
   266  	}
   267  
   268  	dsw.RemovePlugin(socketPath)
   269  	os.Remove(socketPath)
   270  	waitForUnregistration(t, socketPath, asw)
   271  
   272  	// Get asw plugins; it should no longer contain the added plugin
   273  	aswPlugins = asw.GetRegisteredPlugins()
   274  	if len(aswPlugins) != 0 {
   275  		t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be zero but it's %d", len(aswPlugins))
   276  	}
   277  }
   278  
   279  // Populates desiredStateOfWorld cache with one plugin
   280  // Calls Run()
   281  // Then update the timestamp of the plugin
   282  // Verifies that the plugin is reregistered.
   283  // Verifies the plugin with updated timestamp now in actual state of world.
   284  func Test_Run_Positive_ReRegister(t *testing.T) {
   285  	// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
   286  	if runtime.GOOS == "windows" {
   287  		t.Skip("Skipping test that fails on Windows")
   288  	}
   289  
   290  	defer cleanup(t)
   291  
   292  	dsw := cache.NewDesiredStateOfWorld()
   293  	asw := cache.NewActualStateOfWorld()
   294  	di := NewDummyImpl()
   295  	fakeRecorder := &record.FakeRecorder{}
   296  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   297  		fakeRecorder,
   298  	))
   299  	reconciler := NewReconciler(
   300  		oex,
   301  		reconcilerLoopSleepDuration,
   302  		dsw,
   303  		asw,
   304  	)
   305  	reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
   306  
   307  	// Start the reconciler to fill ASW.
   308  	stopChan := make(chan struct{})
   309  	defer close(stopChan)
   310  	go reconciler.Run(stopChan)
   311  
   312  	socketPath := filepath.Join(socketDir, "plugin2.sock")
   313  	pluginName := fmt.Sprintf("example-plugin2")
   314  	p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
   315  	require.NoError(t, p.Serve("v1beta1", "v1beta2"))
   316  	timestampBeforeRegistration := time.Now()
   317  	dsw.AddOrUpdatePlugin(socketPath)
   318  	waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
   319  
   320  	timeStampBeforeReRegistration := time.Now()
   321  	// Add the plugin again to update the timestamp
   322  	dsw.AddOrUpdatePlugin(socketPath)
   323  	// This should trigger a deregistration and a regitration
   324  	// The process of unregistration and reregistration can happen so fast that
   325  	// we are not able to catch it with waitForUnregistration, so here we are checking
   326  	// the plugin has an updated timestamp.
   327  	waitForRegistration(t, socketPath, timeStampBeforeReRegistration, asw)
   328  
   329  	// Get asw plugins; it should contain the added plugin
   330  	aswPlugins := asw.GetRegisteredPlugins()
   331  	if len(aswPlugins) != 1 {
   332  		t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
   333  	}
   334  	if aswPlugins[0].SocketPath != socketPath {
   335  		t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
   336  	}
   337  }
   338  

View as plain text