...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/fswatcher.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"io/ioutil"
     7  	"os"
     8  	"path"
     9  	"path/filepath"
    10  	"strings"
    11  	"sync"
    12  	"time"
    13  
    14  	"github.com/datawire/dlib/dlog"
    15  	"github.com/fsnotify/fsnotify"
    16  )
    17  
    18  // FSWatcher is a thing that can watch the filesystem for us, and
    19  // call handler functions when things change.
    20  //
    21  // The core of an FSWatcher is fsnotify/fsnotify, but we wrap some
    22  // state around it.
    23  //
    24  // First, fsnotify tries to mark the operation associated with a
    25  // change -- however, these are not always accurate, since the
    26  // filesystem tries to coalesce events that are close in time.
    27  // Therefore FSWatcher doesn't actually look at the operation:
    28  // everything is just "a change happened".
    29  //
    30  // This causes one interesting problem: given a touch of
    31  // temporal separation between Create and Write, we may decide
    32  // to trigger a reconfigure on the Create, before the data have
    33  // been written. To mitigate against that, we'll wait up to half
    34  // a second after an event to see if any other events will be
    35  // happening (with the idea that if you've come within half a
    36  // second of your cert expiring before renewing it, uh, yeah,
    37  // maybe you _will_ have some transient errors).
    38  //
    39  // Second, when we start watching a directory, we make sure that
    40  // "update" events get posted for every file in the directory.
    41  // These are marked as "bootstrap" events.
    42  //
    43  // Finally, rather than posting things to channels, we call a
    44  // handler function whenever anything interesting happens,
    45  // where "interesting" is one of the events above, or an error.
    46  type FSWatcher struct {
    47  	FSW *fsnotify.Watcher
    48  
    49  	mutex       sync.Mutex
    50  	handlers    map[string]FSWEventHandler
    51  	handleError FSWErrorHandler
    52  	cTimer      *time.Timer
    53  	marker      chan time.Time
    54  	outstanding map[string]bool
    55  }
    56  
    57  // FSWEventHandler is a handler function for an interesting
    58  // event.
    59  type FSWEventHandler func(ctx context.Context, event FSWEvent)
    60  
    61  // FSWErrorHandler is a handler function for an error.
    62  type FSWErrorHandler func(ctx context.Context, err error)
    63  
    64  // FSWOp specifies the operation for an event.
    65  type FSWOp string
    66  
    67  const (
    68  	// FSWUpdate is an update operation
    69  	FSWUpdate FSWOp = "update"
    70  
    71  	// FSWDelete is a delete operation
    72  	FSWDelete FSWOp = "delete"
    73  )
    74  
    75  // FSWEvent represents a single interesting event.
    76  type FSWEvent struct {
    77  	// Path is the fully-qualified path of the file that changed.
    78  	Path string
    79  	// Op is the operation for this event.
    80  	Op FSWOp
    81  	// Bootstrap is true IFF this is a synthesized event noting
    82  	// that a file existed at the moment we started watching a
    83  	// directory.
    84  	Bootstrap bool
    85  	// Time is when this event happened
    86  	Time time.Time
    87  }
    88  
    89  // String returns a string representation of an FSEvent.
    90  func (event FSWEvent) String() string {
    91  	bstr := ""
    92  	if event.Bootstrap {
    93  		bstr = "B|"
    94  	}
    95  
    96  	return fmt.Sprintf("%s%s %s", bstr, event.Op, event.Path)
    97  }
    98  
    99  // NewFSWatcher instantiates an FSWatcher. At instantiation time,
   100  // no directories are being watched.
   101  func NewFSWatcher(ctx context.Context) (*FSWatcher, error) {
   102  	watcher, err := fsnotify.NewWatcher()
   103  
   104  	if err != nil {
   105  		dlog.Errorf(ctx, "FSW: could not initialize FSWatcher: %v", err)
   106  		return nil, err
   107  	}
   108  
   109  	dlog.Debugf(ctx, "FSW: initialized FSWatcher!")
   110  
   111  	fsw := &FSWatcher{
   112  		FSW:         watcher,
   113  		handlers:    make(map[string]FSWEventHandler),
   114  		outstanding: make(map[string]bool),
   115  		marker:      make(chan time.Time),
   116  	}
   117  
   118  	// Start with the default error handler...
   119  	fsw.handleError = fsw.defaultErrorHandler
   120  
   121  	return fsw, nil
   122  }
   123  
   124  // SetErrorHandler sets the function that will be used to respond to errors.
   125  func (fsw *FSWatcher) SetErrorHandler(handler FSWErrorHandler) {
   126  	fsw.handleError = handler
   127  }
   128  
   129  // WatchDir starts watching a directory, using a specific handler function.
   130  // You'll need to separately call WatchDir for subdirectories if you want
   131  // recursive watches.
   132  func (fsw *FSWatcher) WatchDir(ctx context.Context, dir string, handler FSWEventHandler) error {
   133  	fsw.mutex.Lock()
   134  	defer fsw.mutex.Unlock()
   135  
   136  	dlog.Infof(ctx, "FSW: watching %s", dir)
   137  
   138  	if err := fsw.FSW.Add(dir); err != nil {
   139  		return err
   140  	}
   141  	fsw.handlers[dir] = handler
   142  
   143  	fileinfos, err := ioutil.ReadDir(dir)
   144  
   145  	if err != nil {
   146  		return err
   147  	}
   148  
   149  	for _, info := range fileinfos {
   150  		fswevent := FSWEvent{
   151  			Path:      path.Join(dir, info.Name()),
   152  			Op:        FSWUpdate,
   153  			Bootstrap: true,
   154  			Time:      info.ModTime(),
   155  		}
   156  
   157  		dlog.Debugf(ctx, "FSWatcher: synthesizing %s", fswevent)
   158  
   159  		handler(ctx, fswevent)
   160  	}
   161  	return nil
   162  }
   163  
   164  // The default error handler just logs the error.
   165  func (fsw *FSWatcher) defaultErrorHandler(ctx context.Context, err error) {
   166  	dlog.Errorf(ctx, "FSW: FSWatcher error: %s", err)
   167  }
   168  
   169  // Watch for events, and handle them.
   170  func (fsw *FSWatcher) Run(ctx context.Context) {
   171  	for {
   172  		select {
   173  		case event := <-fsw.FSW.Events:
   174  			fsw.mutex.Lock()
   175  
   176  			dlog.Debugf(ctx, "FSW: raw event %s", event)
   177  
   178  			// Note that this path is outstanding.
   179  			fsw.outstanding[event.Name] = true
   180  
   181  			// Coalesce events for up to half a second.
   182  			if fsw.cTimer != nil {
   183  				dlog.Debugf(ctx, "FSW: stopping cTimer")
   184  
   185  				if !fsw.cTimer.Stop() {
   186  					<-fsw.cTimer.C
   187  				}
   188  			}
   189  
   190  			dlog.Debugf(ctx, "FSW: starting cTimer")
   191  			fsw.cTimer = time.AfterFunc(500*time.Millisecond, func() {
   192  				fsw.marker <- time.Now()
   193  			})
   194  
   195  			dlog.Debugf(ctx, "FSW: unlocking")
   196  			fsw.mutex.Unlock()
   197  
   198  		case <-fsw.marker:
   199  			fsw.mutex.Lock()
   200  			dlog.Debugf(ctx, "FSW: MARKER LOCK")
   201  			fsw.cTimer = nil
   202  
   203  			keys := make([]string, 0, len(fsw.outstanding))
   204  			for key := range fsw.outstanding {
   205  				keys = append(keys, key)
   206  			}
   207  
   208  			fsw.outstanding = make(map[string]bool)
   209  
   210  			fsw.mutex.Unlock()
   211  
   212  			dlog.Debugf(ctx, "FSW: updates! %s", strings.Join(keys, ", "))
   213  
   214  			for _, evtPath := range keys {
   215  				dirname := filepath.Dir(evtPath)
   216  				handler, handlerExists := fsw.handlers[dirname]
   217  
   218  				if handlerExists {
   219  					op := FSWUpdate
   220  
   221  					info, err := os.Stat(evtPath)
   222  
   223  					eventTime := time.Now()
   224  
   225  					if err != nil {
   226  						op = FSWDelete
   227  					} else {
   228  						eventTime = info.ModTime()
   229  					}
   230  
   231  					fswevent := FSWEvent{
   232  						Path:      evtPath,
   233  						Op:        op,
   234  						Bootstrap: false,
   235  						Time:      eventTime,
   236  					}
   237  
   238  					dlog.Debugf(ctx, "FSW: handling %s", fswevent)
   239  					handler(ctx, fswevent)
   240  				} else {
   241  					dlog.Debugf(ctx, "FSW: drop, no handler for dir %s", dirname)
   242  				}
   243  			}
   244  
   245  		case err := <-fsw.FSW.Errors:
   246  			dlog.Errorf(ctx, "FSW: filesystem watch error %s", err)
   247  
   248  			fsw.handleError(ctx, err)
   249  
   250  		case <-ctx.Done():
   251  			dlog.Infof(ctx, "FSW: ctx shutdown, exiting")
   252  			return
   253  		}
   254  	}
   255  }
   256  

View as plain text