1 package v1
2
3 import (
4 "context"
5 "fmt"
6 "maps"
7 "slices"
8 "time"
9
10 v1 "k8s.io/api/core/v1"
11 "k8s.io/apimachinery/pkg/types"
12 "k8s.io/client-go/dynamic"
13 "k8s.io/client-go/dynamic/dynamicinformer"
14 "k8s.io/client-go/rest"
15 "k8s.io/client-go/tools/cache"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17
18 "edge-infra.dev/pkg/lib/kernel/devices"
19 "edge-infra.dev/pkg/sds/devices/logger"
20 )
21
22 const (
23 MarkDeletedAnn = "device-system/mark-deleted"
24 )
25
26 type opts struct {
27 persistence bool
28 path string
29 }
30
31 type ListOption func(*opts)
32
33
34 func WithPersistence(enabled bool) ListOption {
35 return func(o *opts) {
36 o.persistence = enabled
37 }
38 }
39
40
41 func WithPersistencePath(path string) ListOption {
42 return func(o *opts) {
43 o.path = path
44 }
45 }
46
47
48
49 func FromClient(ctx context.Context, c client.Client, name string) (*DeviceClass, error) {
50 class := DeviceClass{}
51 if err := c.Get(ctx, types.NamespacedName{Name: name}, &class); err != nil {
52 return nil, err
53 }
54 if err := class.populate(); err != nil {
55 return nil, err
56 }
57 return &class, nil
58 }
59
60
61 func WatchFromClient(ctx context.Context, c client.Client, config *rest.Config, deviceClassChan chan *DeviceClass, options ...ListOption) (dynamicinformer.DynamicSharedInformerFactory, error) {
62 listOpts := &opts{}
63 for _, o := range options {
64 o(listOpts)
65 }
66
67 informerFactory, err := informerFactory(config)
68 if err != nil {
69 return nil, err
70 }
71
72 if err := addClassEventHandler(ctx, c, informerFactory, deviceClassChan, listOpts); err != nil {
73 return nil, err
74 }
75
76 if err := addDeviceSetsEventHandler(ctx, c, informerFactory, deviceClassChan, listOpts); err != nil {
77 return nil, err
78 }
79 return informerFactory, err
80 }
81
82
83 func addClassEventHandler(ctx context.Context, c client.Client, sharedInformer dynamicinformer.DynamicSharedInformerFactory, deviceClassChan chan *DeviceClass, options *opts) error {
84 log := logger.FromContext(ctx)
85 classInformer := sharedInformer.ForResource(GroupVersion.WithResource("deviceclasses")).Informer()
86 _, err := classInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
87 AddFunc: func(obj interface{}) {
88 deviceClass, err := convertDeviceClass(obj)
89 if err != nil {
90 log.Error("could not convert DeviceClass object", "error", err)
91 return
92 }
93
94 deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name())
95 if err != nil {
96 log.Error("failed to cache and populate device classes", "error", err)
97 return
98 }
99 deviceClassChan <- deviceClassMap[deviceClass.ClassName()]
100 },
101 UpdateFunc: func(oldObj, newObj interface{}) {
102 deviceClass, err := convertDeviceClass(newObj)
103 if err != nil {
104 log.Error("could not convert DeviceClass object", "error", err)
105 return
106 }
107
108 oldDeviceClass, err := convertDeviceClass(oldObj)
109 if err != nil {
110 log.Error("could not convert DeviceClass object", "error", err)
111 return
112 }
113
114 if deviceClass.ObjectMeta.Generation == oldDeviceClass.Generation {
115 return
116 }
117
118 deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name())
119 if err != nil {
120 log.Error("failed to cache and populate device classes", "error", err)
121 return
122 }
123 deviceClassChan <- deviceClassMap[deviceClass.ClassName()]
124 },
125 DeleteFunc: func(obj interface{}) {
126 deviceClass, err := convertDeviceClass(obj)
127 if err != nil {
128 log.Error("could not convert DeviceClass object", "error", err)
129 return
130 }
131
132 deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name())
133 if err != nil {
134 log.Error("failed to cache and delete device classes", "error", err)
135 return
136 }
137 updatedDeviceClass := deviceClassMap[deviceClass.ClassName()]
138 updatedDeviceClass.Annotations[MarkDeletedAnn] = "true"
139 deviceClassChan <- updatedDeviceClass
140 },
141 })
142 return err
143 }
144
145
146 func addDeviceSetsEventHandler(ctx context.Context, c client.Client, sharedInformer dynamicinformer.DynamicSharedInformerFactory, deviceClassChan chan *DeviceClass, options *opts) error {
147 log := logger.FromContext(ctx)
148 setsInformer := sharedInformer.ForResource(GroupVersion.WithResource("devicesets")).Informer()
149 _, err := setsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
150 AddFunc: func(obj interface{}) {
151 deviceSet, err := convertDeviceSet(obj)
152 if err != nil {
153 log.Error("could not convert DeviceSet object", "error", err)
154 return
155 }
156
157 if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil {
158 log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name)
159 }
160 },
161 UpdateFunc: func(oldObj, newObj interface{}) {
162 deviceSet, err := convertDeviceSet(newObj)
163 if err != nil {
164 log.Error("could not convert DeviceSet object", "error", err)
165 return
166 }
167
168 oldDeviceSet, err := convertDeviceSet(oldObj)
169 if err != nil {
170 log.Error("could not convert DeviceSet object", "error", err)
171 return
172 }
173
174 if deviceSet.Generation == oldDeviceSet.Generation {
175 return
176 }
177
178 if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil {
179 log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name)
180 }
181 },
182 DeleteFunc: func(obj interface{}) {
183 deviceSet, err := convertDeviceSet(obj)
184 if err != nil {
185 log.Error("could not convert DeviceSet object", "error", err)
186 return
187 }
188
189 if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil {
190 log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name)
191 }
192 },
193 })
194 return err
195 }
196
197
198
199 func ListFromClient(ctx context.Context, c client.Client, o ...ListOption) (map[string]*DeviceClass, error) {
200 options := &opts{}
201 for _, o := range o {
202 o(options)
203 }
204
205 deviceClasses, err := fetchPopulatedDeviceClasses(ctx, c, options)
206 if err != nil {
207 return nil, fmt.Errorf("failed to fetch device classes: %v", err)
208 }
209 return deviceClasses, nil
210 }
211
212
213 func updateDeviceClassesFromDeviceSet(ctx context.Context, c client.Client, deviceSet *DeviceSet, deviceClassChan chan *DeviceClass, options *opts) error {
214 log := logger.FromContext(ctx)
215 log.Info("device set was modified", "deviceset", deviceSet.Name())
216
217 deviceClasses, err := fetchAllDeviceClasses(ctx, c, options)
218 if err != nil {
219 return err
220 }
221
222 for _, class := range deviceClasses {
223 classMatched := false
224
225 for _, classDeviceSet := range class.Spec.DeviceSets {
226 if classDeviceSet.Name == deviceSet.Name() {
227 classMatched = true
228 break
229 }
230 }
231
232
233 if !classMatched {
234 delete(deviceClasses, class.Name())
235 }
236 }
237
238 deviceClassNames := slices.Collect(maps.Keys(deviceClasses))
239 deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClassNames...)
240 if err != nil {
241 return err
242 }
243
244 for _, class := range deviceClassMap {
245 deviceClassChan <- class
246 }
247 return nil
248 }
249
250
251 func applyDeviceSetsToClass(deviceSetList []*DeviceSet, class *DeviceClass) *DeviceClass {
252 for _, deviceRef := range class.Spec.Devices {
253 for _, device := range deviceSetList {
254 if device.Name() != deviceRef.Name {
255 continue
256 }
257 class.DeviceList = append(class.DeviceList, device)
258 }
259 }
260 return class
261 }
262
263
264
265
266 func fetchPopulatedDeviceClasses(ctx context.Context, c client.Client, opts *opts, filterNames ...string) (map[string]*DeviceClass, error) {
267 deviceList, err := fetchDeviceSets(ctx, c, opts)
268 if err != nil {
269 return map[string]*DeviceClass{}, err
270 }
271
272 deviceClasses, err := fetchAllDeviceClasses(ctx, c, opts)
273 if err != nil {
274 return map[string]*DeviceClass{}, err
275 }
276
277
278 for name, class := range deviceClasses {
279 if len(filterNames) != 0 && !slices.Contains(filterNames, class.Name()) {
280 delete(deviceClasses, name)
281 continue
282 }
283
284 deviceClasses[name] = applyDeviceSetsToClass(deviceList, class)
285 if deviceClasses[name], err = populateDeviceClass(ctx, deviceClasses[name]); err != nil {
286 return map[string]*DeviceClass{}, err
287 }
288 }
289 return deviceClasses, nil
290 }
291
292
293
294 func fetchAllDeviceClasses(ctx context.Context, c client.Client, opts *opts) (map[string]*DeviceClass, error) {
295 deviceClassList := &DeviceClassList{}
296 fetchErr := c.List(ctx, deviceClassList)
297 if !opts.persistence && fetchErr != nil {
298 return map[string]*DeviceClass{}, fetchErr
299 }
300
301 deviceClasses := map[string]*DeviceClass{}
302 if fetchErr != nil {
303 persistedDeviceClasses, err := readDeviceClassFromPersistence(opts.path)
304 if err != nil {
305 return map[string]*DeviceClass{}, err
306 }
307 deviceClasses = persistedDeviceClasses
308 } else {
309 for _, class := range deviceClassList.Items {
310 newClass := &class
311 deviceClasses[class.ClassName()] = newClass
312 }
313 }
314
315 if opts.persistence && fetchErr == nil {
316 if err := writeDeviceClassCache(ctx, opts.path, deviceClassList.Items); err != nil {
317 return deviceClasses, err
318 }
319 }
320 return deviceClasses, nil
321 }
322
323
324
325 func fetchDeviceSets(ctx context.Context, c client.Client, opts *opts) ([]*DeviceSet, error) {
326 deviceSetList := &DeviceSetList{}
327 err := c.List(ctx, deviceSetList)
328 if !opts.persistence && err != nil {
329 return []*DeviceSet{}, err
330 }
331
332 log := logger.FromContext(ctx)
333 if opts.persistence && err != nil {
334 log.Error("could not fetch device sets from api server", "error", err)
335 return readDeviceFromPersistence(opts.path)
336 }
337
338 deviceSets := []*DeviceSet{}
339 for _, dev := range deviceSetList.Items {
340 newDevice := &dev
341 deviceSets = append(deviceSets, newDevice)
342 }
343
344 if opts.persistence {
345 if err := writeDevicesCache(ctx, opts.path, deviceSetList.Items); err != nil {
346 log.Error("error writing device sets cache", "error", err)
347 }
348 }
349 return deviceSets, nil
350 }
351
352
353 func populateDeviceClass(ctx context.Context, deviceClass *DeviceClass) (*DeviceClass, error) {
354 deviceClass.DeviceStatus.devices = map[string]devices.Device{}
355 startTime := time.Now()
356 if err := deviceClass.populate(); err != nil {
357 return nil, err
358 }
359 endTime := time.Now()
360 diffTime := endTime.Sub(startTime)
361 log := logger.FromContext(ctx)
362 log.Info("populated devices from sys fs", "class", deviceClass.ClassName(), "time", diffTime.String())
363 return deviceClass, nil
364 }
365
366 func informerFactory(clusterConfig *rest.Config) (dynamicinformer.DynamicSharedInformerFactory, error) {
367 clusterClient, err := dynamic.NewForConfig(clusterConfig)
368 if err != nil {
369 return nil, err
370 }
371 factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, time.Minute, v1.NamespaceAll, nil)
372 return factory, nil
373 }
374
View as plain text