1 package watcherx
2
3 import (
4 "context"
5 "io/ioutil"
6 "os"
7 "path/filepath"
8 "time"
9
10 "github.com/fsnotify/fsnotify"
11 "github.com/pkg/errors"
12 )
13
14 func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, error) {
15 w, err := fsnotify.NewWatcher()
16 if err != nil {
17 return nil, errors.WithStack(err)
18 }
19 var subDirs []string
20 if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
21 if err != nil {
22 return err
23 }
24 if info.IsDir() {
25 subDirs = append(subDirs, path)
26 }
27 return nil
28 }); err != nil {
29 return nil, errors.WithStack(err)
30 }
31 for _, d := range append(subDirs, dir) {
32 if err := w.Add(d); err != nil {
33 return nil, errors.WithStack(err)
34 }
35 }
36
37 d := newDispatcher()
38 go streamDirectoryEvents(ctx, w, c, d.trigger, d.done, dir)
39 return d, nil
40 }
41
42 func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) {
43 if e.Op&fsnotify.Remove != 0 {
44
45
46
47
48
49
50 select {
51 case <-time.After(time.Millisecond):
52 c <- &RemoveEvent{
53 source: source(e.Name),
54 }
55 return
56 case secondE := <-w.Events:
57 if (secondE.Name != "" && secondE.Name != e.Name) || secondE.Op&fsnotify.Remove == 0 {
58
59
60 c <- &RemoveEvent{
61 source: source(e.Name),
62 }
63 handleEvent(secondE, w, c)
64 }
65 }
66 } else if e.Op&(fsnotify.Write|fsnotify.Create) != 0 {
67 if stats, err := os.Stat(e.Name); err != nil {
68 c <- &ErrorEvent{
69 error: errors.WithStack(err),
70 source: source(e.Name),
71 }
72 return
73 } else if stats.IsDir() {
74 if err := w.Add(e.Name); err != nil {
75 c <- &ErrorEvent{
76 error: errors.WithStack(err),
77 source: source(e.Name),
78 }
79 }
80 return
81 }
82 data, err := ioutil.ReadFile(e.Name)
83 if err != nil {
84 c <- &ErrorEvent{
85 error: err,
86 source: source(e.Name),
87 }
88 } else {
89 c <- &ChangeEvent{
90 data: data,
91 source: source(e.Name),
92 }
93 }
94 }
95 }
96
97 func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, sendNowDone chan<- int, dir string) {
98 for {
99 select {
100 case <-ctx.Done():
101 _ = w.Close()
102 return
103 case e := <-w.Events:
104 handleEvent(e, w, c)
105 case <-sendNow:
106 var eventsSent int
107
108 if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
109 if err != nil {
110 return err
111 }
112 if !info.IsDir() {
113 data, err := ioutil.ReadFile(path)
114 if err != nil {
115 c <- &ErrorEvent{
116 error: err,
117 source: source(path),
118 }
119 } else {
120 c <- &ChangeEvent{
121 data: data,
122 source: source(path),
123 }
124 }
125 eventsSent++
126 }
127 return nil
128 }); err != nil {
129 c <- &ErrorEvent{
130 error: err,
131 source: source(dir),
132 }
133 eventsSent++
134 }
135
136 sendNowDone <- eventsSent
137 }
138 }
139 }
140
View as plain text