1
16
17 package e2enode
18
19 import (
20 "context"
21 "fmt"
22 "os"
23 "os/user"
24 "runtime"
25 "sync"
26 "time"
27
28 "k8s.io/klog/v2"
29
30 utilerrors "k8s.io/apimachinery/pkg/util/errors"
31 "k8s.io/apimachinery/pkg/util/sets"
32 internalapi "k8s.io/cri-api/pkg/apis"
33 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
34 commontest "k8s.io/kubernetes/test/e2e/common"
35 e2egpu "k8s.io/kubernetes/test/e2e/framework/gpu"
36 e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
37 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
38 e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
39 imageutils "k8s.io/kubernetes/test/utils/image"
40 )
41
// Tunables for pre-pulling images (see PrePullAllImages).
const (
	// maxParallelImagePullCount caps how many worker goroutines pull images
	// at the same time.
	maxParallelImagePullCount = 5

	// maxImagePullRetries is the number of attempts made to pull a single
	// image before the pull is reported as failed.
	maxImagePullRetries = 5

	// imagePullRetryDelay is the pause between two consecutive attempts to
	// pull the same image.
	imagePullRetryDelay = time.Second
)
50
51
52
// NodePrePullImageList is the base set of image references used by the node
// e2e tests. updateImageAllowList may add to it and merges it into
// e2epod.ImagePrePullList, which PrePullAllImages then pulls.
var NodePrePullImageList = sets.NewString(
	imageutils.GetE2EImage(imageutils.Agnhost),
	"gcr.io/cadvisor/cadvisor:v0.47.2",
	busyboxImage,
	// Busybox pinned by digest, listed in addition to busyboxImage.
	"registry.k8s.io/e2e-test-images/busybox@sha256:a9155b13325b2abef48e71de77bb8ac015412a566829f621d06bfae5c699b1b9",
	imageutils.GetE2EImage(imageutils.Nginx),
	imageutils.GetE2EImage(imageutils.Perl),
	imageutils.GetE2EImage(imageutils.Nonewprivs),
	imageutils.GetPauseImageName(),
	imageutils.GetE2EImage(imageutils.NodePerfNpbEp),
	imageutils.GetE2EImage(imageutils.NodePerfNpbIs),
	imageutils.GetE2EImage(imageutils.Etcd),
)
66
67
68
69
70
71 func updateImageAllowList(ctx context.Context) {
72
73 if !isRunningOnArm64() {
74
75 NodePrePullImageList = NodePrePullImageList.Insert(imageutils.GetE2EImage(imageutils.NodePerfTfWideDeep))
76 }
77
78 e2epod.ImagePrePullList = NodePrePullImageList.Union(commontest.PrePulledImages)
79
80 e2epod.ImagePrePullList.Insert(getNodeProblemDetectorImage())
81 if sriovDevicePluginImage, err := getSRIOVDevicePluginImage(); err != nil {
82 klog.Errorln(err)
83 } else {
84 e2epod.ImagePrePullList.Insert(sriovDevicePluginImage)
85 }
86 if gpuDevicePluginImage, err := getGPUDevicePluginImage(ctx); err != nil {
87 klog.Errorln(err)
88 } else {
89 e2epod.ImagePrePullList.Insert(gpuDevicePluginImage)
90 }
91 if samplePluginImage, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginDSYAML); err != nil {
92 klog.Errorln(err)
93 } else {
94 e2epod.ImagePrePullList.Insert(samplePluginImage)
95 }
96 if samplePluginImageCtrlReg, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginControlRegistrationDSYAML); err != nil {
97 klog.Errorln(err)
98 } else {
99 e2epod.ImagePrePullList.Insert(samplePluginImageCtrlReg)
100 }
101 }
102
// isRunningOnArm64 reports whether this binary was built for the arm64
// architecture, as given by runtime.GOARCH.
func isRunningOnArm64() bool {
	const arm64Arch = "arm64"
	return arm64Arch == runtime.GOARCH
}
106
// getNodeProblemDetectorImage returns the node problem detector image to
// pre-pull. A non-empty NODE_PROBLEM_DETECTOR_IMAGE environment variable
// overrides the built-in default; an unset or empty variable selects the
// default.
func getNodeProblemDetectorImage() string {
	if override := os.Getenv("NODE_PROBLEM_DETECTOR_IMAGE"); override != "" {
		return override
	}
	return "registry.k8s.io/node-problem-detector/node-problem-detector:v0.8.16"
}
115
116
// puller abstracts a mechanism that can fetch container images.
type puller interface {
	// Name identifies the puller implementation; it is used in log messages.
	Name() string

	// Pull fetches the given image. It returns any output produced by the
	// pull together with an error if the pull failed.
	Pull(image string) ([]byte, error)
}
123
// remotePuller is a puller backed by an internalapi.ImageManagerService.
type remotePuller struct {
	// imageService is the image service used to query and pull images.
	imageService internalapi.ImageManagerService
}
127
128 func (rp *remotePuller) Name() string {
129 return "CRI"
130 }
131
132 func (rp *remotePuller) Pull(image string) ([]byte, error) {
133 resp, err := rp.imageService.ImageStatus(context.Background(), &runtimeapi.ImageSpec{Image: image}, false)
134 if err == nil && resp.GetImage() != nil {
135 return nil, nil
136 }
137 _, err = rp.imageService.PullImage(context.Background(), &runtimeapi.ImageSpec{Image: image}, nil, nil)
138 return nil, err
139 }
140
141 func getPuller() (puller, error) {
142 _, is, err := getCRIClient()
143 if err != nil {
144 return nil, err
145 }
146 return &remotePuller{
147 imageService: is,
148 }, nil
149 }
150
151
152 func PrePullAllImages() error {
153 puller, err := getPuller()
154 if err != nil {
155 return err
156 }
157 usr, err := user.Current()
158 if err != nil {
159 return err
160 }
161 images := e2epod.ImagePrePullList.List()
162 klog.V(4).Infof("Pre-pulling images with %s %+v", puller.Name(), images)
163
164 imageCh := make(chan int, len(images))
165 for i := range images {
166 imageCh <- i
167 }
168 close(imageCh)
169
170 pullErrs := make([]error, len(images))
171 ctx, cancel := context.WithCancel(context.Background())
172 defer cancel()
173
174 parallelImagePullCount := maxParallelImagePullCount
175 if len(images) < parallelImagePullCount {
176 parallelImagePullCount = len(images)
177 }
178
179 var wg sync.WaitGroup
180 wg.Add(parallelImagePullCount)
181 for i := 0; i < parallelImagePullCount; i++ {
182 go func() {
183 defer wg.Done()
184
185 for i := range imageCh {
186 var (
187 pullErr error
188 output []byte
189 )
190 for retryCount := 0; retryCount < maxImagePullRetries; retryCount++ {
191 select {
192 case <-ctx.Done():
193 return
194 default:
195 }
196
197 if retryCount > 0 {
198 time.Sleep(imagePullRetryDelay)
199 }
200 if output, pullErr = puller.Pull(images[i]); pullErr == nil {
201 break
202 }
203 klog.Warningf("Failed to pull %s as user %q, retrying in %s (%d of %d): %v",
204 images[i], usr.Username, imagePullRetryDelay.String(), retryCount+1, maxImagePullRetries, pullErr)
205 }
206 if pullErr != nil {
207 klog.Warningf("Could not pre-pull image %s %v output: %s", images[i], pullErr, output)
208 pullErrs[i] = pullErr
209 cancel()
210 return
211 }
212 }
213 }()
214 }
215
216 wg.Wait()
217 return utilerrors.NewAggregate(pullErrs)
218 }
219
220
221 func getGPUDevicePluginImage(ctx context.Context) (string, error) {
222 ds, err := e2emanifest.DaemonSetFromURL(ctx, e2egpu.GPUDevicePluginDSYAML)
223 if err != nil {
224 return "", fmt.Errorf("failed to parse the device plugin image: %w", err)
225 }
226 if ds == nil {
227 return "", fmt.Errorf("failed to parse the device plugin image: the extracted DaemonSet is nil")
228 }
229 if len(ds.Spec.Template.Spec.Containers) < 1 {
230 return "", fmt.Errorf("failed to parse the device plugin image: cannot extract the container from YAML")
231 }
232 return ds.Spec.Template.Spec.Containers[0].Image, nil
233 }
234
235 func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) {
236 data, err := e2etestfiles.Read(dsYamlPath)
237 if err != nil {
238 return "", fmt.Errorf("failed to read the daemonset yaml: %w", err)
239 }
240
241 ds, err := e2emanifest.DaemonSetFromData(data)
242 if err != nil {
243 return "", fmt.Errorf("failed to parse daemonset yaml: %w", err)
244 }
245
246 if len(ds.Spec.Template.Spec.Containers) < 1 {
247 return "", fmt.Errorf("failed to parse the container image: cannot extract the container from YAML")
248 }
249 return ds.Spec.Template.Spec.Containers[0].Image, nil
250 }
251
252
253 func getSRIOVDevicePluginImage() (string, error) {
254 data, err := e2etestfiles.Read(SRIOVDevicePluginDSYAML)
255 if err != nil {
256 return "", fmt.Errorf("failed to read the device plugin manifest: %w", err)
257 }
258 ds, err := e2emanifest.DaemonSetFromData(data)
259 if err != nil {
260 return "", fmt.Errorf("failed to parse the device plugin image: %w", err)
261 }
262 if ds == nil {
263 return "", fmt.Errorf("failed to parse the device plugin image: the extracted DaemonSet is nil")
264 }
265 if len(ds.Spec.Template.Spec.Containers) < 1 {
266 return "", fmt.Errorf("failed to parse the device plugin image: cannot extract the container from YAML")
267 }
268 return ds.Spec.Template.Spec.Containers[0].Image, nil
269 }
270
View as plain text