...

Source file src/github.com/launchdarkly/go-server-sdk/v6/ldfilewatch/watched_file_data_source.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/ldfilewatch

     1  package ldfilewatch
     2  
     3  import (
     4  	"fmt"
     5  	"path"
     6  	"path/filepath"
     7  	"time"
     8  
     9  	"github.com/fsnotify/fsnotify"
    10  
    11  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    12  )
    13  
// retryDuration is how long run waits, after setupWatches has failed, before signalling that the
// watches should be set up again.
const retryDuration = time.Second
    15  
// fileWatcher holds the state used by the background goroutine that WatchFiles starts.
type fileWatcher struct {
	// watcher is the underlying fsnotify watcher; waitForEvents closes it when the close channel fires.
	watcher  *fsnotify.Watcher
	// loggers receives watch-setup failures and errors reported by the watcher.
	loggers  ldlog.Loggers
	// reload is the callback invoked by run every time the watches have been (re)registered.
	reload   func()
	// paths are the file paths that were passed to WatchFiles.
	paths    []string
	// absPaths is the set of watched file paths after their directory has been symlink-resolved;
	// waitForEvents ignores events whose name is not in this set.
	absPaths map[string]bool
}
    23  
    24  // WatchFiles sets up a mechanism for the file data source to reload its source files whenever one of them has
    25  // been modified. Use it as follows:
    26  //
    27  //	config := Config{
    28  //	    DataSource: ldfiledata.DataSource().
    29  //	        FilePaths(filePaths).
    30  //	        Reloader(ldfilewatch.WatchFiles),
    31  //	}
    32  func WatchFiles(paths []string, loggers ldlog.Loggers, reload func(), closeCh <-chan struct{}) error {
    33  	watcher, err := fsnotify.NewWatcher()
    34  	if err != nil { // COVERAGE: can't simulate this condition in unit tests
    35  		return fmt.Errorf("unable to create file watcher: %s", err)
    36  	}
    37  	fw := &fileWatcher{
    38  		watcher:  watcher,
    39  		loggers:  loggers,
    40  		reload:   reload,
    41  		paths:    paths,
    42  		absPaths: make(map[string]bool),
    43  	}
    44  	go fw.run(closeCh)
    45  	return nil
    46  }
    47  
    48  func (fw *fileWatcher) run(closeCh <-chan struct{}) {
    49  	retryCh := make(chan struct{}, 1)
    50  	scheduleRetry := func() {
    51  		time.AfterFunc(retryDuration, func() {
    52  			select {
    53  			case retryCh <- struct{}{}: // don't need multiple retries so no need to block
    54  			default: // COVERAGE: can't simulate this condition in unit tests
    55  			}
    56  		})
    57  	}
    58  	for {
    59  		if err := fw.setupWatches(); err != nil {
    60  			fw.loggers.Error(err)
    61  			scheduleRetry()
    62  		}
    63  
    64  		// We do the reload here rather than after waitForEvents, even though that means there will be a
    65  		// redundant load when we first start up, because otherwise there's a potential race condition where
    66  		// file changes could happen before we had set up our file watcher.
    67  		fw.reload()
    68  
    69  		quit := fw.waitForEvents(closeCh, retryCh)
    70  		if quit {
    71  			return
    72  		}
    73  	}
    74  }
    75  
    76  func (fw *fileWatcher) setupWatches() error {
    77  	for _, p := range fw.paths {
    78  		absDirPath := path.Dir(p)
    79  		realDirPath, err := filepath.EvalSymlinks(absDirPath)
    80  		if err != nil {
    81  			return fmt.Errorf(`unable to evaluate symlinks for "%s": %s`, absDirPath, err)
    82  		}
    83  
    84  		realPath := path.Join(realDirPath, path.Base(p))
    85  		fw.absPaths[realPath] = true
    86  		if err = fw.watcher.Add(realPath); err != nil { // COVERAGE: can't simulate this condition in unit tests
    87  			return fmt.Errorf(`unable to watch path "%s": %s`, realPath, err)
    88  		}
    89  		if err = fw.watcher.Add(realDirPath); err != nil { // COVERAGE: can't simulate this in unit tests
    90  			return fmt.Errorf(`unable to watch path "%s": %s`, realDirPath, err)
    91  		}
    92  	}
    93  	return nil
    94  }
    95  
    96  func (fw *fileWatcher) waitForEvents(closeCh <-chan struct{}, retryCh <-chan struct{}) bool {
    97  	for {
    98  		select {
    99  		case <-closeCh:
   100  			err := fw.watcher.Close()
   101  			if err != nil { // COVERAGE: can't simulate this condition in unit tests
   102  				fw.loggers.Errorf("Error closing Watcher: %s", err)
   103  			}
   104  			return true
   105  		case event := <-fw.watcher.Events:
   106  			if !fw.absPaths[event.Name] { // COVERAGE: can't simulate this condition in unit tests
   107  				break
   108  			}
   109  			fw.consumeExtraEvents()
   110  			return false
   111  		case err := <-fw.watcher.Errors:
   112  			fw.loggers.Error(err) // COVERAGE: can't simulate this condition in unit tests
   113  		case <-retryCh:
   114  			consumeExtraRetries(retryCh)
   115  			return false
   116  		}
   117  	}
   118  }
   119  
   120  func (fw *fileWatcher) consumeExtraEvents() {
   121  	for {
   122  		select {
   123  		case <-fw.watcher.Events: // COVERAGE: can't simulate this condition in unit tests
   124  		default:
   125  			return
   126  		}
   127  	}
   128  }
   129  
   130  func consumeExtraRetries(retryCh <-chan struct{}) {
   131  	for {
   132  		select {
   133  		case <-retryCh: // COVERAGE: can't simulate this condition in unit tests
   134  		default:
   135  			return
   136  		}
   137  	}
   138  }
   139  

View as plain text