1
16
17
18
19
20
21
22
23 package podlogs
24
25 import (
26 "bufio"
27 "bytes"
28 "context"
29 "fmt"
30 "io"
31 "os"
32 "path"
33 "regexp"
34 "strings"
35 "sync"
36 "time"
37
38 v1 "k8s.io/api/core/v1"
39 meta "k8s.io/apimachinery/pkg/apis/meta/v1"
40 clientset "k8s.io/client-go/kubernetes"
41 )
42
43
// LogOutput determines where CopyAllLogs and CopyPodLogs send their output.
type LogOutput struct {
	// StatusWriter, if not nil, receives the ERROR and WARNING messages
	// about problems that occur while copying container output.
	StatusWriter io.Writer

	// LogWriter, if not nil, receives the output of all containers. Each
	// line is prefixed with "<pod>/<container>@<node>: ", where long node
	// names get abbreviated.
	LogWriter io.Writer

	// LogPathPrefix is used when LogWriter is nil: the output of each
	// container is appended to the file "<LogPathPrefix><pod>-<container>.log".
	// The parent directory of that file is created if needed.
	LogPathPrefix string
}
55
56
// expectedErrors matches errors returned when opening a container log stream
// that are expected to happen (for example, while a container is still
// waiting to start or is already terminated). Errors matching it are not
// reported to LogOutput.StatusWriter.
var expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource`)
58
59
60 func CopyAllLogs(ctx context.Context, cs clientset.Interface, ns string, to LogOutput) error {
61 return CopyPodLogs(ctx, cs, ns, "", to)
62 }
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string, to LogOutput) error {
89 options := meta.ListOptions{}
90 if podName != "" {
91 options = meta.ListOptions{
92 FieldSelector: fmt.Sprintf("metadata.name=%s", podName),
93 }
94 }
95 watcher, err := cs.CoreV1().Pods(ns).Watch(context.TODO(), options)
96
97 if err != nil {
98 return fmt.Errorf("cannot create Pod event watcher: %w", err)
99 }
100
101 go func() {
102 var m sync.Mutex
103
104 active := map[string]bool{}
105
106 started := map[string]bool{}
107
108 check := func() {
109 m.Lock()
110 defer m.Unlock()
111
112 pods, err := cs.CoreV1().Pods(ns).List(context.TODO(), options)
113 if err != nil {
114 if to.StatusWriter != nil {
115 fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err)
116 }
117 return
118 }
119
120 for _, pod := range pods.Items {
121 for i, c := range pod.Spec.Containers {
122
123 if len(pod.Status.ContainerStatuses) <= i {
124 continue
125 }
126 name := pod.ObjectMeta.Name + "/" + c.Name
127 id := name + "/" + pod.Status.ContainerStatuses[i].ContainerID
128 if active[name] ||
129
130
131 (pod.Status.ContainerStatuses[i].State.Terminated != nil &&
132 started[id]) ||
133
134
135 (pod.DeletionTimestamp != nil && started[id]) ||
136
137
138 (pod.Status.ContainerStatuses[i].State.Running == nil &&
139 pod.Status.ContainerStatuses[i].State.Terminated == nil) {
140 continue
141 }
142 readCloser, err := logsForPod(ctx, cs, ns, pod.ObjectMeta.Name,
143 &v1.PodLogOptions{
144 Container: c.Name,
145 Follow: true,
146 })
147 if err != nil {
148
149
150 if to.StatusWriter != nil &&
151 expectedErrors.FindStringIndex(err.Error()) == nil {
152 fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err)
153 }
154 continue
155 }
156
157
158
159
160 var out io.Writer
161 var closer io.Closer
162 var prefix string
163 if to.LogWriter != nil {
164 out = to.LogWriter
165 nodeName := pod.Spec.NodeName
166 if len(nodeName) > 10 {
167 nodeName = nodeName[0:4] + ".." + nodeName[len(nodeName)-4:]
168 }
169 prefix = name + "@" + nodeName + ": "
170 } else {
171 var err error
172 filename := to.LogPathPrefix + pod.ObjectMeta.Name + "-" + c.Name + ".log"
173 err = os.MkdirAll(path.Dir(filename), 0755)
174 if err != nil {
175 if to.StatusWriter != nil {
176 fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err)
177 }
178 return
179 }
180
181
182 file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
183 if err != nil {
184 if to.StatusWriter != nil {
185 fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err)
186 }
187 return
188 }
189 closer = file
190 out = file
191 }
192 go func() {
193 if closer != nil {
194 defer closer.Close()
195 }
196 first := true
197 defer func() {
198 m.Lock()
199
200 if !first {
201 if prefix != "" {
202 fmt.Fprintf(out, "%s==== end of pod log ====\n", prefix)
203 } else {
204 fmt.Fprintf(out, "==== end of pod log for container %s ====\n", name)
205 }
206 }
207 active[name] = false
208 m.Unlock()
209 readCloser.Close()
210 }()
211 scanner := bufio.NewScanner(readCloser)
212 for scanner.Scan() {
213 line := scanner.Text()
214
215
216
217
218 if !strings.HasPrefix(line, "rpc error: code = Unknown desc = Error: No such container:") &&
219 !strings.HasPrefix(line, "unable to retrieve container logs for ") &&
220 !strings.HasPrefix(line, "Unable to retrieve container logs for ") {
221 if first {
222
223
224
225 if prefix == "" {
226 fmt.Fprintf(out, "==== start of pod log for container %s ====\n", name)
227 } else {
228 fmt.Fprintf(out, "%s==== start of pod log ====\n", prefix)
229 }
230 first = false
231 }
232 fmt.Fprintf(out, "%s%s\n", prefix, line)
233 }
234 }
235 }()
236 active[name] = true
237 started[id] = true
238 }
239 }
240 }
241
242
243
244 check()
245 for {
246 select {
247 case <-watcher.ResultChan():
248 check()
249 case <-ctx.Done():
250 return
251 }
252 }
253 }()
254
255 return nil
256 }
257
258
259
260
261
262
263
264
265 func logsForPod(ctx context.Context, cs clientset.Interface, ns, pod string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
266 return cs.CoreV1().Pods(ns).GetLogs(pod, opts).Stream(ctx)
267 }
268
269
270
271
272 func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Writer, toCloser io.Closer) (finalErr error) {
273 defer func() {
274 if finalErr != nil && toCloser != nil {
275 toCloser.Close()
276 }
277 }()
278
279 pods, err := cs.CoreV1().Pods(ns).Watch(context.Background(), meta.ListOptions{})
280 if err != nil {
281 return fmt.Errorf("cannot create Pod watcher: %w", err)
282 }
283 defer func() {
284 if finalErr != nil {
285 pods.Stop()
286 }
287 }()
288
289 events, err := cs.CoreV1().Events(ns).Watch(context.Background(), meta.ListOptions{})
290 if err != nil {
291 return fmt.Errorf("cannot create Event watcher: %w", err)
292 }
293
294 go func() {
295 defer func() {
296 pods.Stop()
297 events.Stop()
298 if toCloser != nil {
299 toCloser.Close()
300 }
301 }()
302 timeFormat := "15:04:05.000"
303 for {
304 select {
305 case e := <-pods.ResultChan():
306 if e.Object == nil {
307 continue
308 }
309
310 pod, ok := e.Object.(*v1.Pod)
311 if !ok {
312 continue
313 }
314 buffer := new(bytes.Buffer)
315 fmt.Fprintf(buffer,
316 "%s pod: %s: %s/%s %s: %s %s\n",
317 time.Now().Format(timeFormat),
318 e.Type,
319 pod.Namespace,
320 pod.Name,
321 pod.Status.Phase,
322 pod.Status.Reason,
323 pod.Status.Conditions,
324 )
325 for _, cst := range pod.Status.ContainerStatuses {
326 fmt.Fprintf(buffer, " %s: ", cst.Name)
327 if cst.State.Waiting != nil {
328 fmt.Fprintf(buffer, "WAITING: %s - %s",
329 cst.State.Waiting.Reason,
330 cst.State.Waiting.Message,
331 )
332 } else if cst.State.Running != nil {
333 fmt.Fprintf(buffer, "RUNNING")
334 } else if cst.State.Terminated != nil {
335 fmt.Fprintf(buffer, "TERMINATED: %s - %s",
336 cst.State.Terminated.Reason,
337 cst.State.Terminated.Message,
338 )
339 }
340 fmt.Fprintf(buffer, "\n")
341 }
342 to.Write(buffer.Bytes())
343 case e := <-events.ResultChan():
344 if e.Object == nil {
345 continue
346 }
347
348 event, ok := e.Object.(*v1.Event)
349 if !ok {
350 continue
351 }
352 to.Write([]byte(fmt.Sprintf("%s event: %s/%s %s: %s %s: %s (%v - %v)\n",
353 time.Now().Format(timeFormat),
354 event.InvolvedObject.APIVersion,
355 event.InvolvedObject.Kind,
356 event.InvolvedObject.Name,
357 event.Source.Component,
358 event.Type,
359 event.Message,
360 event.FirstTimestamp,
361 event.LastTimestamp,
362 )))
363 case <-ctx.Done():
364 to.Write([]byte(fmt.Sprintf("%s ==== stopping pod watch ====\n",
365 time.Now().Format(timeFormat))))
366 return
367 }
368 }
369 }()
370
371 return nil
372 }
373
View as plain text