1
2
3 package k8s
4
5 import (
6 "context"
7 "fmt"
8 "io"
9 "os"
10 "strings"
11
12 corev1 "k8s.io/api/core/v1"
13 rbacv1 "k8s.io/api/rbac/v1"
14 "k8s.io/apimachinery/pkg/api/errors"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/client-go/kubernetes"
17 "k8s.io/client-go/rest"
18 "sigs.k8s.io/controller-runtime/pkg/client"
19 "sigs.k8s.io/controller-runtime/pkg/manager"
20
21 konfigkonnector "edge-infra.dev/pkg/k8s/konfigkonnector"
22 configconnector "edge-infra.dev/pkg/k8s/konfigkonnector/apis/configconnector/v1beta1"
23 "edge-infra.dev/pkg/k8s/unstructured"
24
25 "edge-infra.dev/pkg/k8s/runtime/sap"
26 "edge-infra.dev/pkg/k8s/runtime/sap/install"
27 "edge-infra.dev/pkg/lib/gcp/iam"
28 "edge-infra.dev/test/framework"
29 "edge-infra.dev/test/framework/gcp"
30 "edge-infra.dev/test/framework/integration"
31 "edge-infra.dev/third_party/k8s/certmanager"
32 )
33
34
35
// K8s holds the Kubernetes-related state used by the test framework: the REST
// config it was constructed with, an optional controller manager, flags for
// optional components, and the client and namespace exposed to tests.
type K8s struct {
	// skipNamespaceCreation, when true, makes setup skip creating a namespace
	// and leaves Namespace untouched.
	skipNamespaceCreation bool
	// cfg is the REST config passed to New; it is used to build clients and to
	// install manifests.
	cfg *rest.Config

	// mgr is an optional controller manager supplied via WithCtrlManager. When
	// set, a setup step that starts it is registered, and its scheme is used
	// when the client is created.
	mgr manager.Manager

	// mgrCancel cancels the context the manager was started with. It is
	// assigned by startManager and invoked by teardown.
	mgrCancel context.CancelFunc

	// kfgkonnector and certmgr record whether WithKonfigKonnector and
	// WithCertManager were requested.
	kfgkonnector bool
	certmgr      bool

	// Namespace is set to the framework's unique name by setup, unless
	// namespace creation is skipped.
	Namespace string
	// Client is the controller-runtime client. If it is still nil when
	// SetupWithFramework runs, a new one is created from cfg.
	Client client.Client
}
55
56
57
58
// Option mutates a K8s instance. Options are applied, in order, by New.
type Option func(*K8s)
60
61
62
63
64 func SkipNamespaceCreation() Option {
65 return func(k8s *K8s) {
66 k8s.skipNamespaceCreation = true
67 }
68 }
69
70
71
72 func WithKonfigKonnector() Option {
73 return func(k *K8s) {
74 k.kfgkonnector = true
75 }
76 }
77
78
79 func WithCertManager() Option {
80 return func(k *K8s) {
81 k.certmgr = true
82 }
83 }
84
85
86 func WithCtrlManager(mgr manager.Manager) Option {
87 return func(k *K8s) {
88 k.mgr = mgr
89 }
90 }
91
92
93 func New(cfg *rest.Config, opts ...Option) *K8s {
94 k8s := &K8s{
95 skipNamespaceCreation: false,
96 cfg: cfg,
97 }
98
99 for _, opt := range opts {
100 opt(k8s)
101 }
102
103 return k8s
104 }
105
106
107
// SetupWithFramework wires this K8s instance into f: it makes sure a client
// exists, registers setup steps for the optional components that were
// requested, and then registers the setup, teardown and before-each-test
// hooks after labelling the framework with k8s=true.
func (k *K8s) SetupWithFramework(f *framework.Framework) {
	k.setClient(f)

	// Optional component steps are registered before the core setup hook below.
	// NOTE(review): integration.Only presumably reports true only when the flag
	// is set and integration tests are enabled -- confirm against its definition.
	if integration.Only(k.kfgkonnector) {
		f.Setup(k.setupKonfigConnector)
	}
	if integration.Only(k.certmgr) {
		f.Setup(k.setupCertManager)
	}
	// The manager is only started when one was supplied via WithCtrlManager.
	if k.mgr != nil {
		f.Setup(k.startManager)
	}

	// Core lifecycle hooks.
	f.Label("k8s", "true").
		Setup(k.setup).
		Teardown(k.teardown).
		BeforeEachTest(k.beforeEach)
}
128
// RESTConfig returns the REST config this K8s was constructed with.
func (k *K8s) RESTConfig() *rest.Config {
	return k.cfg
}
132
133
134
135
136 func (k *K8s) setupKonfigConnector(f *framework.Framework) {
137
138
139 integration.Skip(NeedsKonfigKonnector)(f)
140
141
142 secret := "gcp-creds"
143 ctx := context.Background()
144 c, err := client.New(k.cfg, client.Options{Scheme: konfigkonnector.CreateScheme()})
145 if err != nil {
146 f.FailNow("failed to create k8s client", err)
147 }
148
149 manifests, err := konfigkonnector.LoadManifests()
150 if err != nil {
151 f.FailNow("failed to load k8s cfg connector manifests", err)
152 }
153
154
155 if _, err := install.Install(ctx, k.cfg, manifests, FieldManagerOwner(f), InstallOpts()...); err != nil {
156 f.FailNow("failed to install k8s cfg connector", err)
157 }
158
159 cfgConn := &configconnector.ConfigConnector{}
160
161 if KonfigKonnector.ServiceAccount != "" {
162
163 cfgConn = configconnector.New(
164 configconnector.WithSvcAccount(
165 iam.SvcAccountEmail(KonfigKonnector.ServiceAccount, gcp.GCloud.ProjectID),
166 ),
167 )
168 }
169
170 if KonfigKonnector.APIKey != "" {
171 key, err := os.ReadFile(framework.ResolvePath(KonfigKonnector.APIKey))
172 if err != nil {
173 f.FailNow("failed to read k8s cfg connector api key", err)
174 }
175 if err := konfigkonnector.SetupCNRMSystem(ctx, c, secret, key); err != nil {
176 f.FailNow("failed to setup k8s cfg connector namespace", err)
177 }
178 cfgConn = configconnector.New(configconnector.WithCredentialsSecret(secret))
179 }
180
181 if err := c.Create(ctx, cfgConn); err != nil && !errors.IsAlreadyExists(err) {
182 f.FailNow("failed to create ConfigConnector object", err)
183 }
184
185 f.Eventually(func() bool {
186 _ = c.Get(ctx, client.ObjectKeyFromObject(cfgConn), cfgConn)
187 return cfgConn.IsHealthy()
188 }, Timeouts.DefaultTimeout, Timeouts.Tick, "ConfigConnector object didnt become ready in time")
189 }
190
191
192
193
194 func (k *K8s) setupCertManager(f *framework.Framework) {
195 ctx := context.Background()
196 manifests, err := certmanager.LoadManifests()
197 if err != nil {
198 f.FailNow("failed to load embedded certmanager manifests", err)
199 }
200
201
202 if _, err := install.Install(ctx, k.cfg, manifests, FieldManagerOwner(f), InstallOpts()...); err != nil {
203 f.FailNow("failed to install cert manager", err)
204 }
205 }
206
207
208
209
210 func (k *K8s) startManager(f *framework.Framework) {
211 ctx, cancel := context.WithCancel(context.TODO())
212 k.mgrCancel = cancel
213 go func() {
214 if err := k.mgr.Start(ctx); err != nil {
215 f.FailNow("failed to start manager", err)
216 }
217 }()
218 }
219
220
221 func (k *K8s) setClient(f *framework.Framework) {
222 if k.Client == nil {
223 opts := client.Options{}
224
225 if k.mgr != nil {
226 opts.Scheme = k.mgr.GetScheme()
227 }
228 c, err := client.New(k.cfg, opts)
229 if err != nil {
230 f.FailNow("failed to create client", err)
231 }
232 k.Client = c
233 }
234 }
235
236
237 func ProcessManifest(manifest *unstructured.Unstructured, namespace string) (err error) {
238 manifest.SetNamespace(namespace)
239 if manifest.GetKind() == "RoleBinding" {
240 err = processRoleBindings(manifest, namespace)
241 }
242 return err
243 }
244
245 func processRoleBindings(manifest *unstructured.Unstructured, namespace string) error {
246 var roleBinding rbacv1.RoleBinding
247 err := unstructured.FromUnstructured(manifest, &roleBinding)
248 if err != nil {
249 return err
250 }
251 for idx := range roleBinding.Subjects {
252 if roleBinding.Subjects[idx].Kind == "ServiceAccount" {
253 roleBinding.Subjects[idx].Namespace = namespace
254 }
255 }
256 updatedManifest, err := unstructured.ToUnstructured(&roleBinding)
257 if err != nil {
258 return err
259 }
260 *manifest = *updatedManifest
261 return nil
262 }
263
264
265 func (k *K8s) teardown(_ *framework.Framework) {
266 if k.mgr != nil {
267 k.mgrCancel()
268 }
269 }
270
271
272 func (k *K8s) setup(f *framework.Framework) {
273 if !k.skipNamespaceCreation {
274
275 k.Namespace = f.UniqueName
276
277 f.NoError(k.Client.Create(context.Background(), &corev1.Namespace{
278 ObjectMeta: metav1.ObjectMeta{
279 Name: k.Namespace,
280 Labels: map[string]string{
281 "edge-framework": f.BaseName,
282 },
283 },
284 }))
285 }
286 }
287
288
289
290
// beforeEach runs before each test and makes sure k.Client is populated,
// creating a client if the field is nil.
func (k *K8s) beforeEach(f *framework.Framework) {
	k.setClient(f)
}
294
295
296
297
298 func FieldManagerOwner(f *framework.Framework) sap.Owner {
299 return sap.Owner{
300 Field: fmt.Sprintf("edge-framework-%s", f.BaseName),
301 Group: "edge-framework",
302 }
303 }
304
305
306
307 func InstallOpts() []install.Option {
308 return []install.Option{
309 install.WithForce(),
310 install.WithTimeout(Timeouts.DefaultTimeout),
311 }
312 }
313
314
315
316 func CreateTestPod(name string, namespace string, args []string, image string, registry ...string) *corev1.Pod {
317 var reg string
318 if registry == nil {
319 reg = "localhost:21700"
320 } else {
321 reg = strings.Trim(registry[0], "/")
322 }
323 image = strings.Trim(image, "/")
324 return &corev1.Pod{
325 ObjectMeta: metav1.ObjectMeta{
326 Name: name,
327 Namespace: namespace,
328 },
329 Spec: corev1.PodSpec{
330 Containers: []corev1.Container{
331 {
332 Name: name,
333 Image: reg + "/" + image,
334 Args: args,
335 ImagePullPolicy: corev1.PullAlways,
336 },
337 },
338 RestartPolicy: corev1.RestartPolicyNever,
339 },
340 }
341 }
342
343
344 func (k *K8s) FindPodLogs(ctx context.Context, podname string, namespace string) (io.ReadCloser, error) {
345 clientset, err := kubernetes.NewForConfig(k.cfg)
346 if err != nil {
347 return nil, err
348 }
349
350 podLogOptions := corev1.PodLogOptions{}
351
352 req := clientset.CoreV1().Pods(namespace).GetLogs(podname, &podLogOptions)
353 return req.Stream(ctx)
354 }
355
View as plain text