...

Source file src/k8s.io/kubernetes/pkg/kubelet/config/file.go

Documentation: k8s.io/kubernetes/pkg/kubelet/config

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package config
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"sort"
    24  	"strings"
    25  	"time"
    26  
    27  	"k8s.io/klog/v2"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/client-go/tools/cache"
    32  	api "k8s.io/kubernetes/pkg/apis/core"
    33  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    34  	utilio "k8s.io/utils/io"
    35  )
    36  
    37  type podEventType int
    38  
    39  const (
    40  	podAdd podEventType = iota
    41  	podModify
    42  	podDelete
    43  
    44  	eventBufferLen = 10
    45  )
    46  
    47  type watchEvent struct {
    48  	fileName  string
    49  	eventType podEventType
    50  }
    51  
    52  type sourceFile struct {
    53  	path           string
    54  	nodeName       types.NodeName
    55  	period         time.Duration
    56  	store          cache.Store
    57  	fileKeyMapping map[string]string
    58  	updates        chan<- interface{}
    59  	watchEvents    chan *watchEvent
    60  }
    61  
    62  // NewSourceFile watches a config file for changes.
    63  func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
    64  	// "github.com/sigma/go-inotify" requires a path without trailing "/"
    65  	path = strings.TrimRight(path, string(os.PathSeparator))
    66  
    67  	config := newSourceFile(path, nodeName, period, updates)
    68  	klog.V(1).InfoS("Watching path", "path", path)
    69  	config.run()
    70  }
    71  
    72  func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
    73  	send := func(objs []interface{}) {
    74  		var pods []*v1.Pod
    75  		for _, o := range objs {
    76  			pods = append(pods, o.(*v1.Pod))
    77  		}
    78  		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
    79  	}
    80  	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
    81  	return &sourceFile{
    82  		path:           path,
    83  		nodeName:       nodeName,
    84  		period:         period,
    85  		store:          store,
    86  		fileKeyMapping: map[string]string{},
    87  		updates:        updates,
    88  		watchEvents:    make(chan *watchEvent, eventBufferLen),
    89  	}
    90  }
    91  
    92  func (s *sourceFile) run() {
    93  	listTicker := time.NewTicker(s.period)
    94  
    95  	go func() {
    96  		// Read path immediately to speed up startup.
    97  		if err := s.listConfig(); err != nil {
    98  			klog.ErrorS(err, "Unable to read config path", "path", s.path)
    99  		}
   100  		for {
   101  			select {
   102  			case <-listTicker.C:
   103  				if err := s.listConfig(); err != nil {
   104  					klog.ErrorS(err, "Unable to read config path", "path", s.path)
   105  				}
   106  			case e := <-s.watchEvents:
   107  				if err := s.consumeWatchEvent(e); err != nil {
   108  					klog.ErrorS(err, "Unable to process watch event")
   109  				}
   110  			}
   111  		}
   112  	}()
   113  
   114  	s.startWatch()
   115  }
   116  
   117  func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
   118  	return applyDefaults(pod, source, true, s.nodeName)
   119  }
   120  
   121  func (s *sourceFile) listConfig() error {
   122  	path := s.path
   123  	statInfo, err := os.Stat(path)
   124  	if err != nil {
   125  		if !os.IsNotExist(err) {
   126  			return err
   127  		}
   128  		// Emit an update with an empty PodList to allow FileSource to be marked as seen
   129  		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
   130  		return fmt.Errorf("path does not exist, ignoring")
   131  	}
   132  
   133  	switch {
   134  	case statInfo.Mode().IsDir():
   135  		pods, err := s.extractFromDir(path)
   136  		if err != nil {
   137  			return err
   138  		}
   139  		if len(pods) == 0 {
   140  			// Emit an update with an empty PodList to allow FileSource to be marked as seen
   141  			s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
   142  			return nil
   143  		}
   144  		return s.replaceStore(pods...)
   145  
   146  	case statInfo.Mode().IsRegular():
   147  		pod, err := s.extractFromFile(path)
   148  		if err != nil {
   149  			return err
   150  		}
   151  		return s.replaceStore(pod)
   152  
   153  	default:
   154  		return fmt.Errorf("path is not a directory or file")
   155  	}
   156  }
   157  
   158  // Get as many pod manifests as we can from a directory. Return an error if and only if something
   159  // prevented us from reading anything at all. Do not return an error if only some files
   160  // were problematic.
   161  func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
   162  	dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
   163  	if err != nil {
   164  		return nil, fmt.Errorf("glob failed: %v", err)
   165  	}
   166  
   167  	pods := make([]*v1.Pod, 0, len(dirents))
   168  	if len(dirents) == 0 {
   169  		return pods, nil
   170  	}
   171  
   172  	sort.Strings(dirents)
   173  	for _, path := range dirents {
   174  		statInfo, err := os.Stat(path)
   175  		if err != nil {
   176  			klog.ErrorS(err, "Could not get metadata", "path", path)
   177  			continue
   178  		}
   179  
   180  		switch {
   181  		case statInfo.Mode().IsDir():
   182  			klog.ErrorS(nil, "Provided manifest path is a directory, not recursing into manifest path", "path", path)
   183  		case statInfo.Mode().IsRegular():
   184  			pod, err := s.extractFromFile(path)
   185  			if err != nil {
   186  				if !os.IsNotExist(err) {
   187  					klog.ErrorS(err, "Could not process manifest file", "path", path)
   188  				}
   189  			} else {
   190  				pods = append(pods, pod)
   191  			}
   192  		default:
   193  			klog.ErrorS(nil, "Manifest path is not a directory or file", "path", path, "mode", statInfo.Mode())
   194  		}
   195  	}
   196  	return pods, nil
   197  }
   198  
   199  // extractFromFile parses a file for Pod configuration information.
   200  func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
   201  	klog.V(3).InfoS("Reading config file", "path", filename)
   202  	defer func() {
   203  		if err == nil && pod != nil {
   204  			objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
   205  			if keyErr != nil {
   206  				err = keyErr
   207  				return
   208  			}
   209  			s.fileKeyMapping[filename] = objKey
   210  		}
   211  	}()
   212  
   213  	file, err := os.Open(filename)
   214  	if err != nil {
   215  		return pod, err
   216  	}
   217  	defer file.Close()
   218  
   219  	data, err := utilio.ReadAtMost(file, maxConfigLength)
   220  	if err != nil {
   221  		return pod, err
   222  	}
   223  
   224  	defaultFn := func(pod *api.Pod) error {
   225  		return s.applyDefaults(pod, filename)
   226  	}
   227  
   228  	parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
   229  	if parsed {
   230  		if podErr != nil {
   231  			return pod, podErr
   232  		}
   233  		return pod, nil
   234  	}
   235  
   236  	return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
   237  }
   238  
   239  func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {
   240  	objs := []interface{}{}
   241  	for _, pod := range pods {
   242  		objs = append(objs, pod)
   243  	}
   244  	return s.store.Replace(objs, "")
   245  }
   246  

View as plain text