...

Source file src/k8s.io/kubernetes/pkg/util/filesystem/watcher.go

Documentation: k8s.io/kubernetes/pkg/util/filesystem

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package filesystem
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"github.com/fsnotify/fsnotify"
    25  )
    26  
// FSWatcher is a callback-based filesystem watcher abstraction for fsnotify.
// Events and errors are delivered to the handlers supplied to Init instead of
// being read from channels by the caller.
type FSWatcher interface {
	// Init initializes the watcher with the given watch handlers.
	// It is called before all other methods.
	Init(FSEventHandler, FSErrorHandler) error

	// Run starts listening for events and errors.
	// When an event or error occurs, the corresponding handler is called.
	Run()

	// AddWatch adds a filesystem path to watch.
	AddWatch(path string) error
}
    40  
// FSEventHandler is the callback invoked by an FSWatcher when an fsnotify
// event occurs. The event is handed over as received from fsnotify.
type FSEventHandler func(event fsnotify.Event)
    43  
// FSErrorHandler is the callback invoked by an FSWatcher when an fsnotify
// error occurs.
type FSErrorHandler func(err error)
    46  
    47  type fsnotifyWatcher struct {
    48  	watcher      *fsnotify.Watcher
    49  	eventHandler FSEventHandler
    50  	errorHandler FSErrorHandler
    51  }
    52  
    53  var _ FSWatcher = &fsnotifyWatcher{}
    54  
    55  // NewFsnotifyWatcher returns an implementation of FSWatcher that continuously listens for
    56  // fsnotify events and calls the event handler as soon as an event is received.
    57  func NewFsnotifyWatcher() FSWatcher {
    58  	return &fsnotifyWatcher{}
    59  }
    60  
    61  func (w *fsnotifyWatcher) AddWatch(path string) error {
    62  	return w.watcher.Add(path)
    63  }
    64  
    65  func (w *fsnotifyWatcher) Init(eventHandler FSEventHandler, errorHandler FSErrorHandler) error {
    66  	var err error
    67  	w.watcher, err = fsnotify.NewWatcher()
    68  	if err != nil {
    69  		return err
    70  	}
    71  
    72  	w.eventHandler = eventHandler
    73  	w.errorHandler = errorHandler
    74  	return nil
    75  }
    76  
    77  func (w *fsnotifyWatcher) Run() {
    78  	go func() {
    79  		defer w.watcher.Close()
    80  		for {
    81  			select {
    82  			case event := <-w.watcher.Events:
    83  				if w.eventHandler != nil {
    84  					w.eventHandler(event)
    85  				}
    86  			case err := <-w.watcher.Errors:
    87  				if w.errorHandler != nil {
    88  					w.errorHandler(err)
    89  				}
    90  			}
    91  		}
    92  	}()
    93  }
    94  
    95  type watchAddRemover interface {
    96  	Add(path string) error
    97  	Remove(path string) error
    98  }
    99  type noopWatcher struct{}
   100  
   101  func (noopWatcher) Add(path string) error    { return nil }
   102  func (noopWatcher) Remove(path string) error { return nil }
   103  
   104  // WatchUntil watches the specified path for changes and blocks until ctx is canceled.
   105  // eventHandler() must be non-nil, and pollInterval must be greater than 0.
   106  // eventHandler() is invoked whenever a change event is observed or pollInterval elapses.
   107  // errorHandler() is invoked (if non-nil) whenever an error occurs initializing or watching the specified path.
   108  //
   109  // If path is a directory, only the directory and immediate children are watched.
   110  //
   111  // If path does not exist or cannot be watched, an error is passed to errorHandler() and eventHandler() is called at pollInterval.
   112  //
   113  // Multiple observed events may collapse to a single invocation of eventHandler().
   114  //
   115  // eventHandler() is invoked immediately after successful initialization of the filesystem watch,
   116  // in case the path changed concurrent with calling WatchUntil().
   117  func WatchUntil(ctx context.Context, pollInterval time.Duration, path string, eventHandler func(), errorHandler func(err error)) {
   118  	if pollInterval <= 0 {
   119  		panic(fmt.Errorf("pollInterval must be > 0"))
   120  	}
   121  	if eventHandler == nil {
   122  		panic(fmt.Errorf("eventHandler must be non-nil"))
   123  	}
   124  	if errorHandler == nil {
   125  		errorHandler = func(err error) {}
   126  	}
   127  
   128  	// Initialize watcher, fall back to no-op
   129  	var (
   130  		eventsCh chan fsnotify.Event
   131  		errorCh  chan error
   132  		watcher  watchAddRemover
   133  	)
   134  	if w, err := fsnotify.NewWatcher(); err != nil {
   135  		errorHandler(fmt.Errorf("error creating file watcher, falling back to poll at interval %s: %w", pollInterval, err))
   136  		watcher = noopWatcher{}
   137  	} else {
   138  		watcher = w
   139  		eventsCh = w.Events
   140  		errorCh = w.Errors
   141  		defer func() {
   142  			_ = w.Close()
   143  		}()
   144  	}
   145  
   146  	// Initialize background poll
   147  	t := time.NewTicker(pollInterval)
   148  	defer t.Stop()
   149  
   150  	attemptPeriodicRewatch := false
   151  
   152  	// Start watching the path
   153  	if err := watcher.Add(path); err != nil {
   154  		errorHandler(err)
   155  		attemptPeriodicRewatch = true
   156  	} else {
   157  		// Invoke handle() at least once after successfully registering the listener,
   158  		// in case the file changed concurrent with calling WatchUntil.
   159  		eventHandler()
   160  	}
   161  
   162  	for {
   163  		select {
   164  		case <-ctx.Done():
   165  			return
   166  
   167  		case <-t.C:
   168  			// Prioritize exiting if context is canceled
   169  			if ctx.Err() != nil {
   170  				return
   171  			}
   172  
   173  			// Try to re-establish the watcher if we previously got a watch error
   174  			if attemptPeriodicRewatch {
   175  				_ = watcher.Remove(path)
   176  				if err := watcher.Add(path); err != nil {
   177  					errorHandler(err)
   178  				} else {
   179  					attemptPeriodicRewatch = false
   180  				}
   181  			}
   182  
   183  			// Handle
   184  			eventHandler()
   185  
   186  		case e := <-eventsCh:
   187  			// Prioritize exiting if context is canceled
   188  			if ctx.Err() != nil {
   189  				return
   190  			}
   191  
   192  			// Try to re-establish the watcher for events which dropped the existing watch
   193  			if e.Name == path && (e.Has(fsnotify.Remove) || e.Has(fsnotify.Rename)) {
   194  				_ = watcher.Remove(path)
   195  				if err := watcher.Add(path); err != nil {
   196  					errorHandler(err)
   197  					attemptPeriodicRewatch = true
   198  				}
   199  			}
   200  
   201  			// Handle
   202  			eventHandler()
   203  
   204  		case err := <-errorCh:
   205  			// Prioritize exiting if context is canceled
   206  			if ctx.Err() != nil {
   207  				return
   208  			}
   209  
   210  			// If the error occurs in response to calling watcher.Add, re-adding here could hot-loop.
   211  			// The periodic poll will attempt to re-establish the watch.
   212  			errorHandler(err)
   213  			attemptPeriodicRewatch = true
   214  		}
   215  	}
   216  }
   217  

View as plain text