...
1
2
3
4
19
20 package config
21
22 import (
23 "fmt"
24 "os"
25 "path/filepath"
26 "strings"
27 "time"
28
29 "github.com/fsnotify/fsnotify"
30 "k8s.io/klog/v2"
31
32 "k8s.io/api/core/v1"
33 "k8s.io/apimachinery/pkg/util/wait"
34 "k8s.io/client-go/util/flowcontrol"
35 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
36 )
37
38 const (
39 retryPeriod = 1 * time.Second
40 maxRetryPeriod = 20 * time.Second
41 )
42
43 type retryableError struct {
44 message string
45 }
46
47 func (e *retryableError) Error() string {
48 return e.message
49 }
50
51 func (s *sourceFile) startWatch() {
52 backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
53 backOffID := "watch"
54
55 go wait.Forever(func() {
56 if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
57 return
58 }
59
60 if err := s.doWatch(); err != nil {
61 klog.ErrorS(err, "Unable to read config path", "path", s.path)
62 if _, retryable := err.(*retryableError); !retryable {
63 backOff.Next(backOffID, time.Now())
64 }
65 }
66 }, retryPeriod)
67 }
68
69 func (s *sourceFile) doWatch() error {
70 _, err := os.Stat(s.path)
71 if err != nil {
72 if !os.IsNotExist(err) {
73 return err
74 }
75
76 s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
77 return &retryableError{"path does not exist, ignoring"}
78 }
79
80 w, err := fsnotify.NewWatcher()
81 if err != nil {
82 return fmt.Errorf("unable to create inotify: %v", err)
83 }
84 defer w.Close()
85
86 err = w.Add(s.path)
87 if err != nil {
88 return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err)
89 }
90
91 for {
92 select {
93 case event := <-w.Events:
94 if err = s.produceWatchEvent(&event); err != nil {
95 return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
96 }
97 case err = <-w.Errors:
98 return fmt.Errorf("error while watching %q: %v", s.path, err)
99 }
100 }
101 }
102
103 func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
104
105 if strings.HasPrefix(filepath.Base(e.Name), ".") {
106 klog.V(4).InfoS("Ignored pod manifest, because it starts with dots", "eventName", e.Name)
107 return nil
108 }
109 var eventType podEventType
110 switch {
111 case (e.Op & fsnotify.Create) > 0:
112 eventType = podAdd
113 case (e.Op & fsnotify.Write) > 0:
114 eventType = podModify
115 case (e.Op & fsnotify.Chmod) > 0:
116 eventType = podModify
117 case (e.Op & fsnotify.Remove) > 0:
118 eventType = podDelete
119 case (e.Op & fsnotify.Rename) > 0:
120 eventType = podDelete
121 default:
122
123 return nil
124 }
125
126 s.watchEvents <- &watchEvent{e.Name, eventType}
127 return nil
128 }
129
130 func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
131 switch e.eventType {
132 case podAdd, podModify:
133 pod, err := s.extractFromFile(e.fileName)
134 if err != nil {
135 return fmt.Errorf("can't process config file %q: %v", e.fileName, err)
136 }
137 return s.store.Add(pod)
138 case podDelete:
139 if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
140 pod, podExist, err := s.store.GetByKey(objKey)
141 if err != nil {
142 return err
143 }
144 if !podExist {
145 return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
146 }
147 if err = s.store.Delete(pod); err != nil {
148 return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
149 }
150 delete(s.fileKeyMapping, e.fileName)
151 }
152 }
153 return nil
154 }
155
View as plain text