...

Source file src/edge-infra.dev/pkg/sds/devices/k8s/apis/v1/client.go

Documentation: edge-infra.dev/pkg/sds/devices/k8s/apis/v1

     1  package v1
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"maps"
     7  	"slices"
     8  	"time"
     9  
    10  	v1 "k8s.io/api/core/v1"
    11  	"k8s.io/apimachinery/pkg/types"
    12  	"k8s.io/client-go/dynamic"
    13  	"k8s.io/client-go/dynamic/dynamicinformer"
    14  	"k8s.io/client-go/rest"
    15  	"k8s.io/client-go/tools/cache"
    16  	"sigs.k8s.io/controller-runtime/pkg/client"
    17  
    18  	"edge-infra.dev/pkg/lib/kernel/devices"
    19  	"edge-infra.dev/pkg/sds/devices/logger"
    20  )
    21  
    22  const (
    23  	MarkDeletedAnn = "device-system/mark-deleted"
    24  )
    25  
    26  type opts struct {
    27  	persistence bool
    28  	path        string
    29  }
    30  
    31  type ListOption func(*opts)
    32  
    33  // WithPersistence uses the local persistent cache if API server is unavailable
    34  func WithPersistence(enabled bool) ListOption {
    35  	return func(o *opts) {
    36  		o.persistence = enabled
    37  	}
    38  }
    39  
    40  // WithPersistencePath sets the path to cache file
    41  func WithPersistencePath(path string) ListOption {
    42  	return func(o *opts) {
    43  		o.path = path
    44  	}
    45  }
    46  
    47  // FromClient fetches and populates a device class by parsing sys fs. It is recommended
    48  // to use this class method to fetch the device classes from kubernetes API.
    49  func FromClient(ctx context.Context, c client.Client, name string) (*DeviceClass, error) {
    50  	class := DeviceClass{}
    51  	if err := c.Get(ctx, types.NamespacedName{Name: name}, &class); err != nil {
    52  		return nil, err
    53  	}
    54  	if err := class.populate(); err != nil {
    55  		return nil, err
    56  	}
    57  	return &class, nil
    58  }
    59  
    60  // WatchFromClient will return an informer to watch for device class updates and send them to the device class channel
    61  func WatchFromClient(ctx context.Context, c client.Client, config *rest.Config, deviceClassChan chan *DeviceClass, options ...ListOption) (dynamicinformer.DynamicSharedInformerFactory, error) {
    62  	listOpts := &opts{}
    63  	for _, o := range options {
    64  		o(listOpts)
    65  	}
    66  
    67  	informerFactory, err := informerFactory(config)
    68  	if err != nil {
    69  		return nil, err
    70  	}
    71  
    72  	if err := addClassEventHandler(ctx, c, informerFactory, deviceClassChan, listOpts); err != nil {
    73  		return nil, err
    74  	}
    75  
    76  	if err := addDeviceSetsEventHandler(ctx, c, informerFactory, deviceClassChan, listOpts); err != nil {
    77  		return nil, err
    78  	}
    79  	return informerFactory, err
    80  }
    81  
    82  // addClassEventHandler adds a new event handler to watch for device class changes
    83  func addClassEventHandler(ctx context.Context, c client.Client, sharedInformer dynamicinformer.DynamicSharedInformerFactory, deviceClassChan chan *DeviceClass, options *opts) error {
    84  	log := logger.FromContext(ctx)
    85  	classInformer := sharedInformer.ForResource(GroupVersion.WithResource("deviceclasses")).Informer()
    86  	_, err := classInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    87  		AddFunc: func(obj interface{}) {
    88  			deviceClass, err := convertDeviceClass(obj)
    89  			if err != nil {
    90  				log.Error("could not convert DeviceClass object", "error", err)
    91  				return
    92  			}
    93  
    94  			deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name())
    95  			if err != nil {
    96  				log.Error("failed to cache and populate device classes", "error", err)
    97  				return
    98  			}
    99  			deviceClassChan <- deviceClassMap[deviceClass.ClassName()]
   100  		},
   101  		UpdateFunc: func(oldObj, newObj interface{}) {
   102  			deviceClass, err := convertDeviceClass(newObj)
   103  			if err != nil {
   104  				log.Error("could not convert DeviceClass object", "error", err)
   105  				return
   106  			}
   107  
   108  			oldDeviceClass, err := convertDeviceClass(oldObj)
   109  			if err != nil {
   110  				log.Error("could not convert DeviceClass object", "error", err)
   111  				return
   112  			}
   113  
   114  			if deviceClass.ObjectMeta.Generation == oldDeviceClass.Generation {
   115  				return
   116  			}
   117  
   118  			deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name())
   119  			if err != nil {
   120  				log.Error("failed to cache and populate device classes", "error", err)
   121  				return
   122  			}
   123  			deviceClassChan <- deviceClassMap[deviceClass.ClassName()]
   124  		},
   125  		DeleteFunc: func(obj interface{}) {
   126  			deviceClass, err := convertDeviceClass(obj)
   127  			if err != nil {
   128  				log.Error("could not convert DeviceClass object", "error", err)
   129  				return
   130  			}
   131  
   132  			deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClass.Name())
   133  			if err != nil {
   134  				log.Error("failed to cache and delete device classes", "error", err)
   135  				return
   136  			}
   137  			updatedDeviceClass := deviceClassMap[deviceClass.ClassName()]
   138  			updatedDeviceClass.Annotations[MarkDeletedAnn] = "true"
   139  			deviceClassChan <- updatedDeviceClass
   140  		},
   141  	})
   142  	return err
   143  }
   144  
   145  // addDeviceSetsEventHandler creates a watch on DeviceSet CRs and triggers update to owning device classes if changed
   146  func addDeviceSetsEventHandler(ctx context.Context, c client.Client, sharedInformer dynamicinformer.DynamicSharedInformerFactory, deviceClassChan chan *DeviceClass, options *opts) error {
   147  	log := logger.FromContext(ctx)
   148  	setsInformer := sharedInformer.ForResource(GroupVersion.WithResource("devicesets")).Informer()
   149  	_, err := setsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
   150  		AddFunc: func(obj interface{}) {
   151  			deviceSet, err := convertDeviceSet(obj)
   152  			if err != nil {
   153  				log.Error("could not convert DeviceSet object", "error", err)
   154  				return
   155  			}
   156  
   157  			if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil {
   158  				log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name)
   159  			}
   160  		},
   161  		UpdateFunc: func(oldObj, newObj interface{}) {
   162  			deviceSet, err := convertDeviceSet(newObj)
   163  			if err != nil {
   164  				log.Error("could not convert DeviceSet object", "error", err)
   165  				return
   166  			}
   167  
   168  			oldDeviceSet, err := convertDeviceSet(oldObj)
   169  			if err != nil {
   170  				log.Error("could not convert DeviceSet object", "error", err)
   171  				return
   172  			}
   173  
   174  			if deviceSet.Generation == oldDeviceSet.Generation {
   175  				return
   176  			}
   177  
   178  			if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil {
   179  				log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name)
   180  			}
   181  		},
   182  		DeleteFunc: func(obj interface{}) {
   183  			deviceSet, err := convertDeviceSet(obj)
   184  			if err != nil {
   185  				log.Error("could not convert DeviceSet object", "error", err)
   186  				return
   187  			}
   188  
   189  			if err := updateDeviceClassesFromDeviceSet(ctx, c, deviceSet, deviceClassChan, options); err != nil {
   190  				log.Error("failed to update device classes from DeviceSet", "error", err, "DeviceSet", deviceSet.Name)
   191  			}
   192  		},
   193  	})
   194  	return err
   195  }
   196  
   197  // ListFromClient fetches and populates all device classes by parsing sys fs. It is recommended
   198  // to use this class method to fetch the device classes from kubernetes API.
   199  func ListFromClient(ctx context.Context, c client.Client, o ...ListOption) (map[string]*DeviceClass, error) {
   200  	options := &opts{}
   201  	for _, o := range o {
   202  		o(options)
   203  	}
   204  
   205  	deviceClasses, err := fetchPopulatedDeviceClasses(ctx, c, options)
   206  	if err != nil {
   207  		return nil, fmt.Errorf("failed to fetch device classes: %v", err)
   208  	}
   209  	return deviceClasses, nil
   210  }
   211  
   212  // updateDeviceClassesFromDeviceSet will send DeviceClass update to the device class channel for any classes that reference the deviceSet resource.
   213  func updateDeviceClassesFromDeviceSet(ctx context.Context, c client.Client, deviceSet *DeviceSet, deviceClassChan chan *DeviceClass, options *opts) error {
   214  	log := logger.FromContext(ctx)
   215  	log.Info("device set was modified", "deviceset", deviceSet.Name())
   216  
   217  	deviceClasses, err := fetchAllDeviceClasses(ctx, c, options)
   218  	if err != nil {
   219  		return err
   220  	}
   221  
   222  	for _, class := range deviceClasses {
   223  		classMatched := false
   224  
   225  		for _, classDeviceSet := range class.Spec.DeviceSets {
   226  			if classDeviceSet.Name == deviceSet.Name() {
   227  				classMatched = true
   228  				break
   229  			}
   230  		}
   231  
   232  		// if device class does not match, delete it
   233  		if !classMatched {
   234  			delete(deviceClasses, class.Name())
   235  		}
   236  	}
   237  
   238  	deviceClassNames := slices.Collect(maps.Keys(deviceClasses))
   239  	deviceClassMap, err := fetchPopulatedDeviceClasses(ctx, c, options, deviceClassNames...)
   240  	if err != nil {
   241  		return err
   242  	}
   243  
   244  	for _, class := range deviceClassMap {
   245  		deviceClassChan <- class
   246  	}
   247  	return nil
   248  }
   249  
   250  // applyDeviceSetsToClass will match the devices sets to a class and add them to the object
   251  func applyDeviceSetsToClass(deviceSetList []*DeviceSet, class *DeviceClass) *DeviceClass {
   252  	for _, deviceRef := range class.Spec.Devices {
   253  		for _, device := range deviceSetList {
   254  			if device.Name() != deviceRef.Name {
   255  				continue
   256  			}
   257  			class.DeviceList = append(class.DeviceList, device)
   258  		}
   259  	}
   260  	return class
   261  }
   262  
   263  // fetchDeviceClasses will list the device classes. If an API error occurs, it will attempt to fetch
   264  // the device class CRs from local cache on disk. The device classes will be populated with device sets and device rules.
   265  // If filterNames is set, only device classes with a matching name will be populated and returned.
   266  func fetchPopulatedDeviceClasses(ctx context.Context, c client.Client, opts *opts, filterNames ...string) (map[string]*DeviceClass, error) {
   267  	deviceList, err := fetchDeviceSets(ctx, c, opts)
   268  	if err != nil {
   269  		return map[string]*DeviceClass{}, err
   270  	}
   271  
   272  	deviceClasses, err := fetchAllDeviceClasses(ctx, c, opts)
   273  	if err != nil {
   274  		return map[string]*DeviceClass{}, err
   275  	}
   276  
   277  	// populate device classes with sets and rules
   278  	for name, class := range deviceClasses {
   279  		if len(filterNames) != 0 && !slices.Contains(filterNames, class.Name()) {
   280  			delete(deviceClasses, name)
   281  			continue
   282  		}
   283  
   284  		deviceClasses[name] = applyDeviceSetsToClass(deviceList, class)
   285  		if deviceClasses[name], err = populateDeviceClass(ctx, deviceClasses[name]); err != nil {
   286  			return map[string]*DeviceClass{}, err
   287  		}
   288  	}
   289  	return deviceClasses, nil
   290  }
   291  
   292  // fetchAllDeviceClasses will fetch the device classes from API server or local cache if API server is unavailable.
   293  // The device classes will not be populated with the device sets and device rules.
   294  func fetchAllDeviceClasses(ctx context.Context, c client.Client, opts *opts) (map[string]*DeviceClass, error) {
   295  	deviceClassList := &DeviceClassList{}
   296  	fetchErr := c.List(ctx, deviceClassList)
   297  	if !opts.persistence && fetchErr != nil {
   298  		return map[string]*DeviceClass{}, fetchErr
   299  	}
   300  
   301  	deviceClasses := map[string]*DeviceClass{}
   302  	if fetchErr != nil {
   303  		persistedDeviceClasses, err := readDeviceClassFromPersistence(opts.path)
   304  		if err != nil {
   305  			return map[string]*DeviceClass{}, err
   306  		}
   307  		deviceClasses = persistedDeviceClasses
   308  	} else {
   309  		for _, class := range deviceClassList.Items {
   310  			newClass := &class
   311  			deviceClasses[class.ClassName()] = newClass
   312  		}
   313  	}
   314  
   315  	if opts.persistence && fetchErr == nil {
   316  		if err := writeDeviceClassCache(ctx, opts.path, deviceClassList.Items); err != nil {
   317  			return deviceClasses, err
   318  		}
   319  	}
   320  	return deviceClasses, nil
   321  }
   322  
   323  // fetchDeviceSets will list the device CRs. If an error occurs, it will attempt to fetch the device CRs
   324  // from the local cache on disk
   325  func fetchDeviceSets(ctx context.Context, c client.Client, opts *opts) ([]*DeviceSet, error) {
   326  	deviceSetList := &DeviceSetList{}
   327  	err := c.List(ctx, deviceSetList)
   328  	if !opts.persistence && err != nil {
   329  		return []*DeviceSet{}, err
   330  	}
   331  
   332  	log := logger.FromContext(ctx)
   333  	if opts.persistence && err != nil {
   334  		log.Error("could not fetch device sets from api server", "error", err)
   335  		return readDeviceFromPersistence(opts.path)
   336  	}
   337  
   338  	deviceSets := []*DeviceSet{}
   339  	for _, dev := range deviceSetList.Items {
   340  		newDevice := &dev
   341  		deviceSets = append(deviceSets, newDevice)
   342  	}
   343  
   344  	if opts.persistence {
   345  		if err := writeDevicesCache(ctx, opts.path, deviceSetList.Items); err != nil {
   346  			log.Error("error writing device sets cache", "error", err)
   347  		}
   348  	}
   349  	return deviceSets, nil
   350  }
   351  
   352  // populateDeviceClass will fetch the devices from sys fs that match the device class
   353  func populateDeviceClass(ctx context.Context, deviceClass *DeviceClass) (*DeviceClass, error) {
   354  	deviceClass.DeviceStatus.devices = map[string]devices.Device{}
   355  	startTime := time.Now()
   356  	if err := deviceClass.populate(); err != nil {
   357  		return nil, err
   358  	}
   359  	endTime := time.Now()
   360  	diffTime := endTime.Sub(startTime)
   361  	log := logger.FromContext(ctx)
   362  	log.Info("populated devices from sys fs", "class", deviceClass.ClassName(), "time", diffTime.String())
   363  	return deviceClass, nil
   364  }
   365  
   366  func informerFactory(clusterConfig *rest.Config) (dynamicinformer.DynamicSharedInformerFactory, error) {
   367  	clusterClient, err := dynamic.NewForConfig(clusterConfig)
   368  	if err != nil {
   369  		return nil, err
   370  	}
   371  	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, time.Minute, v1.NamespaceAll, nil)
   372  	return factory, nil
   373  }
   374  

View as plain text