1
16
17 package flexvolume
18
19 import (
20 "fmt"
21 "os"
22 "path/filepath"
23 "strings"
24 "sync"
25
26 "github.com/fsnotify/fsnotify"
27 "k8s.io/apimachinery/pkg/util/errors"
28 "k8s.io/klog/v2"
29 utilfs "k8s.io/kubernetes/pkg/util/filesystem"
30 "k8s.io/kubernetes/pkg/volume"
31 "k8s.io/utils/exec"
32 utilstrings "k8s.io/utils/strings"
33 )
34
35 type flexVolumeProber struct {
36 mutex sync.Mutex
37 pluginDir string
38 runner exec.Interface
39 watcher utilfs.FSWatcher
40 factory PluginFactory
41 fs utilfs.Filesystem
42 probeAllNeeded bool
43 eventsMap map[string]volume.ProbeOperation
44 }
45
46
47 func GetDynamicPluginProber(pluginDir string, runner exec.Interface) volume.DynamicPluginProber {
48 return &flexVolumeProber{
49 pluginDir: pluginDir,
50 watcher: utilfs.NewFsnotifyWatcher(),
51 factory: pluginFactory{},
52 runner: runner,
53 fs: &utilfs.DefaultFs{},
54 }
55 }
56
57 func (prober *flexVolumeProber) Init() error {
58 prober.testAndSetProbeAllNeeded(true)
59 prober.eventsMap = map[string]volume.ProbeOperation{}
60
61 if err := prober.createPluginDir(); err != nil {
62 return err
63 }
64 if err := prober.initWatcher(); err != nil {
65 return err
66 }
67
68 return nil
69 }
70
71
72
73 func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) {
74 if prober.probeAllNeeded {
75 prober.testAndSetProbeAllNeeded(false)
76 return prober.probeAll()
77 }
78
79 return prober.probeMap()
80 }
81
82 func (prober *flexVolumeProber) probeMap() (events []volume.ProbeEvent, err error) {
83
84 prober.mutex.Lock()
85 defer prober.mutex.Unlock()
86 probeEvents := []volume.ProbeEvent{}
87 allErrs := []error{}
88 for driverDirPathAbs, op := range prober.eventsMap {
89 driverDirName := filepath.Base(driverDirPathAbs)
90 probeEvent, pluginErr := prober.newProbeEvent(driverDirName, op)
91 if pluginErr != nil {
92 allErrs = append(allErrs, pluginErr)
93 continue
94 }
95 probeEvents = append(probeEvents, probeEvent)
96
97 delete(prober.eventsMap, driverDirPathAbs)
98 }
99 return probeEvents, errors.NewAggregate(allErrs)
100 }
101
102 func (prober *flexVolumeProber) probeAll() (events []volume.ProbeEvent, err error) {
103 probeEvents := []volume.ProbeEvent{}
104 allErrs := []error{}
105 files, err := prober.fs.ReadDir(prober.pluginDir)
106 if err != nil {
107 return nil, fmt.Errorf("error reading the Flexvolume directory: %s", err)
108 }
109 for _, f := range files {
110
111
112
113
114
115 if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
116 probeEvent, pluginErr := prober.newProbeEvent(f.Name(), volume.ProbeAddOrUpdate)
117 if pluginErr != nil {
118 allErrs = append(allErrs, pluginErr)
119 continue
120 }
121 probeEvents = append(probeEvents, probeEvent)
122 }
123 }
124 return probeEvents, errors.NewAggregate(allErrs)
125 }
126
127 func (prober *flexVolumeProber) newProbeEvent(driverDirName string, op volume.ProbeOperation) (volume.ProbeEvent, error) {
128 probeEvent := volume.ProbeEvent{
129 Op: op,
130 }
131 if op == volume.ProbeAddOrUpdate {
132 plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, driverDirName, prober.runner)
133 if pluginErr != nil {
134 pluginErr = fmt.Errorf(
135 "error creating Flexvolume plugin from directory %s, skipping. Error: %s",
136 driverDirName, pluginErr)
137 return probeEvent, pluginErr
138 }
139 probeEvent.Plugin = plugin
140 probeEvent.PluginName = plugin.GetPluginName()
141 } else if op == volume.ProbeRemove {
142 driverName := utilstrings.UnescapeQualifiedName(driverDirName)
143 probeEvent.PluginName = driverName
144
145 } else {
146 return probeEvent, fmt.Errorf("Unknown Operation on directory: %s. ", driverDirName)
147 }
148 return probeEvent, nil
149 }
150
151 func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
152
153 if filepath.Base(event.Name)[0] == '.' {
154
155 return nil
156 }
157
158 eventPathAbs, err := filepath.Abs(event.Name)
159 if err != nil {
160 return err
161 }
162 parentPathAbs := filepath.Dir(eventPathAbs)
163 pluginDirAbs, err := filepath.Abs(prober.pluginDir)
164 if err != nil {
165 return err
166 }
167
168
169 if eventPathAbs == pluginDirAbs {
170
171
172 if event.Has(fsnotify.Remove) {
173 if err := prober.createPluginDir(); err != nil {
174 return err
175 }
176 if err := prober.addWatchRecursive(pluginDirAbs); err != nil {
177 return err
178 }
179 }
180 return nil
181 }
182
183
184 if event.Has(fsnotify.Create) {
185 if err := prober.addWatchRecursive(eventPathAbs); err != nil {
186 return err
187 }
188 }
189
190 eventRelPathToPluginDir, err := filepath.Rel(pluginDirAbs, eventPathAbs)
191 if err != nil {
192 return err
193 }
194
195
196 if len(eventRelPathToPluginDir) > 0 {
197 driverDirName := strings.Split(eventRelPathToPluginDir, string(os.PathSeparator))[0]
198 driverDirAbs := filepath.Join(pluginDirAbs, driverDirName)
199
200 if event.Has(fsnotify.Remove) && (eventRelPathToPluginDir == getExecutablePathRel(driverDirName) || parentPathAbs == pluginDirAbs) {
201 prober.updateEventsMap(driverDirAbs, volume.ProbeRemove)
202 } else {
203 prober.updateEventsMap(driverDirAbs, volume.ProbeAddOrUpdate)
204 }
205 }
206
207 return nil
208 }
209
210
// getExecutablePathRel returns the path of a driver's executable relative to
// the plugin directory. The executable is named after the part of
// driverDirName that follows its last '~', or after the whole directory name
// when it contains no '~' (for example "vendor~cifs" yields "vendor~cifs/cifs").
func getExecutablePathRel(driverDirName string) string {
	executable := driverDirName
	if i := strings.LastIndex(driverDirName, "~"); i >= 0 {
		executable = driverDirName[i+1:]
	}
	return filepath.Join(driverDirName, executable)
}
215
216 func (prober *flexVolumeProber) updateEventsMap(eventDirAbs string, op volume.ProbeOperation) {
217 prober.mutex.Lock()
218 defer prober.mutex.Unlock()
219 if prober.probeAllNeeded {
220 return
221 }
222 prober.eventsMap[eventDirAbs] = op
223 }
224
225
226
227
228
229
230 func (prober *flexVolumeProber) addWatchRecursive(filename string) error {
231
232
233
234 filename = strings.TrimPrefix(filename, `C:\`)
235 addWatch := func(path string, info os.FileInfo, err error) error {
236 if err == nil && info.IsDir() {
237 if err := prober.watcher.AddWatch(path); err != nil {
238 klog.Errorf("Error recursively adding watch: %v", err)
239 }
240 }
241 return nil
242 }
243 return prober.fs.Walk(filename, addWatch)
244 }
245
246
247
248 func (prober *flexVolumeProber) initWatcher() error {
249 err := prober.watcher.Init(func(event fsnotify.Event) {
250 if err := prober.handleWatchEvent(event); err != nil {
251 klog.Errorf("Flexvolume prober watch: %s", err)
252 }
253 }, func(err error) {
254 klog.Errorf("Received an error from watcher: %s", err)
255 })
256 if err != nil {
257 return fmt.Errorf("error initializing watcher: %s", err)
258 }
259
260 if err := prober.addWatchRecursive(prober.pluginDir); err != nil {
261 return fmt.Errorf("error adding watch on Flexvolume directory: %s", err)
262 }
263
264 prober.watcher.Run()
265
266 return nil
267 }
268
269
270 func (prober *flexVolumeProber) createPluginDir() error {
271 if _, err := prober.fs.Stat(prober.pluginDir); os.IsNotExist(err) {
272 klog.Warningf("Flexvolume plugin directory at %s does not exist. Recreating.", prober.pluginDir)
273 err := prober.fs.MkdirAll(prober.pluginDir, 0755)
274 if err != nil {
275 return fmt.Errorf("error (re-)creating driver directory: %s", err)
276 }
277 }
278
279 return nil
280 }
281
282 func (prober *flexVolumeProber) testAndSetProbeAllNeeded(newval bool) (oldval bool) {
283 prober.mutex.Lock()
284 defer prober.mutex.Unlock()
285 oldval, prober.probeAllNeeded = prober.probeAllNeeded, newval
286 return
287 }
288
View as plain text