1 package kubeapply
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "path/filepath"
9 "sort"
10 "strings"
11 "syscall"
12 "time"
13
14 "github.com/datawire/dlib/derror"
15 "github.com/datawire/dlib/dexec"
16 "github.com/datawire/dlib/dlog"
17 "github.com/emissary-ingress/emissary/v3/pkg/k8s"
18 )
19
20
21
// errorDeadlineExceeded is the sentinel error returned by applyAndWait when
// the waiter reports that it did not finish before the deadline.
// ApplyAndWait detects it with errors.Is and wraps it with the phase name.
var errorDeadlineExceeded = errors.New("timeout exceeded")
23
24
25
26
27
28
29 func Kubeapply(ctx context.Context, kubeinfo *k8s.KubeInfo, perPhaseTimeout time.Duration, debug, dryRun bool, files ...string) error {
30 collection, err := CollectYAML(files...)
31 if err != nil {
32 return fmt.Errorf("CollectYAML: %w", err)
33 }
34
35 if err = collection.ApplyAndWait(ctx, kubeinfo, perPhaseTimeout, debug, dryRun); err != nil {
36 return fmt.Errorf("ApplyAndWait: %w", err)
37 }
38
39 return nil
40 }
41
42
// YAMLCollection maps a phase name to the paths of the YAML files that belong
// to that phase.
type YAMLCollection map[string][]string
44
45
46
47 func CollectYAML(paths ...string) (YAMLCollection, error) {
48 ret := make(YAMLCollection)
49 for _, path := range paths {
50 err := filepath.Walk(path, func(filename string, fileinfo os.FileInfo, err error) error {
51 if err != nil {
52 return err
53 }
54 if fileinfo.IsDir() {
55 return nil
56 }
57
58 if strings.HasSuffix(filename, ".yaml") {
59 ret.addFile(filename)
60 }
61 return nil
62 })
63 if err != nil {
64 return nil, err
65 }
66 }
67 return ret, nil
68 }
69
// hasNumberPrefix reports whether filepart starts with two ASCII digits
// followed by a hyphen, for example "01-crds.yaml".
func hasNumberPrefix(filepart string) bool {
	isDigit := func(b byte) bool { return b >= '0' && b <= '9' }

	// Two digits plus the hyphen: anything shorter cannot match.
	if len(filepart) < 3 {
		return false
	}
	return isDigit(filepart[0]) && isDigit(filepart[1]) && filepart[2] == '-'
}
78
79 func (collection YAMLCollection) addFile(path string) {
80 _, notdir := filepath.Split(path)
81 phaseName := "last"
82 if hasNumberPrefix(notdir) {
83 phaseName = notdir[:2]
84 }
85
86 collection[phaseName] = append(collection[phaseName], path)
87 }
88
89
90
91
92
93 func (collection YAMLCollection) ApplyAndWait(
94 ctx context.Context,
95 kubeinfo *k8s.KubeInfo,
96 perPhaseTimeout time.Duration,
97 debug, dryRun bool,
98 ) error {
99 if kubeinfo == nil {
100 kubeinfo = k8s.NewKubeInfo("", "", "")
101 }
102
103 phaseNames := make([]string, 0, len(collection))
104 for phaseName := range collection {
105 phaseNames = append(phaseNames, phaseName)
106 }
107 sort.Strings(phaseNames)
108
109 for _, phaseName := range phaseNames {
110 deadline := time.Now().Add(perPhaseTimeout)
111 err := applyAndWait(ctx, kubeinfo, deadline, debug, dryRun, collection[phaseName])
112 if err != nil {
113 if errors.Is(err, errorDeadlineExceeded) {
114 err = fmt.Errorf("phase %q not ready after %v: %w", phaseName, perPhaseTimeout, err)
115 }
116 return err
117 }
118 }
119 return nil
120 }
121
122 func applyAndWait(ctx context.Context, kubeinfo *k8s.KubeInfo, deadline time.Time, debug, dryRun bool, sourceFilenames []string) error {
123 expandedFilenames, err := expand(ctx, sourceFilenames)
124 if err != nil {
125 return fmt.Errorf("expanding YAML: %w", err)
126 }
127
128 cli, err := k8s.NewClient(kubeinfo)
129 if err != nil {
130 return fmt.Errorf("connecting to cluster %v: %w", kubeinfo, err)
131 }
132 waiter, err := NewWaiter(cli.Watcher())
133 if err != nil {
134 return err
135 }
136
137 valid := make(map[string]bool)
138 var scanErrs derror.MultiError
139 for _, filename := range expandedFilenames {
140 valid[filename] = true
141 if err := waiter.Scan(ctx, filename); err != nil {
142 scanErrs = append(scanErrs, fmt.Errorf("watch %q: %w", filename, err))
143 valid[filename] = false
144 }
145 }
146 if !debug {
147
148
149 defer func() {
150 for _, filename := range expandedFilenames {
151 if valid[filename] {
152 if err := os.Remove(filename); err != nil {
153
154
155
156 dlog.Error(ctx, err)
157 }
158 }
159 }
160 }()
161 }
162 if len(scanErrs) > 0 {
163 return fmt.Errorf("waiter: %w", scanErrs)
164 }
165
166 if err := kubectlApply(ctx, kubeinfo, dryRun, expandedFilenames); err != nil {
167 return err
168 }
169
170 finished, err := waiter.Wait(ctx, deadline)
171 if err != nil {
172 return err
173 }
174 if !finished {
175 return errorDeadlineExceeded
176 }
177
178 return nil
179 }
180
181 func expand(ctx context.Context, names []string) ([]string, error) {
182 dlog.Printf(ctx, "expanding %s\n", strings.Join(names, " "))
183 var result []string
184 for _, n := range names {
185 resources, err := LoadResources(ctx, n)
186 if err != nil {
187 return nil, err
188 }
189 out := n + ".o"
190 err = SaveResources(out, resources)
191 if err != nil {
192 return nil, err
193 }
194 result = append(result, out)
195 }
196 return result, nil
197 }
198
199 func kubectlApply(ctx context.Context, info *k8s.KubeInfo, dryRun bool, filenames []string) error {
200 args := []string{"apply"}
201 if dryRun {
202 args = append(args, "--dry-run")
203 }
204 for _, filename := range filenames {
205
206 filehandle, err := os.Open(filename)
207 if err != nil {
208 return err
209 }
210 defer filehandle.Close()
211 if err := syscall.Flock(int(filehandle.Fd()), syscall.LOCK_EX); err != nil {
212 return err
213 }
214 args = append(args, "-f", filename)
215 }
216 kargs, err := info.GetKubectlArray(args...)
217 if err != nil {
218 return err
219 }
220 dlog.Printf(ctx, "kubectl %s\n", strings.Join(kargs, " "))
221
222 if err := dexec.CommandContext(ctx, "kubectl", kargs...).Run(); err != nil {
223 return err
224 }
225
226 return nil
227 }
228
View as plain text