...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package operationexecutor
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"strconv"
    23  	"testing"
    24  	"time"
    25  
    26  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    27  )
    28  
    29  const (
    30  	numPluginsToRegister   = 2
    31  	numPluginsToUnregister = 2
    32  )
    33  
    34  var _ OperationGenerator = &fakeOperationGenerator{}
    35  var socketDir string
    36  
    37  func init() {
    38  	d, err := os.MkdirTemp("", "operation_executor_test")
    39  	if err != nil {
    40  		panic(fmt.Sprintf("Could not create a temp directory: %s", d))
    41  	}
    42  	socketDir = d
    43  }
    44  
    45  func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
    46  	ch, quit, oe := setup()
    47  	for i := 0; i < numPluginsToRegister; i++ {
    48  		socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
    49  		oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
    50  	}
    51  	if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
    52  		t.Fatalf("Unable to start register operations in Concurrent for plugins")
    53  	}
    54  }
    55  
    56  func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
    57  	ch, quit, oe := setup()
    58  	socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
    59  	for i := 0; i < numPluginsToRegister; i++ {
    60  		oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
    61  
    62  	}
    63  	if !isOperationRunSerially(ch, quit) {
    64  		t.Fatalf("Unable to start register operations serially for plugins")
    65  	}
    66  }
    67  
    68  func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
    69  	ch, quit, oe := setup()
    70  	for i := 0; i < numPluginsToUnregister; i++ {
    71  		socketPath := "socket-path" + strconv.Itoa(i)
    72  		pluginInfo := cache.PluginInfo{SocketPath: socketPath}
    73  		oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
    74  
    75  	}
    76  	if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
    77  		t.Fatalf("Unable to start unregister operations in Concurrent for plugins")
    78  	}
    79  }
    80  
    81  func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
    82  	ch, quit, oe := setup()
    83  	socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
    84  	for i := 0; i < numPluginsToUnregister; i++ {
    85  		pluginInfo := cache.PluginInfo{SocketPath: socketPath}
    86  		oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
    87  
    88  	}
    89  	if !isOperationRunSerially(ch, quit) {
    90  		t.Fatalf("Unable to start unregister operations serially for plugins")
    91  	}
    92  }
    93  
    94  type fakeOperationGenerator struct {
    95  	ch   chan interface{}
    96  	quit chan interface{}
    97  }
    98  
    99  func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
   100  	return &fakeOperationGenerator{
   101  		ch:   ch,
   102  		quit: quit,
   103  	}
   104  }
   105  
   106  func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
   107  	socketPath string,
   108  	timestamp time.Time,
   109  	pluginHandlers map[string]cache.PluginHandler,
   110  	actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
   111  
   112  	opFunc := func() error {
   113  		startOperationAndBlock(fopg.ch, fopg.quit)
   114  		return nil
   115  	}
   116  	return opFunc
   117  }
   118  
   119  func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
   120  	pluginInfo cache.PluginInfo,
   121  	actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
   122  	opFunc := func() error {
   123  		startOperationAndBlock(fopg.ch, fopg.quit)
   124  		return nil
   125  	}
   126  	return opFunc
   127  }
   128  
   129  func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
   130  	defer close(quit)
   131  	numOperationsStarted := 0
   132  loop:
   133  	for {
   134  		select {
   135  		case <-ch:
   136  			numOperationsStarted++
   137  			if numOperationsStarted > 1 {
   138  				return false
   139  			}
   140  		case <-time.After(5 * time.Second):
   141  			break loop
   142  		}
   143  	}
   144  	return true
   145  }
   146  
   147  func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
   148  	defer close(quit)
   149  	numOperationsStarted := 0
   150  loop:
   151  	for {
   152  		select {
   153  		case <-ch:
   154  			numOperationsStarted++
   155  			if numOperationsStarted == numOperationsToRun {
   156  				return true
   157  			}
   158  		case <-time.After(5 * time.Second):
   159  			break loop
   160  		}
   161  	}
   162  	return false
   163  }
   164  
   165  func setup() (chan interface{}, chan interface{}, OperationExecutor) {
   166  	ch, quit := make(chan interface{}), make(chan interface{})
   167  	return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
   168  }
   169  
   170  // This function starts by writing to ch and blocks on the quit channel
   171  // until it is closed by the currently running test
   172  func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
   173  	ch <- nil
   174  	<-quit
   175  }
   176  

View as plain text