1
16
17 package config
18
19 import (
20 "fmt"
21 "os"
22 "path/filepath"
23 "sort"
24 "strings"
25 "time"
26
27 "k8s.io/klog/v2"
28
29 v1 "k8s.io/api/core/v1"
30 "k8s.io/apimachinery/pkg/types"
31 "k8s.io/client-go/tools/cache"
32 api "k8s.io/kubernetes/pkg/apis/core"
33 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
34 utilio "k8s.io/utils/io"
35 )
36
// podEventType identifies the kind of change carried by a watchEvent.
type podEventType int

const (
	// podAdd is the first podEventType value (0, from iota).
	// Presumably reported when a manifest file appears; confirm against the
	// watcher implementation, which is not shown here.
	podAdd podEventType = iota
	// podModify is the second podEventType value (1).
	// Presumably reported when an existing manifest file changes.
	podModify
	// podDelete is the third podEventType value (2).
	// Presumably reported when a manifest file is removed.
	podDelete

	// eventBufferLen is the capacity of the buffered watchEvents channel
	// allocated in newSourceFile.
	eventBufferLen = 10
)
46
// watchEvent describes a single change to a file. Values are delivered on
// sourceFile.watchEvents and handed to consumeWatchEvent by the loop in run.
type watchEvent struct {
	// fileName names the file the event refers to.
	fileName string
	// eventType is the kind of change (podAdd, podModify or podDelete).
	eventType podEventType
}
51
// sourceFile is a pod configuration source backed by a file or a directory of
// manifest files. It pushes kubetypes.PodUpdate values onto the updates channel.
type sourceFile struct {
	// path is the file or directory inspected by listConfig.
	path string
	// nodeName is forwarded to applyDefaults when a pod is decoded.
	nodeName types.NodeName
	// period is the interval of the ticker that triggers listConfig in run.
	period time.Duration
	// store holds the current pod set; newSourceFile builds it so that its
	// push function emits a SET update on the updates channel.
	store cache.Store
	// fileKeyMapping maps a manifest file name to the store key of the pod
	// decoded from it; extractFromFile records an entry on every successful read.
	fileKeyMapping map[string]string
	// updates is the send-only channel on which pod updates are published.
	updates chan<- interface{}
	// watchEvents is a buffered channel (capacity eventBufferLen) drained by
	// the goroutine started in run.
	watchEvents chan *watchEvent
}
61
62
63 func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
64
65 path = strings.TrimRight(path, string(os.PathSeparator))
66
67 config := newSourceFile(path, nodeName, period, updates)
68 klog.V(1).InfoS("Watching path", "path", path)
69 config.run()
70 }
71
72 func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
73 send := func(objs []interface{}) {
74 var pods []*v1.Pod
75 for _, o := range objs {
76 pods = append(pods, o.(*v1.Pod))
77 }
78 updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
79 }
80 store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
81 return &sourceFile{
82 path: path,
83 nodeName: nodeName,
84 period: period,
85 store: store,
86 fileKeyMapping: map[string]string{},
87 updates: updates,
88 watchEvents: make(chan *watchEvent, eventBufferLen),
89 }
90 }
91
92 func (s *sourceFile) run() {
93 listTicker := time.NewTicker(s.period)
94
95 go func() {
96
97 if err := s.listConfig(); err != nil {
98 klog.ErrorS(err, "Unable to read config path", "path", s.path)
99 }
100 for {
101 select {
102 case <-listTicker.C:
103 if err := s.listConfig(); err != nil {
104 klog.ErrorS(err, "Unable to read config path", "path", s.path)
105 }
106 case e := <-s.watchEvents:
107 if err := s.consumeWatchEvent(e); err != nil {
108 klog.ErrorS(err, "Unable to process watch event")
109 }
110 }
111 }
112 }()
113
114 s.startWatch()
115 }
116
117 func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
118 return applyDefaults(pod, source, true, s.nodeName)
119 }
120
121 func (s *sourceFile) listConfig() error {
122 path := s.path
123 statInfo, err := os.Stat(path)
124 if err != nil {
125 if !os.IsNotExist(err) {
126 return err
127 }
128
129 s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
130 return fmt.Errorf("path does not exist, ignoring")
131 }
132
133 switch {
134 case statInfo.Mode().IsDir():
135 pods, err := s.extractFromDir(path)
136 if err != nil {
137 return err
138 }
139 if len(pods) == 0 {
140
141 s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
142 return nil
143 }
144 return s.replaceStore(pods...)
145
146 case statInfo.Mode().IsRegular():
147 pod, err := s.extractFromFile(path)
148 if err != nil {
149 return err
150 }
151 return s.replaceStore(pod)
152
153 default:
154 return fmt.Errorf("path is not a directory or file")
155 }
156 }
157
158
159
160
161 func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
162 dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
163 if err != nil {
164 return nil, fmt.Errorf("glob failed: %v", err)
165 }
166
167 pods := make([]*v1.Pod, 0, len(dirents))
168 if len(dirents) == 0 {
169 return pods, nil
170 }
171
172 sort.Strings(dirents)
173 for _, path := range dirents {
174 statInfo, err := os.Stat(path)
175 if err != nil {
176 klog.ErrorS(err, "Could not get metadata", "path", path)
177 continue
178 }
179
180 switch {
181 case statInfo.Mode().IsDir():
182 klog.ErrorS(nil, "Provided manifest path is a directory, not recursing into manifest path", "path", path)
183 case statInfo.Mode().IsRegular():
184 pod, err := s.extractFromFile(path)
185 if err != nil {
186 if !os.IsNotExist(err) {
187 klog.ErrorS(err, "Could not process manifest file", "path", path)
188 }
189 } else {
190 pods = append(pods, pod)
191 }
192 default:
193 klog.ErrorS(nil, "Manifest path is not a directory or file", "path", path, "mode", statInfo.Mode())
194 }
195 }
196 return pods, nil
197 }
198
199
200 func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
201 klog.V(3).InfoS("Reading config file", "path", filename)
202 defer func() {
203 if err == nil && pod != nil {
204 objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
205 if keyErr != nil {
206 err = keyErr
207 return
208 }
209 s.fileKeyMapping[filename] = objKey
210 }
211 }()
212
213 file, err := os.Open(filename)
214 if err != nil {
215 return pod, err
216 }
217 defer file.Close()
218
219 data, err := utilio.ReadAtMost(file, maxConfigLength)
220 if err != nil {
221 return pod, err
222 }
223
224 defaultFn := func(pod *api.Pod) error {
225 return s.applyDefaults(pod, filename)
226 }
227
228 parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
229 if parsed {
230 if podErr != nil {
231 return pod, podErr
232 }
233 return pod, nil
234 }
235
236 return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
237 }
238
239 func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {
240 objs := []interface{}{}
241 for _, pod := range pods {
242 objs = append(objs, pod)
243 }
244 return s.store.Replace(objs, "")
245 }
246
View as plain text