...
1
16
17 package filesystem
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "github.com/fsnotify/fsnotify"
25 )
26
27
28 type FSWatcher interface {
29
30
31 Init(FSEventHandler, FSErrorHandler) error
32
33
34
35 Run()
36
37
38 AddWatch(path string) error
39 }
40
41
42 type FSEventHandler func(event fsnotify.Event)
43
44
45 type FSErrorHandler func(err error)
46
47 type fsnotifyWatcher struct {
48 watcher *fsnotify.Watcher
49 eventHandler FSEventHandler
50 errorHandler FSErrorHandler
51 }
52
53 var _ FSWatcher = &fsnotifyWatcher{}
54
55
56
57 func NewFsnotifyWatcher() FSWatcher {
58 return &fsnotifyWatcher{}
59 }
60
61 func (w *fsnotifyWatcher) AddWatch(path string) error {
62 return w.watcher.Add(path)
63 }
64
65 func (w *fsnotifyWatcher) Init(eventHandler FSEventHandler, errorHandler FSErrorHandler) error {
66 var err error
67 w.watcher, err = fsnotify.NewWatcher()
68 if err != nil {
69 return err
70 }
71
72 w.eventHandler = eventHandler
73 w.errorHandler = errorHandler
74 return nil
75 }
76
77 func (w *fsnotifyWatcher) Run() {
78 go func() {
79 defer w.watcher.Close()
80 for {
81 select {
82 case event := <-w.watcher.Events:
83 if w.eventHandler != nil {
84 w.eventHandler(event)
85 }
86 case err := <-w.watcher.Errors:
87 if w.errorHandler != nil {
88 w.errorHandler(err)
89 }
90 }
91 }
92 }()
93 }
94
95 type watchAddRemover interface {
96 Add(path string) error
97 Remove(path string) error
98 }
99 type noopWatcher struct{}
100
101 func (noopWatcher) Add(path string) error { return nil }
102 func (noopWatcher) Remove(path string) error { return nil }
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 func WatchUntil(ctx context.Context, pollInterval time.Duration, path string, eventHandler func(), errorHandler func(err error)) {
118 if pollInterval <= 0 {
119 panic(fmt.Errorf("pollInterval must be > 0"))
120 }
121 if eventHandler == nil {
122 panic(fmt.Errorf("eventHandler must be non-nil"))
123 }
124 if errorHandler == nil {
125 errorHandler = func(err error) {}
126 }
127
128
129 var (
130 eventsCh chan fsnotify.Event
131 errorCh chan error
132 watcher watchAddRemover
133 )
134 if w, err := fsnotify.NewWatcher(); err != nil {
135 errorHandler(fmt.Errorf("error creating file watcher, falling back to poll at interval %s: %w", pollInterval, err))
136 watcher = noopWatcher{}
137 } else {
138 watcher = w
139 eventsCh = w.Events
140 errorCh = w.Errors
141 defer func() {
142 _ = w.Close()
143 }()
144 }
145
146
147 t := time.NewTicker(pollInterval)
148 defer t.Stop()
149
150 attemptPeriodicRewatch := false
151
152
153 if err := watcher.Add(path); err != nil {
154 errorHandler(err)
155 attemptPeriodicRewatch = true
156 } else {
157
158
159 eventHandler()
160 }
161
162 for {
163 select {
164 case <-ctx.Done():
165 return
166
167 case <-t.C:
168
169 if ctx.Err() != nil {
170 return
171 }
172
173
174 if attemptPeriodicRewatch {
175 _ = watcher.Remove(path)
176 if err := watcher.Add(path); err != nil {
177 errorHandler(err)
178 } else {
179 attemptPeriodicRewatch = false
180 }
181 }
182
183
184 eventHandler()
185
186 case e := <-eventsCh:
187
188 if ctx.Err() != nil {
189 return
190 }
191
192
193 if e.Name == path && (e.Has(fsnotify.Remove) || e.Has(fsnotify.Rename)) {
194 _ = watcher.Remove(path)
195 if err := watcher.Add(path); err != nil {
196 errorHandler(err)
197 attemptPeriodicRewatch = true
198 }
199 }
200
201
202 eventHandler()
203
204 case err := <-errorCh:
205
206 if ctx.Err() != nil {
207 return
208 }
209
210
211
212 errorHandler(err)
213 attemptPeriodicRewatch = true
214 }
215 }
216 }
217
View as plain text