1
2
3
4
19
20 package config
21
22 import (
23 "fmt"
24 "io"
25 "os"
26 "path/filepath"
27 "sync"
28 "testing"
29 "time"
30
31 v1 "k8s.io/api/core/v1"
32 apiequality "k8s.io/apimachinery/pkg/api/equality"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/runtime"
35 "k8s.io/apimachinery/pkg/types"
36 "k8s.io/apimachinery/pkg/util/wait"
37 clientscheme "k8s.io/client-go/kubernetes/scheme"
38 api "k8s.io/kubernetes/pkg/apis/core"
39 k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
40 "k8s.io/kubernetes/pkg/apis/core/validation"
41 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
42 "k8s.io/kubernetes/pkg/securitycontext"
43 )
44
45 func TestExtractFromNonExistentFile(t *testing.T) {
46 ch := make(chan interface{}, 1)
47 lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
48 err := lw.doWatch()
49 if err == nil {
50 t.Errorf("Expected error")
51 }
52 }
53
54 func TestUpdateOnNonExistentFile(t *testing.T) {
55 ch := make(chan interface{})
56 NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
57 select {
58 case got := <-ch:
59 update := got.(kubetypes.PodUpdate)
60 expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
61 if !apiequality.Semantic.DeepDerivative(expected, update) {
62 t.Fatalf("expected %#v, Got %#v", expected, update)
63 }
64
65 case <-time.After(wait.ForeverTestTimeout):
66 t.Fatalf("expected update, timeout instead")
67 }
68 }
69
70 func TestReadPodsFromFileExistAlready(t *testing.T) {
71 hostname := types.NodeName("random-test-hostname")
72 var testCases = getTestCases(hostname)
73
74 for _, testCase := range testCases {
75 func() {
76 dirName, err := mkTempDir("file-test")
77 if err != nil {
78 t.Fatalf("unable to create temp dir: %v", err)
79 }
80 defer os.RemoveAll(dirName)
81 file := testCase.writeToFile(dirName, "test_pod_manifest", t)
82
83 ch := make(chan interface{})
84 NewSourceFile(file, hostname, time.Millisecond, ch)
85 select {
86 case got := <-ch:
87 update := got.(kubetypes.PodUpdate)
88 for _, pod := range update.Pods {
89
90 internalPod := &api.Pod{}
91 if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
92 t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
93 }
94 if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
95 t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
96 }
97 }
98 if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
99 t.Fatalf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
100 }
101 case <-time.After(wait.ForeverTestTimeout):
102 t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
103 }
104 }()
105 }
106 }
107
// testCases enumerates the watch configurations exercised by
// TestWatchFileAdded and TestWatchFileChanged.
//
// watchDir and symlink are forwarded to both helpers; period is only
// forwarded to watchFileChanged, which hands it to NewSourceFile.
var testCases = []struct {
	watchDir bool
	symlink  bool
	period   time.Duration
}{
	{watchDir: true, symlink: true, period: 3 * time.Second},

	{watchDir: true, symlink: false, period: 60 * time.Second},
	{watchDir: false, symlink: true, period: 60 * time.Second},
	{watchDir: false, symlink: false, period: 60 * time.Second},
}
124
125 func TestWatchFileAdded(t *testing.T) {
126 for _, testCase := range testCases {
127 watchFileAdded(testCase.watchDir, testCase.symlink, t)
128 }
129 }
130
131 func TestWatchFileChanged(t *testing.T) {
132 for _, testCase := range testCases {
133 watchFileChanged(testCase.watchDir, testCase.symlink, testCase.period, t)
134 }
135 }
136
137 type testCase struct {
138 lock *sync.Mutex
139 desc string
140 pod runtime.Object
141 expected kubetypes.PodUpdate
142 }
143
144 func getTestCases(hostname types.NodeName) []*testCase {
145 grace := int64(30)
146 enableServiceLinks := v1.DefaultEnableServiceLinks
147 return []*testCase{
148 {
149 lock: &sync.Mutex{},
150 desc: "Simple pod",
151 pod: &v1.Pod{
152 TypeMeta: metav1.TypeMeta{
153 Kind: "Pod",
154 APIVersion: "",
155 },
156 ObjectMeta: metav1.ObjectMeta{
157 Name: "test",
158 UID: "12345",
159 Namespace: "mynamespace",
160 },
161 Spec: v1.PodSpec{
162 Containers: []v1.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
163 SecurityContext: &v1.PodSecurityContext{},
164 SchedulerName: v1.DefaultSchedulerName,
165 },
166 Status: v1.PodStatus{
167 Phase: v1.PodPending,
168 },
169 },
170 expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &v1.Pod{
171 ObjectMeta: metav1.ObjectMeta{
172 Name: "test-" + string(hostname),
173 UID: "12345",
174 Namespace: "mynamespace",
175 Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"},
176 },
177 Spec: v1.PodSpec{
178 NodeName: string(hostname),
179 RestartPolicy: v1.RestartPolicyAlways,
180 DNSPolicy: v1.DNSClusterFirst,
181 TerminationGracePeriodSeconds: &grace,
182 Tolerations: []v1.Toleration{{
183 Operator: "Exists",
184 Effect: "NoExecute",
185 }},
186 Containers: []v1.Container{{
187 Name: "image",
188 Image: "test/image",
189 TerminationMessagePath: "/dev/termination-log",
190 ImagePullPolicy: "Always",
191 SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
192 TerminationMessagePolicy: v1.TerminationMessageReadFile,
193 }},
194 SecurityContext: &v1.PodSecurityContext{},
195 SchedulerName: v1.DefaultSchedulerName,
196 EnableServiceLinks: &enableServiceLinks,
197 },
198 Status: v1.PodStatus{
199 Phase: v1.PodPending,
200 },
201 }),
202 },
203 }
204 }
205
206 func (tc *testCase) writeToFile(dir, name string, t *testing.T) string {
207 fileContents, err := runtime.Encode(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), tc.pod)
208 if err != nil {
209 t.Fatalf("%s: error in encoding the pod: %v", tc.desc, err)
210 }
211
212 fileName := filepath.Join(dir, name)
213 if err := writeFile(fileName, []byte(fileContents)); err != nil {
214 t.Fatalf("unable to write test file %#v", err)
215 }
216 return fileName
217 }
218
// createSymbolicLink creates link/name as a symbolic link pointing at
// target/name and returns the path of the link. The target does not need to
// exist yet. A failure aborts the test through t.Fatalf.
func createSymbolicLink(link, target, name string, t *testing.T) string {
	symlinkPath := filepath.Join(link, name)
	if err := os.Symlink(filepath.Join(target, name), symlinkPath); err != nil {
		t.Fatalf("unexpected error when create symbolic link: %v", err)
	}
	return symlinkPath
}
229
230 func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
231 hostname := types.NodeName("random-test-hostname")
232 var testCases = getTestCases(hostname)
233
234 fileNamePre := "test_pod_manifest"
235 for index, testCase := range testCases {
236 func() {
237 dirName, err := mkTempDir("dir-test")
238 if err != nil {
239 t.Fatalf("unable to create temp dir: %v", err)
240 }
241 defer removeAll(dirName, t)
242
243 fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
244 var linkedDirName string
245 if symlink {
246 linkedDirName, err = mkTempDir("linked-dir-test")
247 if err != nil {
248 t.Fatalf("unable to create temp dir for linked files: %v", err)
249 }
250 defer removeAll(linkedDirName, t)
251 createSymbolicLink(dirName, linkedDirName, fileName, t)
252 }
253
254 ch := make(chan interface{})
255 if watchDir {
256 NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
257 } else {
258 NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
259 }
260 expectEmptyUpdate(t, ch)
261
262 addFile := func() {
263
264 if symlink {
265 testCase.writeToFile(linkedDirName, fileName, t)
266 return
267 }
268
269 testCase.writeToFile(dirName, fileName, t)
270 }
271
272 go addFile()
273
274
275
276
277
278 expectUpdate(t, ch, testCase)
279 }()
280 }
281 }
282
283 func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *testing.T) {
284 hostname := types.NodeName("random-test-hostname")
285 var testCases = getTestCases(hostname)
286
287 fileNamePre := "test_pod_manifest"
288 for index, testCase := range testCases {
289 func() {
290 dirName, err := mkTempDir("dir-test")
291 fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
292 if err != nil {
293 t.Fatalf("unable to create temp dir: %v", err)
294 }
295 defer removeAll(dirName, t)
296
297 var linkedDirName string
298 if symlink {
299 linkedDirName, err = mkTempDir("linked-dir-test")
300 if err != nil {
301 t.Fatalf("unable to create temp dir for linked files: %v", err)
302 }
303 defer removeAll(linkedDirName, t)
304 createSymbolicLink(dirName, linkedDirName, fileName, t)
305 }
306
307 var file string
308 ch := make(chan interface{})
309 func() {
310 testCase.lock.Lock()
311 defer testCase.lock.Unlock()
312
313 if symlink {
314 file = testCase.writeToFile(linkedDirName, fileName, t)
315 return
316 }
317
318 file = testCase.writeToFile(dirName, fileName, t)
319 }()
320
321 if watchDir {
322 NewSourceFile(dirName, hostname, period, ch)
323 } else {
324 NewSourceFile(file, hostname, period, ch)
325 }
326
327
328 time.Sleep(time.Second)
329
330
331 expectUpdate(t, ch, testCase)
332
333 pod := testCase.pod.(*v1.Pod)
334 pod.Spec.Containers[0].Name = "image2"
335
336 testCase.expected.Pods[0].Spec.Containers[0].Name = "image2"
337 changeFile := func() {
338
339 if symlink {
340 file = testCase.writeToFile(linkedDirName, fileName, t)
341 return
342 }
343
344 file = testCase.writeToFile(dirName, fileName, t)
345 }
346
347 go changeFile()
348
349 expectUpdate(t, ch, testCase)
350
351 if watchDir {
352 go changeFileName(dirName, fileName, fileName+"_ch", t)
353
354 expectEmptyUpdate(t, ch)
355
356 expectUpdate(t, ch, testCase)
357 }
358 }()
359 }
360 }
361
362 func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
363 timer := time.After(5 * time.Second)
364 for {
365 select {
366 case got := <-ch:
367 update := got.(kubetypes.PodUpdate)
368 if len(update.Pods) == 0 {
369
370 continue
371 }
372 for _, pod := range update.Pods {
373
374 internalPod := &api.Pod{}
375 if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
376 t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
377 }
378 if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
379 t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
380 }
381 }
382
383 if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
384 t.Fatalf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
385 }
386 return
387 case <-timer:
388 t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
389 }
390 }
391 }
392
393 func expectEmptyUpdate(t *testing.T, ch chan interface{}) {
394 timer := time.After(5 * time.Second)
395 for {
396 select {
397 case got := <-ch:
398 update := got.(kubetypes.PodUpdate)
399 if len(update.Pods) != 0 {
400 t.Fatalf("expected empty update, got %#v", update)
401 }
402 return
403 case <-timer:
404 t.Fatalf("expected empty update, timeout instead")
405 }
406 }
407 }
408
// writeFile writes data to filename, creating the file with mode 0666 (before
// umask) if it does not exist. The file is opened without O_TRUNC, so when it
// already exists any content beyond len(data) is left in place.
//
// It returns the first error encountered among open, write (including a short
// write) and close.
func writeFile(filename string, data []byte) error {
	file, openErr := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666)
	if openErr != nil {
		return openErr
	}

	written, err := file.Write(data)
	if err == nil && written < len(data) {
		err = io.ErrShortWrite
	}

	// Always close, but let an earlier write error take precedence.
	closeErr := file.Close()
	if err != nil {
		return err
	}
	return closeErr
}
423
// changeFileName renames dir/from to dir/to. A failure is recorded with
// t.Errorf, so the test is marked failed but keeps running.
func changeFileName(dir, from, to string, t *testing.T) {
	oldPath, newPath := filepath.Join(dir, from), filepath.Join(dir, to)
	err := os.Rename(oldPath, newPath)
	if err == nil {
		return
	}
	t.Errorf("Fail to change file name: %s", err)
}
431
View as plain text