1 package entrypoint
2
3 import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "os"
8 "path"
9 "path/filepath"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/datawire/dlib/dlog"
15 "github.com/fsnotify/fsnotify"
16 )
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// FSWatcher wraps an fsnotify.Watcher and delivers coalesced, per-directory
// events to handlers registered with WatchDir. Run is the loop that reads the
// raw fsnotify events and dispatches them.
type FSWatcher struct {
	// FSW is the underlying fsnotify watcher.
	FSW *fsnotify.Watcher

	// mutex is held by WatchDir while it registers a handler and by Run while
	// it touches outstanding and cTimer.
	mutex sync.Mutex
	// handlers maps a watched directory to the handler registered for it.
	handlers map[string]FSWEventHandler
	// handleError is called by Run for every error fsnotify reports.
	handleError FSWErrorHandler
	// cTimer is the coalescing timer Run arms after a raw event; nil when no
	// timer is pending.
	cTimer *time.Timer
	// marker receives a timestamp when cTimer expires, telling Run to
	// dispatch the outstanding paths.
	marker chan time.Time
	// outstanding is the set of paths that have seen raw events since the
	// last dispatch.
	outstanding map[string]bool
}
56
57
58
// FSWEventHandler is the callback type for filesystem events. WatchDir calls
// it once per existing directory entry (bootstrap events) and Run calls it
// once per changed path after coalescing.
type FSWEventHandler func(ctx context.Context, event FSWEvent)
60
61
// FSWErrorHandler is the callback type Run invokes with each error received
// from the underlying fsnotify watcher.
type FSWErrorHandler func(ctx context.Context, err error)
63
64
// FSWOp names the kind of change an FSWEvent reports.
type FSWOp string
66
const (
	// FSWUpdate is reported for bootstrap events and for any changed path
	// that can still be stat'ed when Run dispatches it.
	FSWUpdate FSWOp = "update"

	// FSWDelete is reported when the changed path can no longer be stat'ed
	// at dispatch time.
	FSWDelete FSWOp = "delete"
)
74
75
// FSWEvent is the event handed to an FSWEventHandler.
type FSWEvent struct {
	// Path is the path of the file the event is about.
	Path string

	// Op says whether the file was updated or deleted.
	Op FSWOp

	// Bootstrap is true for events WatchDir synthesizes for files that
	// already exist when the watch is set up, and false for events that Run
	// produces from real filesystem activity.
	Bootstrap bool

	// Time is the file's modification time when it could be stat'ed;
	// otherwise (deletes) it is the time the event was dispatched.
	Time time.Time
}
88
89
90 func (event FSWEvent) String() string {
91 bstr := ""
92 if event.Bootstrap {
93 bstr = "B|"
94 }
95
96 return fmt.Sprintf("%s%s %s", bstr, event.Op, event.Path)
97 }
98
99
100
101 func NewFSWatcher(ctx context.Context) (*FSWatcher, error) {
102 watcher, err := fsnotify.NewWatcher()
103
104 if err != nil {
105 dlog.Errorf(ctx, "FSW: could not initialize FSWatcher: %v", err)
106 return nil, err
107 }
108
109 dlog.Debugf(ctx, "FSW: initialized FSWatcher!")
110
111 fsw := &FSWatcher{
112 FSW: watcher,
113 handlers: make(map[string]FSWEventHandler),
114 outstanding: make(map[string]bool),
115 marker: make(chan time.Time),
116 }
117
118
119 fsw.handleError = fsw.defaultErrorHandler
120
121 return fsw, nil
122 }
123
124
125 func (fsw *FSWatcher) SetErrorHandler(handler FSWErrorHandler) {
126 fsw.handleError = handler
127 }
128
129
130
131
132 func (fsw *FSWatcher) WatchDir(ctx context.Context, dir string, handler FSWEventHandler) error {
133 fsw.mutex.Lock()
134 defer fsw.mutex.Unlock()
135
136 dlog.Infof(ctx, "FSW: watching %s", dir)
137
138 if err := fsw.FSW.Add(dir); err != nil {
139 return err
140 }
141 fsw.handlers[dir] = handler
142
143 fileinfos, err := ioutil.ReadDir(dir)
144
145 if err != nil {
146 return err
147 }
148
149 for _, info := range fileinfos {
150 fswevent := FSWEvent{
151 Path: path.Join(dir, info.Name()),
152 Op: FSWUpdate,
153 Bootstrap: true,
154 Time: info.ModTime(),
155 }
156
157 dlog.Debugf(ctx, "FSWatcher: synthesizing %s", fswevent)
158
159 handler(ctx, fswevent)
160 }
161 return nil
162 }
163
164
165 func (fsw *FSWatcher) defaultErrorHandler(ctx context.Context, err error) {
166 dlog.Errorf(ctx, "FSW: FSWatcher error: %s", err)
167 }
168
169
170 func (fsw *FSWatcher) Run(ctx context.Context) {
171 for {
172 select {
173 case event := <-fsw.FSW.Events:
174 fsw.mutex.Lock()
175
176 dlog.Debugf(ctx, "FSW: raw event %s", event)
177
178
179 fsw.outstanding[event.Name] = true
180
181
182 if fsw.cTimer != nil {
183 dlog.Debugf(ctx, "FSW: stopping cTimer")
184
185 if !fsw.cTimer.Stop() {
186 <-fsw.cTimer.C
187 }
188 }
189
190 dlog.Debugf(ctx, "FSW: starting cTimer")
191 fsw.cTimer = time.AfterFunc(500*time.Millisecond, func() {
192 fsw.marker <- time.Now()
193 })
194
195 dlog.Debugf(ctx, "FSW: unlocking")
196 fsw.mutex.Unlock()
197
198 case <-fsw.marker:
199 fsw.mutex.Lock()
200 dlog.Debugf(ctx, "FSW: MARKER LOCK")
201 fsw.cTimer = nil
202
203 keys := make([]string, 0, len(fsw.outstanding))
204 for key := range fsw.outstanding {
205 keys = append(keys, key)
206 }
207
208 fsw.outstanding = make(map[string]bool)
209
210 fsw.mutex.Unlock()
211
212 dlog.Debugf(ctx, "FSW: updates! %s", strings.Join(keys, ", "))
213
214 for _, evtPath := range keys {
215 dirname := filepath.Dir(evtPath)
216 handler, handlerExists := fsw.handlers[dirname]
217
218 if handlerExists {
219 op := FSWUpdate
220
221 info, err := os.Stat(evtPath)
222
223 eventTime := time.Now()
224
225 if err != nil {
226 op = FSWDelete
227 } else {
228 eventTime = info.ModTime()
229 }
230
231 fswevent := FSWEvent{
232 Path: evtPath,
233 Op: op,
234 Bootstrap: false,
235 Time: eventTime,
236 }
237
238 dlog.Debugf(ctx, "FSW: handling %s", fswevent)
239 handler(ctx, fswevent)
240 } else {
241 dlog.Debugf(ctx, "FSW: drop, no handler for dir %s", dirname)
242 }
243 }
244
245 case err := <-fsw.FSW.Errors:
246 dlog.Errorf(ctx, "FSW: filesystem watch error %s", err)
247
248 fsw.handleError(ctx, err)
249
250 case <-ctx.Done():
251 dlog.Infof(ctx, "FSW: ctx shutdown, exiting")
252 return
253 }
254 }
255 }
256
View as plain text