...
1 package wait
2
3 import (
4 "context"
5 "os"
6 "regexp"
7 "time"
8
9 "edge-infra.dev/pkg/lib/fog"
10 v1deviceclass "edge-infra.dev/pkg/sds/devices/k8s/apis/v1"
11 )
12
13 const (
14 deviceDir = "/var/lib/kubelet/device-plugins"
15 deviceCachePath = "/zynstra/config"
16 )
17
18 var (
19 checkInterval = time.Second * 1
20 pattern = regexp.MustCompile(`^ds-.*\.sock$`)
21 )
22
23
24
25 func ForDevices(ctx context.Context) error {
26 w := waiter{
27 deviceDir: deviceDir,
28 deviceCachePath: deviceCachePath,
29 checkInterval: checkInterval,
30 }
31
32 return w.wait(ctx)
33 }
34
35
36
37 type waiter struct {
38 deviceDir string
39 deviceCachePath string
40 checkInterval time.Duration
41 }
42
43
44
45 func (w *waiter) wait(ctx context.Context) error {
46 log := fog.FromContext(ctx)
47
48 log.Info("waiting for devices...", "deviceDirectory", w.deviceDir, "deviceCacheDir", w.deviceCachePath, "deviceCacheFilename", "deviceclasses.json")
49
50 deviceClasses, err := v1deviceclass.ReadDeviceClassFromPersistence(w.deviceCachePath)
51 if err != nil {
52 return err
53 }
54
55 expectedDeviceCount := len(deviceClasses)
56 for {
57 registrationComplete, err := w.checkDevices(ctx, expectedDeviceCount)
58 if err != nil {
59 return err
60 }
61
62 if registrationComplete {
63 log.Info("device registration complete")
64 break
65 }
66 time.Sleep(w.checkInterval)
67 }
68 return nil
69 }
70
71
72
73 func (w *waiter) checkDevices(ctx context.Context, expectedDeviceCount int) (bool, error) {
74 log := fog.FromContext(ctx)
75
76 files, err := os.ReadDir(w.deviceDir)
77 if err != nil {
78 return false, err
79 }
80
81 count := 0
82 for _, file := range files {
83 if !file.IsDir() && pattern.MatchString(file.Name()) {
84 count++
85 }
86 }
87 log.Info("found devices", "expectedCount", expectedDeviceCount, "count", count)
88 return count >= expectedDeviceCount, nil
89 }
90
View as plain text