...

Source file src/k8s.io/kubernetes/pkg/volume/flexvolume/probe.go

Documentation: k8s.io/kubernetes/pkg/volume/flexvolume

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package flexvolume
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"strings"
    24  	"sync"
    25  
    26  	"github.com/fsnotify/fsnotify"
    27  	"k8s.io/apimachinery/pkg/util/errors"
    28  	"k8s.io/klog/v2"
    29  	utilfs "k8s.io/kubernetes/pkg/util/filesystem"
    30  	"k8s.io/kubernetes/pkg/volume"
    31  	"k8s.io/utils/exec"
    32  	utilstrings "k8s.io/utils/strings"
    33  )
    34  
// flexVolumeProber discovers Flexvolume drivers under pluginDir and reports
// them as probe events.
//
// mutex guards probeAllNeeded and eventsMap (see testAndSetProbeAllNeeded,
// updateEventsMap and probeMap). watcher delivers filesystem notifications to
// handleWatchEvent, factory builds a plugin from a driver directory, and fs is
// the filesystem abstraction used to stat, create, list and walk pluginDir.
// While probeAllNeeded is set, Probe scans the whole of pluginDir instead of
// consuming eventsMap, and updateEventsMap does not record new entries.
type flexVolumeProber struct {
	mutex          sync.Mutex
	pluginDir      string         // Flexvolume driver directory
	runner         exec.Interface // Interface to use for execing flex calls
	watcher        utilfs.FSWatcher
	factory        PluginFactory
	fs             utilfs.Filesystem
	probeAllNeeded bool
	eventsMap      map[string]volume.ProbeOperation // the key is the driver directory path, the value is the corresponding operation
}
    45  
    46  // GetDynamicPluginProber creates dynamic plugin prober
    47  func GetDynamicPluginProber(pluginDir string, runner exec.Interface) volume.DynamicPluginProber {
    48  	return &flexVolumeProber{
    49  		pluginDir: pluginDir,
    50  		watcher:   utilfs.NewFsnotifyWatcher(),
    51  		factory:   pluginFactory{},
    52  		runner:    runner,
    53  		fs:        &utilfs.DefaultFs{},
    54  	}
    55  }
    56  
    57  func (prober *flexVolumeProber) Init() error {
    58  	prober.testAndSetProbeAllNeeded(true)
    59  	prober.eventsMap = map[string]volume.ProbeOperation{}
    60  
    61  	if err := prober.createPluginDir(); err != nil {
    62  		return err
    63  	}
    64  	if err := prober.initWatcher(); err != nil {
    65  		return err
    66  	}
    67  
    68  	return nil
    69  }
    70  
    71  // If probeAllNeeded is true, probe all pluginDir
    72  // else probe events in eventsMap
    73  func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) {
    74  	if prober.probeAllNeeded {
    75  		prober.testAndSetProbeAllNeeded(false)
    76  		return prober.probeAll()
    77  	}
    78  
    79  	return prober.probeMap()
    80  }
    81  
    82  func (prober *flexVolumeProber) probeMap() (events []volume.ProbeEvent, err error) {
    83  	// TODO use a concurrent map to avoid Locking the entire map
    84  	prober.mutex.Lock()
    85  	defer prober.mutex.Unlock()
    86  	probeEvents := []volume.ProbeEvent{}
    87  	allErrs := []error{}
    88  	for driverDirPathAbs, op := range prober.eventsMap {
    89  		driverDirName := filepath.Base(driverDirPathAbs) // e.g. driverDirName = vendor~cifs
    90  		probeEvent, pluginErr := prober.newProbeEvent(driverDirName, op)
    91  		if pluginErr != nil {
    92  			allErrs = append(allErrs, pluginErr)
    93  			continue
    94  		}
    95  		probeEvents = append(probeEvents, probeEvent)
    96  
    97  		delete(prober.eventsMap, driverDirPathAbs)
    98  	}
    99  	return probeEvents, errors.NewAggregate(allErrs)
   100  }
   101  
   102  func (prober *flexVolumeProber) probeAll() (events []volume.ProbeEvent, err error) {
   103  	probeEvents := []volume.ProbeEvent{}
   104  	allErrs := []error{}
   105  	files, err := prober.fs.ReadDir(prober.pluginDir)
   106  	if err != nil {
   107  		return nil, fmt.Errorf("error reading the Flexvolume directory: %s", err)
   108  	}
   109  	for _, f := range files {
   110  		// only directories with names that do not begin with '.' are counted as plugins
   111  		// and pluginDir/dirname/dirname should be an executable
   112  		// unless dirname contains '~' for escaping namespace
   113  		// e.g. dirname = vendor~cifs
   114  		// then, executable will be pluginDir/dirname/cifs
   115  		if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
   116  			probeEvent, pluginErr := prober.newProbeEvent(f.Name(), volume.ProbeAddOrUpdate)
   117  			if pluginErr != nil {
   118  				allErrs = append(allErrs, pluginErr)
   119  				continue
   120  			}
   121  			probeEvents = append(probeEvents, probeEvent)
   122  		}
   123  	}
   124  	return probeEvents, errors.NewAggregate(allErrs)
   125  }
   126  
   127  func (prober *flexVolumeProber) newProbeEvent(driverDirName string, op volume.ProbeOperation) (volume.ProbeEvent, error) {
   128  	probeEvent := volume.ProbeEvent{
   129  		Op: op,
   130  	}
   131  	if op == volume.ProbeAddOrUpdate {
   132  		plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, driverDirName, prober.runner)
   133  		if pluginErr != nil {
   134  			pluginErr = fmt.Errorf(
   135  				"error creating Flexvolume plugin from directory %s, skipping. Error: %s",
   136  				driverDirName, pluginErr)
   137  			return probeEvent, pluginErr
   138  		}
   139  		probeEvent.Plugin = plugin
   140  		probeEvent.PluginName = plugin.GetPluginName()
   141  	} else if op == volume.ProbeRemove {
   142  		driverName := utilstrings.UnescapeQualifiedName(driverDirName)
   143  		probeEvent.PluginName = driverName
   144  
   145  	} else {
   146  		return probeEvent, fmt.Errorf("Unknown Operation on directory: %s. ", driverDirName)
   147  	}
   148  	return probeEvent, nil
   149  }
   150  
   151  func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
   152  	// event.Name is the watched path.
   153  	if filepath.Base(event.Name)[0] == '.' {
   154  		// Ignore files beginning with '.'
   155  		return nil
   156  	}
   157  
   158  	eventPathAbs, err := filepath.Abs(event.Name)
   159  	if err != nil {
   160  		return err
   161  	}
   162  	parentPathAbs := filepath.Dir(eventPathAbs)
   163  	pluginDirAbs, err := filepath.Abs(prober.pluginDir)
   164  	if err != nil {
   165  		return err
   166  	}
   167  
   168  	// event of pluginDirAbs
   169  	if eventPathAbs == pluginDirAbs {
   170  		// If the Flexvolume plugin directory is removed, need to recreate it
   171  		// in order to keep it under watch.
   172  		if event.Has(fsnotify.Remove) {
   173  			if err := prober.createPluginDir(); err != nil {
   174  				return err
   175  			}
   176  			if err := prober.addWatchRecursive(pluginDirAbs); err != nil {
   177  				return err
   178  			}
   179  		}
   180  		return nil
   181  	}
   182  
   183  	// watch newly added subdirectories inside a driver directory
   184  	if event.Has(fsnotify.Create) {
   185  		if err := prober.addWatchRecursive(eventPathAbs); err != nil {
   186  			return err
   187  		}
   188  	}
   189  
   190  	eventRelPathToPluginDir, err := filepath.Rel(pluginDirAbs, eventPathAbs)
   191  	if err != nil {
   192  		return err
   193  	}
   194  
   195  	// event inside specific driver dir
   196  	if len(eventRelPathToPluginDir) > 0 {
   197  		driverDirName := strings.Split(eventRelPathToPluginDir, string(os.PathSeparator))[0]
   198  		driverDirAbs := filepath.Join(pluginDirAbs, driverDirName)
   199  		// executable is removed, will trigger ProbeRemove event
   200  		if event.Has(fsnotify.Remove) && (eventRelPathToPluginDir == getExecutablePathRel(driverDirName) || parentPathAbs == pluginDirAbs) {
   201  			prober.updateEventsMap(driverDirAbs, volume.ProbeRemove)
   202  		} else {
   203  			prober.updateEventsMap(driverDirAbs, volume.ProbeAddOrUpdate)
   204  		}
   205  	}
   206  
   207  	return nil
   208  }
   209  
   210  // getExecutableName returns the executableName of a flex plugin
   211  func getExecutablePathRel(driverDirName string) string {
   212  	parts := strings.Split(driverDirName, "~")
   213  	return filepath.Join(driverDirName, parts[len(parts)-1])
   214  }
   215  
   216  func (prober *flexVolumeProber) updateEventsMap(eventDirAbs string, op volume.ProbeOperation) {
   217  	prober.mutex.Lock()
   218  	defer prober.mutex.Unlock()
   219  	if prober.probeAllNeeded {
   220  		return
   221  	}
   222  	prober.eventsMap[eventDirAbs] = op
   223  }
   224  
   225  // Recursively adds to watch all directories inside and including the file specified by the given filename.
   226  // If the file is a symlink to a directory, it will watch the symlink but not any of the subdirectories.
   227  //
   228  // Each file or directory change triggers two events: one from the watch on itself, another from the watch
   229  // on its parent directory.
   230  func (prober *flexVolumeProber) addWatchRecursive(filename string) error {
   231  	// this may be called with an actual absolute path on Windows (with a C:\ prefix).
   232  	// But the prober.fs.Walk below will execute filepath.Join(fs.root, filenameAbove), which
   233  	// will result in an incorrect path, you can't join C:\path and C:\another\path.
   234  	filename = strings.TrimPrefix(filename, `C:\`)
   235  	addWatch := func(path string, info os.FileInfo, err error) error {
   236  		if err == nil && info.IsDir() {
   237  			if err := prober.watcher.AddWatch(path); err != nil {
   238  				klog.Errorf("Error recursively adding watch: %v", err)
   239  			}
   240  		}
   241  		return nil
   242  	}
   243  	return prober.fs.Walk(filename, addWatch)
   244  }
   245  
   246  // Creates a new filesystem watcher and adds watches for the plugin directory
   247  // and all of its subdirectories.
   248  func (prober *flexVolumeProber) initWatcher() error {
   249  	err := prober.watcher.Init(func(event fsnotify.Event) {
   250  		if err := prober.handleWatchEvent(event); err != nil {
   251  			klog.Errorf("Flexvolume prober watch: %s", err)
   252  		}
   253  	}, func(err error) {
   254  		klog.Errorf("Received an error from watcher: %s", err)
   255  	})
   256  	if err != nil {
   257  		return fmt.Errorf("error initializing watcher: %s", err)
   258  	}
   259  
   260  	if err := prober.addWatchRecursive(prober.pluginDir); err != nil {
   261  		return fmt.Errorf("error adding watch on Flexvolume directory: %s", err)
   262  	}
   263  
   264  	prober.watcher.Run()
   265  
   266  	return nil
   267  }
   268  
   269  // Creates the plugin directory, if it doesn't already exist.
   270  func (prober *flexVolumeProber) createPluginDir() error {
   271  	if _, err := prober.fs.Stat(prober.pluginDir); os.IsNotExist(err) {
   272  		klog.Warningf("Flexvolume plugin directory at %s does not exist. Recreating.", prober.pluginDir)
   273  		err := prober.fs.MkdirAll(prober.pluginDir, 0755)
   274  		if err != nil {
   275  			return fmt.Errorf("error (re-)creating driver directory: %s", err)
   276  		}
   277  	}
   278  
   279  	return nil
   280  }
   281  
   282  func (prober *flexVolumeProber) testAndSetProbeAllNeeded(newval bool) (oldval bool) {
   283  	prober.mutex.Lock()
   284  	defer prober.mutex.Unlock()
   285  	oldval, prober.probeAllNeeded = prober.probeAllNeeded, newval
   286  	return
   287  }
   288  

View as plain text