...

Source file src/k8s.io/kubernetes/pkg/kubelet/config/file_linux_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/config

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2016 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package config
    21  
    22  import (
    23  	"fmt"
    24  	"io"
    25  	"os"
    26  	"path/filepath"
    27  	"sync"
    28  	"testing"
    29  	"time"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	clientscheme "k8s.io/client-go/kubernetes/scheme"
    38  	api "k8s.io/kubernetes/pkg/apis/core"
    39  	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
    40  	"k8s.io/kubernetes/pkg/apis/core/validation"
    41  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    42  	"k8s.io/kubernetes/pkg/securitycontext"
    43  )
    44  
    45  func TestExtractFromNonExistentFile(t *testing.T) {
    46  	ch := make(chan interface{}, 1)
    47  	lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
    48  	err := lw.doWatch()
    49  	if err == nil {
    50  		t.Errorf("Expected error")
    51  	}
    52  }
    53  
    54  func TestUpdateOnNonExistentFile(t *testing.T) {
    55  	ch := make(chan interface{})
    56  	NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
    57  	select {
    58  	case got := <-ch:
    59  		update := got.(kubetypes.PodUpdate)
    60  		expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
    61  		if !apiequality.Semantic.DeepDerivative(expected, update) {
    62  			t.Fatalf("expected %#v, Got %#v", expected, update)
    63  		}
    64  
    65  	case <-time.After(wait.ForeverTestTimeout):
    66  		t.Fatalf("expected update, timeout instead")
    67  	}
    68  }
    69  
    70  func TestReadPodsFromFileExistAlready(t *testing.T) {
    71  	hostname := types.NodeName("random-test-hostname")
    72  	var testCases = getTestCases(hostname)
    73  
    74  	for _, testCase := range testCases {
    75  		func() {
    76  			dirName, err := mkTempDir("file-test")
    77  			if err != nil {
    78  				t.Fatalf("unable to create temp dir: %v", err)
    79  			}
    80  			defer os.RemoveAll(dirName)
    81  			file := testCase.writeToFile(dirName, "test_pod_manifest", t)
    82  
    83  			ch := make(chan interface{})
    84  			NewSourceFile(file, hostname, time.Millisecond, ch)
    85  			select {
    86  			case got := <-ch:
    87  				update := got.(kubetypes.PodUpdate)
    88  				for _, pod := range update.Pods {
    89  					// TODO: remove the conversion when validation is performed on versioned objects.
    90  					internalPod := &api.Pod{}
    91  					if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
    92  						t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
    93  					}
    94  					if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
    95  						t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
    96  					}
    97  				}
    98  				if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
    99  					t.Fatalf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
   100  				}
   101  			case <-time.After(wait.ForeverTestTimeout):
   102  				t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
   103  			}
   104  		}()
   105  	}
   106  }
   107  
   108  var (
   109  	testCases = []struct {
   110  		watchDir bool
   111  		symlink  bool
   112  		period   time.Duration
   113  	}{
   114  		// set the period to be long enough for the file to be changed
   115  		// and short enough to trigger the event
   116  		{true, true, 3 * time.Second},
   117  
   118  		// set the period to avoid periodic PodUpdate event
   119  		{true, false, 60 * time.Second},
   120  		{false, true, 60 * time.Second},
   121  		{false, false, 60 * time.Second},
   122  	}
   123  )
   124  
   125  func TestWatchFileAdded(t *testing.T) {
   126  	for _, testCase := range testCases {
   127  		watchFileAdded(testCase.watchDir, testCase.symlink, t)
   128  	}
   129  }
   130  
   131  func TestWatchFileChanged(t *testing.T) {
   132  	for _, testCase := range testCases {
   133  		watchFileChanged(testCase.watchDir, testCase.symlink, testCase.period, t)
   134  	}
   135  }
   136  
   137  type testCase struct {
   138  	lock     *sync.Mutex
   139  	desc     string
   140  	pod      runtime.Object
   141  	expected kubetypes.PodUpdate
   142  }
   143  
   144  func getTestCases(hostname types.NodeName) []*testCase {
   145  	grace := int64(30)
   146  	enableServiceLinks := v1.DefaultEnableServiceLinks
   147  	return []*testCase{
   148  		{
   149  			lock: &sync.Mutex{},
   150  			desc: "Simple pod",
   151  			pod: &v1.Pod{
   152  				TypeMeta: metav1.TypeMeta{
   153  					Kind:       "Pod",
   154  					APIVersion: "",
   155  				},
   156  				ObjectMeta: metav1.ObjectMeta{
   157  					Name:      "test",
   158  					UID:       "12345",
   159  					Namespace: "mynamespace",
   160  				},
   161  				Spec: v1.PodSpec{
   162  					Containers:      []v1.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
   163  					SecurityContext: &v1.PodSecurityContext{},
   164  					SchedulerName:   v1.DefaultSchedulerName,
   165  				},
   166  				Status: v1.PodStatus{
   167  					Phase: v1.PodPending,
   168  				},
   169  			},
   170  			expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &v1.Pod{
   171  				ObjectMeta: metav1.ObjectMeta{
   172  					Name:        "test-" + string(hostname),
   173  					UID:         "12345",
   174  					Namespace:   "mynamespace",
   175  					Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"},
   176  				},
   177  				Spec: v1.PodSpec{
   178  					NodeName:                      string(hostname),
   179  					RestartPolicy:                 v1.RestartPolicyAlways,
   180  					DNSPolicy:                     v1.DNSClusterFirst,
   181  					TerminationGracePeriodSeconds: &grace,
   182  					Tolerations: []v1.Toleration{{
   183  						Operator: "Exists",
   184  						Effect:   "NoExecute",
   185  					}},
   186  					Containers: []v1.Container{{
   187  						Name:                     "image",
   188  						Image:                    "test/image",
   189  						TerminationMessagePath:   "/dev/termination-log",
   190  						ImagePullPolicy:          "Always",
   191  						SecurityContext:          securitycontext.ValidSecurityContextWithContainerDefaults(),
   192  						TerminationMessagePolicy: v1.TerminationMessageReadFile,
   193  					}},
   194  					SecurityContext:    &v1.PodSecurityContext{},
   195  					SchedulerName:      v1.DefaultSchedulerName,
   196  					EnableServiceLinks: &enableServiceLinks,
   197  				},
   198  				Status: v1.PodStatus{
   199  					Phase: v1.PodPending,
   200  				},
   201  			}),
   202  		},
   203  	}
   204  }
   205  
   206  func (tc *testCase) writeToFile(dir, name string, t *testing.T) string {
   207  	fileContents, err := runtime.Encode(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), tc.pod)
   208  	if err != nil {
   209  		t.Fatalf("%s: error in encoding the pod: %v", tc.desc, err)
   210  	}
   211  
   212  	fileName := filepath.Join(dir, name)
   213  	if err := writeFile(fileName, []byte(fileContents)); err != nil {
   214  		t.Fatalf("unable to write test file %#v", err)
   215  	}
   216  	return fileName
   217  }
   218  
   219  func createSymbolicLink(link, target, name string, t *testing.T) string {
   220  	linkName := filepath.Join(link, name)
   221  	linkedFile := filepath.Join(target, name)
   222  
   223  	err := os.Symlink(linkedFile, linkName)
   224  	if err != nil {
   225  		t.Fatalf("unexpected error when create symbolic link: %v", err)
   226  	}
   227  	return linkName
   228  }
   229  
   230  func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
   231  	hostname := types.NodeName("random-test-hostname")
   232  	var testCases = getTestCases(hostname)
   233  
   234  	fileNamePre := "test_pod_manifest"
   235  	for index, testCase := range testCases {
   236  		func() {
   237  			dirName, err := mkTempDir("dir-test")
   238  			if err != nil {
   239  				t.Fatalf("unable to create temp dir: %v", err)
   240  			}
   241  			defer removeAll(dirName, t)
   242  
   243  			fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
   244  			var linkedDirName string
   245  			if symlink {
   246  				linkedDirName, err = mkTempDir("linked-dir-test")
   247  				if err != nil {
   248  					t.Fatalf("unable to create temp dir for linked files: %v", err)
   249  				}
   250  				defer removeAll(linkedDirName, t)
   251  				createSymbolicLink(dirName, linkedDirName, fileName, t)
   252  			}
   253  
   254  			ch := make(chan interface{})
   255  			if watchDir {
   256  				NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
   257  			} else {
   258  				NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
   259  			}
   260  			expectEmptyUpdate(t, ch)
   261  
   262  			addFile := func() {
   263  				// Add a file
   264  				if symlink {
   265  					testCase.writeToFile(linkedDirName, fileName, t)
   266  					return
   267  				}
   268  
   269  				testCase.writeToFile(dirName, fileName, t)
   270  			}
   271  
   272  			go addFile()
   273  
   274  			// For !watchDir: expect an update by SourceFile.reloadConfig().
   275  			// For watchDir: expect at least one update from CREATE & MODIFY inotify event.
   276  			// Shouldn't expect two updates from CREATE & MODIFY because CREATE doesn't guarantee file written.
   277  			// In that case no update will be sent from CREATE event.
   278  			expectUpdate(t, ch, testCase)
   279  		}()
   280  	}
   281  }
   282  
   283  func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *testing.T) {
   284  	hostname := types.NodeName("random-test-hostname")
   285  	var testCases = getTestCases(hostname)
   286  
   287  	fileNamePre := "test_pod_manifest"
   288  	for index, testCase := range testCases {
   289  		func() {
   290  			dirName, err := mkTempDir("dir-test")
   291  			fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
   292  			if err != nil {
   293  				t.Fatalf("unable to create temp dir: %v", err)
   294  			}
   295  			defer removeAll(dirName, t)
   296  
   297  			var linkedDirName string
   298  			if symlink {
   299  				linkedDirName, err = mkTempDir("linked-dir-test")
   300  				if err != nil {
   301  					t.Fatalf("unable to create temp dir for linked files: %v", err)
   302  				}
   303  				defer removeAll(linkedDirName, t)
   304  				createSymbolicLink(dirName, linkedDirName, fileName, t)
   305  			}
   306  
   307  			var file string
   308  			ch := make(chan interface{})
   309  			func() {
   310  				testCase.lock.Lock()
   311  				defer testCase.lock.Unlock()
   312  
   313  				if symlink {
   314  					file = testCase.writeToFile(linkedDirName, fileName, t)
   315  					return
   316  				}
   317  
   318  				file = testCase.writeToFile(dirName, fileName, t)
   319  			}()
   320  
   321  			if watchDir {
   322  				NewSourceFile(dirName, hostname, period, ch)
   323  			} else {
   324  				NewSourceFile(file, hostname, period, ch)
   325  			}
   326  
   327  			// await fsnotify to be ready
   328  			time.Sleep(time.Second)
   329  
   330  			// expect an update by SourceFile.resetStoreFromPath()
   331  			expectUpdate(t, ch, testCase)
   332  
   333  			pod := testCase.pod.(*v1.Pod)
   334  			pod.Spec.Containers[0].Name = "image2"
   335  
   336  			testCase.expected.Pods[0].Spec.Containers[0].Name = "image2"
   337  			changeFile := func() {
   338  				// Edit the file content
   339  				if symlink {
   340  					file = testCase.writeToFile(linkedDirName, fileName, t)
   341  					return
   342  				}
   343  
   344  				file = testCase.writeToFile(dirName, fileName, t)
   345  			}
   346  
   347  			go changeFile()
   348  			// expect an update by MODIFY inotify event
   349  			expectUpdate(t, ch, testCase)
   350  
   351  			if watchDir {
   352  				go changeFileName(dirName, fileName, fileName+"_ch", t)
   353  				// expect an update by MOVED_FROM inotify event cause changing file name
   354  				expectEmptyUpdate(t, ch)
   355  				// expect an update by MOVED_TO inotify event cause changing file name
   356  				expectUpdate(t, ch, testCase)
   357  			}
   358  		}()
   359  	}
   360  }
   361  
   362  func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
   363  	timer := time.After(5 * time.Second)
   364  	for {
   365  		select {
   366  		case got := <-ch:
   367  			update := got.(kubetypes.PodUpdate)
   368  			if len(update.Pods) == 0 {
   369  				// filter out the empty updates from reading a non-existing path
   370  				continue
   371  			}
   372  			for _, pod := range update.Pods {
   373  				// TODO: remove the conversion when validation is performed on versioned objects.
   374  				internalPod := &api.Pod{}
   375  				if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
   376  					t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
   377  				}
   378  				if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
   379  					t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
   380  				}
   381  			}
   382  
   383  			if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
   384  				t.Fatalf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
   385  			}
   386  			return
   387  		case <-timer:
   388  			t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
   389  		}
   390  	}
   391  }
   392  
   393  func expectEmptyUpdate(t *testing.T, ch chan interface{}) {
   394  	timer := time.After(5 * time.Second)
   395  	for {
   396  		select {
   397  		case got := <-ch:
   398  			update := got.(kubetypes.PodUpdate)
   399  			if len(update.Pods) != 0 {
   400  				t.Fatalf("expected empty update, got %#v", update)
   401  			}
   402  			return
   403  		case <-timer:
   404  			t.Fatalf("expected empty update, timeout instead")
   405  		}
   406  	}
   407  }
   408  
   409  func writeFile(filename string, data []byte) error {
   410  	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666)
   411  	if err != nil {
   412  		return err
   413  	}
   414  	n, err := f.Write(data)
   415  	if err == nil && n < len(data) {
   416  		err = io.ErrShortWrite
   417  	}
   418  	if err1 := f.Close(); err == nil {
   419  		err = err1
   420  	}
   421  	return err
   422  }
   423  
   424  func changeFileName(dir, from, to string, t *testing.T) {
   425  	fromPath := filepath.Join(dir, from)
   426  	toPath := filepath.Join(dir, to)
   427  	if err := os.Rename(fromPath, toPath); err != nil {
   428  		t.Errorf("Fail to change file name: %s", err)
   429  	}
   430  }
   431  

View as plain text