1
16
17 package operationexecutor
18
19 import (
20 "fmt"
21 "os"
22 "strconv"
23 "testing"
24 "time"
25
26 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
27 )
28
const (
	// numPluginsToRegister is how many RegisterPlugin calls each register
	// test issues, and how many started operations the concurrent register
	// test waits for.
	numPluginsToRegister = 2

	// numPluginsToUnregister is how many UnregisterPlugin calls each
	// unregister test issues, and how many started operations the concurrent
	// unregister test waits for.
	numPluginsToUnregister = 2
)
33
34 var _ OperationGenerator = &fakeOperationGenerator{}
// socketDir is the temporary directory under which the tests in this file
// build their plugin socket paths. It is assigned once, by init.
var socketDir string

// init creates the temporary directory that backs socketDir and panics if the
// directory cannot be created.
//
// NOTE(review): nothing visible in this file removes the directory again.
func init() {
	d, err := os.MkdirTemp("", "operation_executor_test")
	if err != nil {
		// Report the error itself: d is the empty string when MkdirTemp
		// fails, so printing it would hide the reason for the failure.
		panic(fmt.Sprintf("Could not create a temp directory: %v", err))
	}
	socketDir = d
}
44
45 func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
46 ch, quit, oe := setup()
47 for i := 0; i < numPluginsToRegister; i++ {
48 socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
49 oe.RegisterPlugin(socketPath, time.Now(), nil , nil )
50 }
51 if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
52 t.Fatalf("Unable to start register operations in Concurrent for plugins")
53 }
54 }
55
56 func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
57 ch, quit, oe := setup()
58 socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
59 for i := 0; i < numPluginsToRegister; i++ {
60 oe.RegisterPlugin(socketPath, time.Now(), nil , nil )
61
62 }
63 if !isOperationRunSerially(ch, quit) {
64 t.Fatalf("Unable to start register operations serially for plugins")
65 }
66 }
67
68 func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
69 ch, quit, oe := setup()
70 for i := 0; i < numPluginsToUnregister; i++ {
71 socketPath := "socket-path" + strconv.Itoa(i)
72 pluginInfo := cache.PluginInfo{SocketPath: socketPath}
73 oe.UnregisterPlugin(pluginInfo, nil )
74
75 }
76 if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
77 t.Fatalf("Unable to start unregister operations in Concurrent for plugins")
78 }
79 }
80
81 func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
82 ch, quit, oe := setup()
83 socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
84 for i := 0; i < numPluginsToUnregister; i++ {
85 pluginInfo := cache.PluginInfo{SocketPath: socketPath}
86 oe.UnregisterPlugin(pluginInfo, nil )
87
88 }
89 if !isOperationRunSerially(ch, quit) {
90 t.Fatalf("Unable to start unregister operations serially for plugins")
91 }
92 }
93
// fakeOperationGenerator is a test double whose generated operations do no
// real work: each one sends on ch when it starts and then blocks until it can
// receive from quit.
type fakeOperationGenerator struct {
	// ch carries the "operation started" notifications; quit releases
	// blocked operations.
	ch, quit chan interface{}
}
98
99 func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
100 return &fakeOperationGenerator{
101 ch: ch,
102 quit: quit,
103 }
104 }
105
106 func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
107 socketPath string,
108 timestamp time.Time,
109 pluginHandlers map[string]cache.PluginHandler,
110 actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
111
112 opFunc := func() error {
113 startOperationAndBlock(fopg.ch, fopg.quit)
114 return nil
115 }
116 return opFunc
117 }
118
119 func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
120 pluginInfo cache.PluginInfo,
121 actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
122 opFunc := func() error {
123 startOperationAndBlock(fopg.ch, fopg.quit)
124 return nil
125 }
126 return opFunc
127 }
128
// isOperationRunSerially reports whether at most one operation signalled its
// start on ch. It returns false as soon as a second start is observed, and
// true once 5 seconds pass without any new start. quit is closed on return,
// which releases every operation blocked in startOperationAndBlock.
func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
	defer close(quit)

	sawStart := false
	for {
		select {
		case <-ch:
			if sawStart {
				// A second operation started while the first was still blocked.
				return false
			}
			sawStart = true
		case <-time.After(5 * time.Second):
			// The timer is re-armed on every pass, so this fires after 5
			// quiet seconds rather than 5 seconds in total.
			return true
		}
	}
}
146
// isOperationRunConcurrently reports whether numOperationsToRun operations
// signalled their start on ch. It returns true as soon as that many starts
// have been observed, and false if 5 seconds pass without a new start before
// the count is reached. quit is closed on return, which releases every
// operation blocked in startOperationAndBlock.
func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
	defer close(quit)

	// Count down instead of up. The check runs only after a decrement, so a
	// non-positive numOperationsToRun can never be satisfied.
	pending := numOperationsToRun
	for {
		select {
		case <-ch:
			pending--
			if pending == 0 {
				return true
			}
		case <-time.After(5 * time.Second):
			// The timer is re-armed on every pass, so this fires after 5
			// quiet seconds rather than 5 seconds in total.
			return false
		}
	}
}
164
165 func setup() (chan interface{}, chan interface{}, OperationExecutor) {
166 ch, quit := make(chan interface{}), make(chan interface{})
167 return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
168 }
169
170
171
// startOperationAndBlock announces that an operation has started by sending
// on ch, then blocks until a value can be received from quit (for example
// because quit has been closed).
func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
	// The payload is a nil interface value; receivers in this file discard it.
	var startSignal interface{}
	ch <- startSignal
	<-quit
}
176
View as plain text