...

Source file src/k8s.io/kubernetes/pkg/kubelet/config/file_linux.go

Documentation: k8s.io/kubernetes/pkg/kubelet/config

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2016 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package config
    21  
    22  import (
    23  	"fmt"
    24  	"os"
    25  	"path/filepath"
    26  	"strings"
    27  	"time"
    28  
    29  	"github.com/fsnotify/fsnotify"
    30  	"k8s.io/klog/v2"
    31  
    32  	"k8s.io/api/core/v1"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	"k8s.io/client-go/util/flowcontrol"
    35  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    36  )
    37  
    38  const (
    39  	retryPeriod    = 1 * time.Second
    40  	maxRetryPeriod = 20 * time.Second
    41  )
    42  
    43  type retryableError struct {
    44  	message string
    45  }
    46  
    47  func (e *retryableError) Error() string {
    48  	return e.message
    49  }
    50  
    51  func (s *sourceFile) startWatch() {
    52  	backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
    53  	backOffID := "watch"
    54  
    55  	go wait.Forever(func() {
    56  		if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
    57  			return
    58  		}
    59  
    60  		if err := s.doWatch(); err != nil {
    61  			klog.ErrorS(err, "Unable to read config path", "path", s.path)
    62  			if _, retryable := err.(*retryableError); !retryable {
    63  				backOff.Next(backOffID, time.Now())
    64  			}
    65  		}
    66  	}, retryPeriod)
    67  }
    68  
    69  func (s *sourceFile) doWatch() error {
    70  	_, err := os.Stat(s.path)
    71  	if err != nil {
    72  		if !os.IsNotExist(err) {
    73  			return err
    74  		}
    75  		// Emit an update with an empty PodList to allow FileSource to be marked as seen
    76  		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
    77  		return &retryableError{"path does not exist, ignoring"}
    78  	}
    79  
    80  	w, err := fsnotify.NewWatcher()
    81  	if err != nil {
    82  		return fmt.Errorf("unable to create inotify: %v", err)
    83  	}
    84  	defer w.Close()
    85  
    86  	err = w.Add(s.path)
    87  	if err != nil {
    88  		return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err)
    89  	}
    90  
    91  	for {
    92  		select {
    93  		case event := <-w.Events:
    94  			if err = s.produceWatchEvent(&event); err != nil {
    95  				return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
    96  			}
    97  		case err = <-w.Errors:
    98  			return fmt.Errorf("error while watching %q: %v", s.path, err)
    99  		}
   100  	}
   101  }
   102  
   103  func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
   104  	// Ignore file start with dots
   105  	if strings.HasPrefix(filepath.Base(e.Name), ".") {
   106  		klog.V(4).InfoS("Ignored pod manifest, because it starts with dots", "eventName", e.Name)
   107  		return nil
   108  	}
   109  	var eventType podEventType
   110  	switch {
   111  	case (e.Op & fsnotify.Create) > 0:
   112  		eventType = podAdd
   113  	case (e.Op & fsnotify.Write) > 0:
   114  		eventType = podModify
   115  	case (e.Op & fsnotify.Chmod) > 0:
   116  		eventType = podModify
   117  	case (e.Op & fsnotify.Remove) > 0:
   118  		eventType = podDelete
   119  	case (e.Op & fsnotify.Rename) > 0:
   120  		eventType = podDelete
   121  	default:
   122  		// Ignore rest events
   123  		return nil
   124  	}
   125  
   126  	s.watchEvents <- &watchEvent{e.Name, eventType}
   127  	return nil
   128  }
   129  
   130  func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
   131  	switch e.eventType {
   132  	case podAdd, podModify:
   133  		pod, err := s.extractFromFile(e.fileName)
   134  		if err != nil {
   135  			return fmt.Errorf("can't process config file %q: %v", e.fileName, err)
   136  		}
   137  		return s.store.Add(pod)
   138  	case podDelete:
   139  		if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
   140  			pod, podExist, err := s.store.GetByKey(objKey)
   141  			if err != nil {
   142  				return err
   143  			}
   144  			if !podExist {
   145  				return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
   146  			}
   147  			if err = s.store.Delete(pod); err != nil {
   148  				return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
   149  			}
   150  			delete(s.fileKeyMapping, e.fileName)
   151  		}
   152  	}
   153  	return nil
   154  }
   155  

View as plain text