...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pluginwatcher
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"strings"
    23  
    24  	"github.com/fsnotify/fsnotify"
    25  	"k8s.io/klog/v2"
    26  
    27  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    28  	"k8s.io/kubernetes/pkg/kubelet/util"
    29  	utilfs "k8s.io/kubernetes/pkg/util/filesystem"
    30  )
    31  
// Watcher is the plugin watcher. It observes a directory tree for plugin
// sockets and mirrors what it sees into the desired state of world cache.
//
// Fields:
//   - path: root directory that is created (if missing) and watched.
//   - fs: filesystem abstraction used to create and walk the directory.
//   - fsWatcher: fsnotify watcher; it is nil until Start has created it.
//   - desiredStateOfWorld: cache that receives AddOrUpdatePlugin and
//     RemovePlugin calls as sockets appear and disappear.
type Watcher struct {
	path                string
	fs                  utilfs.Filesystem
	fsWatcher           *fsnotify.Watcher
	desiredStateOfWorld cache.DesiredStateOfWorld
}
    39  
    40  // NewWatcher provides a new watcher for socket registration
    41  func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
    42  	return &Watcher{
    43  		path:                sockDir,
    44  		fs:                  &utilfs.DefaultFs{},
    45  		desiredStateOfWorld: desiredStateOfWorld,
    46  	}
    47  }
    48  
    49  // Start watches for the creation and deletion of plugin sockets at the path
    50  func (w *Watcher) Start(stopCh <-chan struct{}) error {
    51  	klog.V(2).InfoS("Plugin Watcher Start", "path", w.path)
    52  
    53  	// Creating the directory to be watched if it doesn't exist yet,
    54  	// and walks through the directory to discover the existing plugins.
    55  	if err := w.init(); err != nil {
    56  		return err
    57  	}
    58  
    59  	fsWatcher, err := fsnotify.NewWatcher()
    60  	if err != nil {
    61  		return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
    62  	}
    63  	w.fsWatcher = fsWatcher
    64  
    65  	// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
    66  	if err := w.traversePluginDir(w.path); err != nil {
    67  		klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
    68  	}
    69  
    70  	go func(fsWatcher *fsnotify.Watcher) {
    71  		for {
    72  			select {
    73  			case event := <-fsWatcher.Events:
    74  				//TODO: Handle errors by taking corrective measures
    75  				if event.Has(fsnotify.Create) {
    76  					err := w.handleCreateEvent(event)
    77  					if err != nil {
    78  						klog.ErrorS(err, "Error when handling create event", "event", event)
    79  					}
    80  				} else if event.Has(fsnotify.Remove) {
    81  					w.handleDeleteEvent(event)
    82  				}
    83  				continue
    84  			case err := <-fsWatcher.Errors:
    85  				if err != nil {
    86  					klog.ErrorS(err, "FsWatcher received error")
    87  				}
    88  				continue
    89  			case <-stopCh:
    90  				w.fsWatcher.Close()
    91  				return
    92  			}
    93  		}
    94  	}(fsWatcher)
    95  
    96  	return nil
    97  }
    98  
    99  func (w *Watcher) init() error {
   100  	klog.V(4).InfoS("Ensuring Plugin directory", "path", w.path)
   101  
   102  	if err := w.fs.MkdirAll(w.path, 0755); err != nil {
   103  		return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
   104  	}
   105  
   106  	return nil
   107  }
   108  
   109  // Walks through the plugin directory discover any existing plugin sockets.
   110  // Ignore all errors except root dir not being walkable
   111  func (w *Watcher) traversePluginDir(dir string) error {
   112  	// watch the new dir
   113  	err := w.fsWatcher.Add(dir)
   114  	if err != nil {
   115  		return fmt.Errorf("failed to watch %s, err: %v", w.path, err)
   116  	}
   117  	// traverse existing children in the dir
   118  	return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
   119  		if err != nil {
   120  			if path == dir {
   121  				return fmt.Errorf("error accessing path: %s error: %v", path, err)
   122  			}
   123  
   124  			klog.ErrorS(err, "Error accessing path", "path", path)
   125  			return nil
   126  		}
   127  
   128  		// do not call fsWatcher.Add twice on the root dir to avoid potential problems.
   129  		if path == dir {
   130  			return nil
   131  		}
   132  
   133  		mode := info.Mode()
   134  		if mode.IsDir() {
   135  			if err := w.fsWatcher.Add(path); err != nil {
   136  				return fmt.Errorf("failed to watch %s, err: %v", path, err)
   137  			}
   138  		} else if isSocket, _ := util.IsUnixDomainSocket(path); isSocket {
   139  			event := fsnotify.Event{
   140  				Name: path,
   141  				Op:   fsnotify.Create,
   142  			}
   143  			//TODO: Handle errors by taking corrective measures
   144  			if err := w.handleCreateEvent(event); err != nil {
   145  				klog.ErrorS(err, "Error when handling create", "event", event)
   146  			}
   147  		} else {
   148  			klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode)
   149  		}
   150  
   151  		return nil
   152  	})
   153  }
   154  
   155  // Handle filesystem notify event.
   156  // Files names:
   157  // - MUST NOT start with a '.'
   158  func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
   159  	klog.V(6).InfoS("Handling create event", "event", event)
   160  
   161  	fi, err := getStat(event)
   162  	if err != nil {
   163  		return fmt.Errorf("stat file %s failed: %v", event.Name, err)
   164  	}
   165  
   166  	if strings.HasPrefix(fi.Name(), ".") {
   167  		klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name())
   168  		return nil
   169  	}
   170  
   171  	if !fi.IsDir() {
   172  		isSocket, err := util.IsUnixDomainSocket(util.NormalizePath(event.Name))
   173  		if err != nil {
   174  			return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
   175  		}
   176  		if !isSocket {
   177  			klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name())
   178  			return nil
   179  		}
   180  
   181  		return w.handlePluginRegistration(event.Name)
   182  	}
   183  
   184  	return w.traversePluginDir(event.Name)
   185  }
   186  
   187  func (w *Watcher) handlePluginRegistration(socketPath string) error {
   188  	socketPath = getSocketPath(socketPath)
   189  	// Update desired state of world list of plugins
   190  	// If the socket path does exist in the desired world cache, there's still
   191  	// a possibility that it has been deleted and recreated again before it is
   192  	// removed from the desired world cache, so we still need to call AddOrUpdatePlugin
   193  	// in this case to update the timestamp
   194  	klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath)
   195  	err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
   196  	if err != nil {
   197  		return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
   198  	}
   199  	return nil
   200  }
   201  
   202  func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
   203  	klog.V(6).InfoS("Handling delete event", "event", event)
   204  
   205  	socketPath := event.Name
   206  	klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath)
   207  	w.desiredStateOfWorld.RemovePlugin(socketPath)
   208  }
   209  

View as plain text