...
1import tempfile
2
3from tests.runutils import run_with_retry
4
5
6def meta_action_kube_artifacts(namespace, artifacts, action, retries=0):
7 temp_file = tempfile.NamedTemporaryFile()
8 temp_file.write(artifacts.encode())
9 temp_file.flush()
10
11 command = ["tools/bin/kubectl", action, "-f", temp_file.name]
12 if namespace is None:
13 namespace = "default"
14
15 if namespace is not None:
16 command.extend(["-n", namespace])
17
18 run_with_retry(command, retries=retries)
19 temp_file.close()
20
21
22def apply_kube_artifacts(namespace, artifacts):
23 meta_action_kube_artifacts(namespace=namespace, artifacts=artifacts, action="apply", retries=1)
24
25
26def delete_kube_artifacts(namespace, artifacts):
27 meta_action_kube_artifacts(namespace=namespace, artifacts=artifacts, action="delete")
View as plain text