1
16
17 package pluginwatcher
18
19 import (
20 "fmt"
21 "os"
22 "strings"
23
24 "github.com/fsnotify/fsnotify"
25 "k8s.io/klog/v2"
26
27 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
28 "k8s.io/kubernetes/pkg/kubelet/util"
29 utilfs "k8s.io/kubernetes/pkg/util/filesystem"
30 )
31
32
33 type Watcher struct {
34 path string
35 fs utilfs.Filesystem
36 fsWatcher *fsnotify.Watcher
37 desiredStateOfWorld cache.DesiredStateOfWorld
38 }
39
40
41 func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
42 return &Watcher{
43 path: sockDir,
44 fs: &utilfs.DefaultFs{},
45 desiredStateOfWorld: desiredStateOfWorld,
46 }
47 }
48
49
50 func (w *Watcher) Start(stopCh <-chan struct{}) error {
51 klog.V(2).InfoS("Plugin Watcher Start", "path", w.path)
52
53
54
55 if err := w.init(); err != nil {
56 return err
57 }
58
59 fsWatcher, err := fsnotify.NewWatcher()
60 if err != nil {
61 return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
62 }
63 w.fsWatcher = fsWatcher
64
65
66 if err := w.traversePluginDir(w.path); err != nil {
67 klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
68 }
69
70 go func(fsWatcher *fsnotify.Watcher) {
71 for {
72 select {
73 case event := <-fsWatcher.Events:
74
75 if event.Has(fsnotify.Create) {
76 err := w.handleCreateEvent(event)
77 if err != nil {
78 klog.ErrorS(err, "Error when handling create event", "event", event)
79 }
80 } else if event.Has(fsnotify.Remove) {
81 w.handleDeleteEvent(event)
82 }
83 continue
84 case err := <-fsWatcher.Errors:
85 if err != nil {
86 klog.ErrorS(err, "FsWatcher received error")
87 }
88 continue
89 case <-stopCh:
90 w.fsWatcher.Close()
91 return
92 }
93 }
94 }(fsWatcher)
95
96 return nil
97 }
98
99 func (w *Watcher) init() error {
100 klog.V(4).InfoS("Ensuring Plugin directory", "path", w.path)
101
102 if err := w.fs.MkdirAll(w.path, 0755); err != nil {
103 return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
104 }
105
106 return nil
107 }
108
109
110
111 func (w *Watcher) traversePluginDir(dir string) error {
112
113 err := w.fsWatcher.Add(dir)
114 if err != nil {
115 return fmt.Errorf("failed to watch %s, err: %v", w.path, err)
116 }
117
118 return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
119 if err != nil {
120 if path == dir {
121 return fmt.Errorf("error accessing path: %s error: %v", path, err)
122 }
123
124 klog.ErrorS(err, "Error accessing path", "path", path)
125 return nil
126 }
127
128
129 if path == dir {
130 return nil
131 }
132
133 mode := info.Mode()
134 if mode.IsDir() {
135 if err := w.fsWatcher.Add(path); err != nil {
136 return fmt.Errorf("failed to watch %s, err: %v", path, err)
137 }
138 } else if isSocket, _ := util.IsUnixDomainSocket(path); isSocket {
139 event := fsnotify.Event{
140 Name: path,
141 Op: fsnotify.Create,
142 }
143
144 if err := w.handleCreateEvent(event); err != nil {
145 klog.ErrorS(err, "Error when handling create", "event", event)
146 }
147 } else {
148 klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode)
149 }
150
151 return nil
152 })
153 }
154
155
156
157
158 func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
159 klog.V(6).InfoS("Handling create event", "event", event)
160
161 fi, err := getStat(event)
162 if err != nil {
163 return fmt.Errorf("stat file %s failed: %v", event.Name, err)
164 }
165
166 if strings.HasPrefix(fi.Name(), ".") {
167 klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name())
168 return nil
169 }
170
171 if !fi.IsDir() {
172 isSocket, err := util.IsUnixDomainSocket(util.NormalizePath(event.Name))
173 if err != nil {
174 return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
175 }
176 if !isSocket {
177 klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name())
178 return nil
179 }
180
181 return w.handlePluginRegistration(event.Name)
182 }
183
184 return w.traversePluginDir(event.Name)
185 }
186
187 func (w *Watcher) handlePluginRegistration(socketPath string) error {
188 socketPath = getSocketPath(socketPath)
189
190
191
192
193
194 klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath)
195 err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
196 if err != nil {
197 return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
198 }
199 return nil
200 }
201
202 func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
203 klog.V(6).InfoS("Handling delete event", "event", event)
204
205 socketPath := event.Name
206 klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath)
207 w.desiredStateOfWorld.RemovePlugin(socketPath)
208 }
209
View as plain text