package v1 import ( "context" "fmt" "maps" "slices" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/lib/kernel/devices" "edge-infra.dev/pkg/sds/devices/logger" ) const ( MarkDeletedAnn = "device-system/mark-deleted" ) type opts struct { persistence bool path string } type ListOption func(*opts) // WithPersistence uses the local persistent cache if API server is unavailable func WithPersistence(enabled bool) ListOption { return func(o *opts) { o.persistence = enabled } } // WithPersistencePath sets the path to cache file func WithPersistencePath(path string) ListOption { return func(o *opts) { o.path = path } } // FromClient fetches and populates a device class by parsing sys fs. It is recommended // to use this class method to fetch the device classes from kubernetes API. func FromClient(ctx context.Context, c client.Client, name string) (*DeviceClass, error) { class := DeviceClass{} if err := c.Get(ctx, types.NamespacedName{Name: name}, &class); err != nil { return nil, err } if err := class.populate(); err != nil { return nil, err } return &class, nil } // WatchFromClient will return an informer to watch for device class updates and send them to the device class channel func WatchFromClient(ctx context.Context, c client.Client, config *rest.Config, deviceClassChan chan *DeviceClass, options ...ListOption) (dynamicinformer.DynamicSharedInformerFactory, error) { listOpts := &opts{} for _, o := range options { o(listOpts) } informerFactory, err := informerFactory(config) if err != nil { return nil, err } if err := addClassEventHandler(ctx, c, informerFactory, deviceClassChan, listOpts); err != nil { return nil, err } if err := addDeviceSetsEventHandler(ctx, c, informerFactory, deviceClassChan, listOpts); err != nil { return nil, err } return informerFactory, err } // addClassEventHandler adds a new event handler to watch for device class changes func addClassEventHandler(ctx context.Context, c client.Client, sharedInformer dynamicinformer.DynamicSharedInformerFactory, deviceClassChan chan *DeviceClass, options *opts) error { log := logger.FromContext(ctx) classInformer := sharedInformer.ForResource(GroupVersion.WithResource("deviceclasses")).Informer() _, err := classInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { deviceClass, err := convertDeviceClass(obj) if err != nil { log.Error("could not convert DeviceClass object", "error", err) return } deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name()) if err != nil { log.Error("failed to cache and populate device classes", "error", err) return } deviceClassChan <- deviceClassMap[deviceClass.ClassName()] }, UpdateFunc: func(oldObj, newObj interface{}) { deviceClass, err := convertDeviceClass(newObj) if err != nil { log.Error("could not convert DeviceClass object", "error", err) return } oldDeviceClass, err := convertDeviceClass(oldObj) if err != nil { log.Error("could not convert DeviceClass object", "error", err) return } if deviceClass.ObjectMeta.Generation == oldDeviceClass.Generation { return } deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name()) if err != nil { log.Error("failed to cache and populate device classes", "error", err) return } deviceClassChan <- deviceClassMap[deviceClass.ClassName()] }, DeleteFunc: func(obj interface{}) { deviceClass, err := convertDeviceClass(obj) if err != nil { log.Error("could not convert DeviceClass object", "error", err) return } deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name()) if err != nil { log.Error("failed to cache and delete device classes", "error", err) return } updatedDeviceClass := deviceClassMap[deviceClass.ClassName()] updatedDeviceClass.Annotations[MarkDeletedAnn] = "true" deviceClassChan <- updatedDeviceClass }, }) return err } // addDeviceSetsEventHandler creates a watch on DeviceSet CRs and triggers update to owning device classes if changed func addDeviceSetsEventHandler(ctx context.Context, c client.Client, sharedInformer dynamicinformer.DynamicSharedInformerFactory, deviceClassChan chan *DeviceClass, options *opts) error { log := logger.FromContext(ctx) setsInformer := sharedInformer.ForResource(GroupVersion.WithResource("devicesets")).Informer() _, err := setsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { deviceSet, err := convertDeviceSet(obj) if err != nil { log.Error("could not convert DeviceSet object", "error", err) return } if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil { log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name) } }, UpdateFunc: func(oldObj, newObj interface{}) { deviceSet, err := convertDeviceSet(newObj) if err != nil { log.Error("could not convert DeviceSet object", "error", err) return } oldDeviceSet, err := convertDeviceSet(oldObj) if err != nil { log.Error("could not convert DeviceSet object", "error", err) return } if deviceSet.Generation == oldDeviceSet.Generation { return } if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil { log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name) } }, DeleteFunc: func(obj interface{}) { deviceSet, err := convertDeviceSet(obj) if err != nil { log.Error("could not convert DeviceSet object", "error", err) return } if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil { log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name) } }, }) return err } // ListFromClient fetches and populates all device classes by parsing sys fs. It is recommended // to use this class method to fetch the device classes from kubernetes API. func ListFromClient(ctx context.Context, c client.Client, o ...ListOption) (map[string]*DeviceClass, error) { options := &opts{} for _, o := range o { o(options) } deviceClasses, err := fetchPopulatedDeviceClasses(ctx, c, options) if err != nil { return nil, fmt.Errorf("failed to fetch device classes: %v", err) } return deviceClasses, nil } // updateDeviceClassesFromDeviceSet will send DeviceClass update to the device class channel for any classes that reference the deviceSet resource. func updateDeviceClassesFromDeviceSet(ctx context.Context, c client.Client, deviceSet *DeviceSet, deviceClassChan chan *DeviceClass, options *opts) error { log := logger.FromContext(ctx) log.Info("device set was modified", "deviceset", deviceSet.Name()) deviceClasses, err := fetchAllDeviceClasses(ctx, c, options) if err != nil { return err } for _, class := range deviceClasses { classMatched := false for _, classDeviceSet := range class.Spec.DeviceSets { if classDeviceSet.Name == deviceSet.Name() { classMatched = true break } } // if device class does not match, delete it if !classMatched { delete(deviceClasses, class.Name()) } } deviceClassNames := slices.Collect(maps.Keys(deviceClasses)) deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClassNames...) if err != nil { return err } for _, class := range deviceClassMap { deviceClassChan <- class } return nil } // applyDeviceSetsToClass will match the devices sets to a class and add them to the object func applyDeviceSetsToClass(deviceSetList []*DeviceSet, class *DeviceClass) *DeviceClass { for _, deviceRef := range class.Spec.Devices { for _, device := range deviceSetList { if device.Name() != deviceRef.Name { continue } class.DeviceList = append(class.DeviceList, device) } } return class } // fetchDeviceClasses will list the device classes. If an API error occurs, it will attempt to fetch // the device class CRs from local cache on disk. The device classes will be populated with device sets and device rules. // If filterNames is set, only device classes with a matching name will be populated and returned. func fetchPopulatedDeviceClasses(ctx context.Context, c client.Client, opts *opts, filterNames ...string) (map[string]*DeviceClass, error) { deviceList, err := fetchDeviceSets(ctx, c, opts) if err != nil { return map[string]*DeviceClass{}, err } deviceClasses, err := fetchAllDeviceClasses(ctx, c, opts) if err != nil { return map[string]*DeviceClass{}, err } // populate device classes with sets and rules for name, class := range deviceClasses { if len(filterNames) != 0 && !slices.Contains(filterNames, class.Name()) { delete(deviceClasses, name) continue } deviceClasses[name] = applyDeviceSetsToClass(deviceList, class) if deviceClasses[name], err = populateDeviceClass(ctx, deviceClasses[name]); err != nil { return map[string]*DeviceClass{}, err } } return deviceClasses, nil } // fetchAllDeviceClasses will fetch the device classes from API server or local cache if API server is unavailable. // The device classes will not be populated with the device sets and device rules. func fetchAllDeviceClasses(ctx context.Context, c client.Client, opts *opts) (map[string]*DeviceClass, error) { deviceClassList := &DeviceClassList{} fetchErr := c.List(ctx, deviceClassList) if !opts.persistence && fetchErr != nil { return map[string]*DeviceClass{}, fetchErr } deviceClasses := map[string]*DeviceClass{} if fetchErr != nil { persistedDeviceClasses, err := readDeviceClassFromPersistence(opts.path) if err != nil { return map[string]*DeviceClass{}, err } deviceClasses = persistedDeviceClasses } else { for _, class := range deviceClassList.Items { newClass := &class deviceClasses[class.ClassName()] = newClass } } if opts.persistence && fetchErr == nil { if err := writeDeviceClassCache(ctx, opts.path, deviceClassList.Items); err != nil { return deviceClasses, err } } return deviceClasses, nil } // fetchDeviceSets will list the device CRs. If an error occurs, it will attempt to fetch the device CRs // from the local cache on disk func fetchDeviceSets(ctx context.Context, c client.Client, opts *opts) ([]*DeviceSet, error) { deviceSetList := &DeviceSetList{} err := c.List(ctx, deviceSetList) if !opts.persistence && err != nil { return []*DeviceSet{}, err } log := logger.FromContext(ctx) if opts.persistence && err != nil { log.Error("could not fetch device sets from api server", "error", err) return readDeviceFromPersistence(opts.path) } deviceSets := []*DeviceSet{} for _, dev := range deviceSetList.Items { newDevice := &dev deviceSets = append(deviceSets, newDevice) } if opts.persistence { if err := writeDevicesCache(ctx, opts.path, deviceSetList.Items); err != nil { log.Error("error writing device sets cache", "error", err) } } return deviceSets, nil } // populateDeviceClass will fetch the devices from sys fs that match the device class func populateDeviceClass(ctx context.Context, deviceClass *DeviceClass) (*DeviceClass, error) { deviceClass.DeviceStatus.devices = map[string]devices.Device{} startTime := time.Now() if err := deviceClass.populate(); err != nil { return nil, err } endTime := time.Now() diffTime := endTime.Sub(startTime) log := logger.FromContext(ctx) log.Info("populated devices from sys fs", "class", deviceClass.ClassName(), "time", diffTime.String()) return deviceClass, nil } func informerFactory(clusterConfig *rest.Config) (dynamicinformer.DynamicSharedInformerFactory, error) { clusterClient, err := dynamic.NewForConfig(clusterConfig) if err != nil { return nil, err } factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, time.Minute, v1.NamespaceAll, nil) return factory, nil }