1
16
17 package pluginmanager
18
19 import (
20 "fmt"
21 "os"
22 "path/filepath"
23 "reflect"
24 "strconv"
25 "sync"
26 "testing"
27 "time"
28
29 "github.com/stretchr/testify/require"
30 "k8s.io/apimachinery/pkg/util/sets"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/client-go/tools/record"
33 registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
34
35 "k8s.io/kubernetes/pkg/kubelet/config"
36 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
37 )
38
// socketDir is the directory handed to the plugin manager under test and the
// root under which the test creates plugin sockets. It is assigned in init.
var socketDir string

// supportedVersions is the version list passed to each example plugin that
// TestPluginRegistration creates.
var supportedVersions = []string{"v1beta1", "v1beta2"}
43
// fakePluginHandler is a test double that records, in call order, every
// callback invoked on it so tests can assert on the observed sequence.
type fakePluginHandler struct {
	// The embedded lock guards events. Code that inspects events directly
	// (such as waitForRegistration) must hold it as well.
	sync.RWMutex
	events []string
}

// newFakePluginHandler returns a handler with an empty event log.
func newFakePluginHandler() *fakePluginHandler {
	return new(fakePluginHandler)
}

// logEvent appends a single entry to the event log under the write lock.
func (h *fakePluginHandler) logEvent(entry string) {
	h.Lock()
	h.events = append(h.events, entry)
	h.Unlock()
}

// ValidatePlugin records a "validate <pluginName>" event and always succeeds.
// The endpoint and versions arguments are ignored.
func (h *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
	h.logEvent("validate " + pluginName)
	return nil
}

// RegisterPlugin records a "register <pluginName>" event and always succeeds.
// The endpoint, versions and pluginClientTimeout arguments are ignored.
func (h *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
	h.logEvent("register " + pluginName)
	return nil
}

// DeRegisterPlugin records a "deregister <pluginName>" event.
func (h *fakePluginHandler) DeRegisterPlugin(pluginName string) {
	h.logEvent("deregister " + pluginName)
}

// Reset discards all recorded events, leaving the log nil.
func (h *fakePluginHandler) Reset() {
	h.Lock()
	h.events = nil
	h.Unlock()
}
81
82 func init() {
83 d, err := os.MkdirTemp("", "plugin_manager_test")
84 if err != nil {
85 panic(fmt.Sprintf("Could not create a temp directory: %s", d))
86 }
87
88 socketDir = d
89 }
90
91 func cleanup(t *testing.T) {
92 require.NoError(t, os.RemoveAll(socketDir))
93 os.MkdirAll(socketDir, 0755)
94 }
95
96 func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
97 expected := []string{"validate " + pluginName, "register " + pluginName}
98 err := retryWithExponentialBackOff(
99 100*time.Millisecond,
100 func() (bool, error) {
101 fakePluginHandler.Lock()
102 defer fakePluginHandler.Unlock()
103 if reflect.DeepEqual(fakePluginHandler.events, expected) {
104 return true, nil
105 }
106 t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
107 return false, nil
108 },
109 )
110 if err != nil {
111 t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
112 }
113 }
114
115 func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
116 backoff := wait.Backoff{
117 Duration: initialDuration,
118 Factor: 3,
119 Jitter: 0,
120 Steps: 6,
121 }
122 return wait.ExponentialBackoff(backoff, fn)
123 }
124
125 func TestPluginRegistration(t *testing.T) {
126 defer cleanup(t)
127
128 pluginManager := newTestPluginManager(socketDir)
129
130
131 stopChan := make(chan struct{})
132 defer close(stopChan)
133 go func() {
134 sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
135 pluginManager.Run(sourcesReady, stopChan)
136 }()
137
138
139 fakeHandler := newFakePluginHandler()
140 pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
141
142 const maxDepth = 3
143
144 for i := 0; i < maxDepth; i++ {
145 fakeHandler.Reset()
146 pluginDir := socketDir
147
148 for j := 0; j < i; j++ {
149 pluginDir = filepath.Join(pluginDir, strconv.Itoa(j))
150 }
151 require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm))
152 socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i))
153
154
155 pluginName := fmt.Sprintf("example-plugin-%d", i)
156 p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
157 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
158
159
160 waitForRegistration(t, fakeHandler, pluginName)
161 }
162 }
163
164 func newTestPluginManager(sockDir string) PluginManager {
165 pm := NewPluginManager(
166 sockDir,
167 &record.FakeRecorder{},
168 )
169 return pm
170 }
171
View as plain text