...

Source file src/edge-infra.dev/pkg/sds/devices/wait/wait.go

Documentation: edge-infra.dev/pkg/sds/devices/wait

     1  package wait
     2  
     3  import (
     4  	"context"
     5  	"os"
     6  	"regexp"
     7  	"time"
     8  
     9  	"edge-infra.dev/pkg/lib/fog"
    10  	v1deviceclass "edge-infra.dev/pkg/sds/devices/k8s/apis/v1"
    11  )
    12  
    13  const (
    14  	deviceDir       = "/var/lib/kubelet/device-plugins"
    15  	deviceCachePath = "/zynstra/config"
    16  )
    17  
    18  var (
    19  	checkInterval = time.Second * 1
    20  	pattern       = regexp.MustCompile(`^ds-.*\.sock$`)
    21  )
    22  
    23  // ForDevices waits until a desired number of devices are registered and the
    24  // registration process has stabilized (devices are no longer being registered)
    25  func ForDevices(ctx context.Context) error {
    26  	w := waiter{
    27  		deviceDir:       deviceDir,
    28  		deviceCachePath: deviceCachePath,
    29  		checkInterval:   checkInterval,
    30  	}
    31  
    32  	return w.wait(ctx)
    33  }
    34  
    35  // waiter contains of the configuration options for the process of waiting for
    36  // device registration to complete
    37  type waiter struct {
    38  	deviceDir       string
    39  	deviceCachePath string
    40  	checkInterval   time.Duration
    41  }
    42  
    43  // wait for the device registration process to complete. This method will not
    44  // return until device registration has successfully completed
    45  func (w *waiter) wait(ctx context.Context) error {
    46  	log := fog.FromContext(ctx)
    47  
    48  	log.Info("waiting for devices...", "deviceDirectory", w.deviceDir, "deviceCacheDir", w.deviceCachePath, "deviceCacheFilename", "deviceclasses.json")
    49  
    50  	deviceClasses, err := v1deviceclass.ReadDeviceClassFromPersistence(w.deviceCachePath)
    51  	if err != nil {
    52  		return err
    53  	}
    54  
    55  	expectedDeviceCount := len(deviceClasses)
    56  	for {
    57  		registrationComplete, err := w.checkDevices(ctx, expectedDeviceCount)
    58  		if err != nil {
    59  			return err
    60  		}
    61  
    62  		if registrationComplete {
    63  			log.Info("device registration complete")
    64  			break
    65  		}
    66  		time.Sleep(w.checkInterval)
    67  	}
    68  	return nil
    69  }
    70  
    71  // checkDevices returns true if the expected number of devices have been
    72  // registered
    73  func (w *waiter) checkDevices(ctx context.Context, expectedDeviceCount int) (bool, error) {
    74  	log := fog.FromContext(ctx)
    75  
    76  	files, err := os.ReadDir(w.deviceDir)
    77  	if err != nil {
    78  		return false, err
    79  	}
    80  
    81  	count := 0
    82  	for _, file := range files {
    83  		if !file.IsDir() && pattern.MatchString(file.Name()) {
    84  			count++
    85  		}
    86  	}
    87  	log.Info("found devices", "expectedCount", expectedDeviceCount, "count", count)
    88  	return count >= expectedDeviceCount, nil
    89  }
    90  

View as plain text