1
16
17 package logs
18
19 import (
20 "bufio"
21 "bytes"
22 "context"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "os"
29 "path/filepath"
30 "time"
31
32 "github.com/fsnotify/fsnotify"
33 "k8s.io/klog/v2"
34
35 v1 "k8s.io/api/core/v1"
36 internalapi "k8s.io/cri-api/pkg/apis"
37 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
38 "k8s.io/kubernetes/pkg/kubelet/cri/remote"
39 "k8s.io/kubernetes/pkg/kubelet/types"
40 "k8s.io/kubernetes/pkg/util/tail"
41 )
42
43
44
45
46
47
48
49
50
51 const (
52
53 timeFormatOut = types.RFC3339NanoFixed
54
55 timeFormatIn = types.RFC3339NanoLenient
56
57
58 logForceCheckPeriod = 1 * time.Second
59 )
60
61 var (
62
63 eol = []byte{'\n'}
64
65 delimiter = []byte{' '}
66
67 tagDelimiter = []byte(runtimeapi.LogTagDelimiter)
68 )
69
70
71 type logMessage struct {
72 timestamp time.Time
73 stream runtimeapi.LogStreamType
74 log []byte
75 }
76
77
78 func (l *logMessage) reset() {
79 l.timestamp = time.Time{}
80 l.stream = ""
81 l.log = nil
82 }
83
84
85 type LogOptions struct {
86 tail int64
87 bytes int64
88 since time.Time
89 follow bool
90 timestamp bool
91 }
92
93
94 func NewLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *LogOptions {
95 opts := &LogOptions{
96 tail: -1,
97 bytes: -1,
98 follow: apiOpts.Follow,
99 timestamp: apiOpts.Timestamps,
100 }
101 if apiOpts.TailLines != nil {
102 opts.tail = *apiOpts.TailLines
103 }
104 if apiOpts.LimitBytes != nil {
105 opts.bytes = *apiOpts.LimitBytes
106 }
107 if apiOpts.SinceSeconds != nil {
108 opts.since = now.Add(-time.Duration(*apiOpts.SinceSeconds) * time.Second)
109 }
110 if apiOpts.SinceTime != nil && apiOpts.SinceTime.After(opts.since) {
111 opts.since = apiOpts.SinceTime.Time
112 }
113 return opts
114 }
115
116
117
118 type parseFunc func([]byte, *logMessage) error
119
120 var parseFuncs = []parseFunc{
121 parseCRILog,
122 parseDockerJSONLog,
123 }
124
125
126
127
128
129 func parseCRILog(log []byte, msg *logMessage) error {
130 var err error
131
132 idx := bytes.Index(log, delimiter)
133 if idx < 0 {
134 return fmt.Errorf("timestamp is not found")
135 }
136 msg.timestamp, err = time.Parse(timeFormatIn, string(log[:idx]))
137 if err != nil {
138 return fmt.Errorf("unexpected timestamp format %q: %v", timeFormatIn, err)
139 }
140
141
142 log = log[idx+1:]
143 idx = bytes.Index(log, delimiter)
144 if idx < 0 {
145 return fmt.Errorf("stream type is not found")
146 }
147 msg.stream = runtimeapi.LogStreamType(log[:idx])
148 if msg.stream != runtimeapi.Stdout && msg.stream != runtimeapi.Stderr {
149 return fmt.Errorf("unexpected stream type %q", msg.stream)
150 }
151
152
153 log = log[idx+1:]
154 idx = bytes.Index(log, delimiter)
155 if idx < 0 {
156 return fmt.Errorf("log tag is not found")
157 }
158
159 tags := bytes.Split(log[:idx], tagDelimiter)
160 partial := runtimeapi.LogTag(tags[0]) == runtimeapi.LogTagPartial
161
162 if partial && len(log) > 0 && log[len(log)-1] == '\n' {
163 log = log[:len(log)-1]
164 }
165
166
167 msg.log = log[idx+1:]
168
169 return nil
170 }
171
172
173
174
175 type jsonLog struct {
176
177 Log string `json:"log,omitempty"`
178
179 Stream string `json:"stream,omitempty"`
180
181 Created time.Time `json:"time"`
182 }
183
184
185
186
187
188
189 func parseDockerJSONLog(log []byte, msg *logMessage) error {
190 var l = &jsonLog{}
191
192
193 if err := json.Unmarshal(log, l); err != nil {
194 return fmt.Errorf("failed with %v to unmarshal log %q", err, l)
195 }
196 msg.timestamp = l.Created
197 msg.stream = runtimeapi.LogStreamType(l.Stream)
198 msg.log = []byte(l.Log)
199 return nil
200 }
201
202
203 func getParseFunc(log []byte) (parseFunc, error) {
204 for _, p := range parseFuncs {
205 if err := p(log, &logMessage{}); err == nil {
206 return p, nil
207 }
208 }
209 return nil, fmt.Errorf("unsupported log format: %q", log)
210 }
211
212
213 type logWriter struct {
214 stdout io.Writer
215 stderr io.Writer
216 opts *LogOptions
217 remain int64
218 }
219
220
221 var errMaximumWrite = errors.New("maximum write")
222
223
224 var errShortWrite = errors.New("short write")
225
226 func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogOptions) *logWriter {
227 w := &logWriter{
228 stdout: stdout,
229 stderr: stderr,
230 opts: opts,
231 remain: math.MaxInt64,
232 }
233 if opts.bytes >= 0 {
234 w.remain = opts.bytes
235 }
236 return w
237 }
238
239
240 func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
241 if msg.timestamp.Before(w.opts.since) {
242
243 return nil
244 }
245 line := msg.log
246 if w.opts.timestamp && addPrefix {
247 prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0])
248 line = append(prefix, line...)
249 }
250
251 if int64(len(line)) > w.remain {
252 line = line[:w.remain]
253 }
254
255 var stream io.Writer
256 switch msg.stream {
257 case runtimeapi.Stdout:
258 stream = w.stdout
259 case runtimeapi.Stderr:
260 stream = w.stderr
261 default:
262 return fmt.Errorf("unexpected stream type %q", msg.stream)
263 }
264 n, err := stream.Write(line)
265 w.remain -= int64(n)
266 if err != nil {
267 return err
268 }
269
270 if n < len(line) {
271 return errShortWrite
272 }
273
274 if w.remain <= 0 {
275 return errMaximumWrite
276 }
277 return nil
278 }
279
280
281
282
283 func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
284
285
286
287
288
289 evaluated, err := filepath.EvalSymlinks(path)
290 if err != nil {
291 return fmt.Errorf("failed to try resolving symlinks in path %q: %v", path, err)
292 }
293 path = evaluated
294 f, err := os.Open(path)
295 if err != nil {
296 return fmt.Errorf("failed to open log file %q: %v", path, err)
297 }
298 defer f.Close()
299
300
301 start, err := tail.FindTailLineStartIndex(f, opts.tail)
302 if err != nil {
303 return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
304 }
305 if _, err := f.Seek(start, io.SeekStart); err != nil {
306 return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err)
307 }
308
309 limitedMode := (opts.tail >= 0) && (!opts.follow)
310 limitedNum := opts.tail
311
312 r := bufio.NewReader(f)
313
314 var watcher *fsnotify.Watcher
315 var parse parseFunc
316 var stop bool
317 isNewLine := true
318 found := true
319 writer := newLogWriter(stdout, stderr, opts)
320 msg := &logMessage{}
321 baseName := filepath.Base(path)
322 dir := filepath.Dir(path)
323 for {
324 if stop || (limitedMode && limitedNum == 0) {
325 klog.V(2).InfoS("Finished parsing log file", "path", path)
326 return nil
327 }
328 l, err := r.ReadBytes(eol[0])
329 if err != nil {
330 if err != io.EOF {
331 return fmt.Errorf("failed to read log file %q: %v", path, err)
332 }
333 if opts.follow {
334
335 if !found {
336 return nil
337 }
338
339
340 if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
341 return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
342 }
343 if watcher == nil {
344
345 if watcher, err = fsnotify.NewWatcher(); err != nil {
346 return fmt.Errorf("failed to create fsnotify watcher: %v", err)
347 }
348 defer watcher.Close()
349 if err := watcher.Add(dir); err != nil {
350 return fmt.Errorf("failed to watch directory %q: %w", dir, err)
351 }
352
353
354 continue
355 }
356 var recreated bool
357
358 found, recreated, err = waitLogs(ctx, containerID, baseName, watcher, runtimeService)
359 if err != nil {
360 return err
361 }
362 if recreated {
363 newF, err := os.Open(path)
364 if err != nil {
365 if os.IsNotExist(err) {
366 continue
367 }
368 return fmt.Errorf("failed to open log file %q: %v", path, err)
369 }
370 defer newF.Close()
371 f.Close()
372 f = newF
373 r = bufio.NewReader(f)
374 }
375
376 continue
377 }
378
379 stop = true
380 if len(l) == 0 {
381 continue
382 }
383 klog.InfoS("Incomplete line in log file", "path", path, "line", l)
384 }
385 if parse == nil {
386
387 parse, err = getParseFunc(l)
388 if err != nil {
389 return fmt.Errorf("failed to get parse function: %v", err)
390 }
391 }
392
393 msg.reset()
394 if err := parse(l, msg); err != nil {
395 klog.ErrorS(err, "Failed when parsing line in log file", "path", path, "line", l)
396 continue
397 }
398
399 if err := writer.write(msg, isNewLine); err != nil {
400 if err == errMaximumWrite {
401 klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
402 return nil
403 }
404 klog.ErrorS(err, "Failed when writing line to log file", "path", path, "line", msg)
405 return err
406 }
407 if limitedMode {
408 limitedNum--
409 }
410 if len(msg.log) > 0 {
411 isNewLine = msg.log[len(msg.log)-1] == eol[0]
412 } else {
413 isNewLine = true
414 }
415 }
416 }
417
418 func isContainerRunning(ctx context.Context, id string, r internalapi.RuntimeService) (bool, error) {
419 resp, err := r.ContainerStatus(ctx, id, false)
420 if err != nil {
421 return false, err
422 }
423 status := resp.GetStatus()
424 if status == nil {
425 return false, remote.ErrContainerStatusNil
426 }
427
428 if status.State != runtimeapi.ContainerState_CONTAINER_RUNNING {
429 klog.V(5).InfoS("Container is not running", "containerId", id, "state", status.State)
430
431
432 return false, nil
433 }
434 return true, nil
435 }
436
437
438
439
440 func waitLogs(ctx context.Context, id string, logName string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
441
442 if running, err := isContainerRunning(ctx, id, runtimeService); !running {
443 return false, false, err
444 }
445 errRetry := 5
446 for {
447 select {
448 case <-ctx.Done():
449 return false, false, fmt.Errorf("context cancelled")
450 case e := <-w.Events:
451 switch e.Op {
452 case fsnotify.Write, fsnotify.Rename, fsnotify.Remove, fsnotify.Chmod:
453 return true, false, nil
454 case fsnotify.Create:
455 return true, filepath.Base(e.Name) == logName, nil
456 default:
457 klog.ErrorS(nil, "Received unexpected fsnotify event, retrying", "event", e)
458 }
459 case err := <-w.Errors:
460 klog.ErrorS(err, "Received fsnotify watch error, retrying unless no more retries left", "retries", errRetry)
461 if errRetry == 0 {
462 return false, false, err
463 }
464 errRetry--
465 case <-time.After(logForceCheckPeriod):
466 return true, false, nil
467 }
468 }
469 }
470
View as plain text