...

Source file src/k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs/logs.go

Documentation: k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package logs
    18  
    19  import (
    20  	"bufio"
    21  	"bytes"
    22  	"context"
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"io"
    27  	"math"
    28  	"os"
    29  	"path/filepath"
    30  	"time"
    31  
    32  	"github.com/fsnotify/fsnotify"
    33  	"k8s.io/klog/v2"
    34  
    35  	v1 "k8s.io/api/core/v1"
    36  	internalapi "k8s.io/cri-api/pkg/apis"
    37  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    38  	"k8s.io/kubernetes/pkg/kubelet/cri/remote"
    39  	"k8s.io/kubernetes/pkg/kubelet/types"
    40  	"k8s.io/kubernetes/pkg/util/tail"
    41  )
    42  
// Notice that the current CRI logs implementation only partially handles
// log rotation.
// * It will not retrieve logs in rotated log file.
// * If log rotation happens when following the log:
//   * If the rotation is using create mode, ReadLogs reopens the recreated file and keeps following it.
//   * If the rotation is using copytruncate, we'll be reading at the original position and get nothing.
// TODO(random-liu): Support log rotation.
    50  
const (
	// timeFormatOut is the format for writing timestamps to output.
	timeFormatOut = types.RFC3339NanoFixed
	// timeFormatIn is the format for parsing timestamps from other logs.
	timeFormatIn = types.RFC3339NanoLenient

	// logForceCheckPeriod is how long waitLogs blocks without any fsnotify
	// event before it tells the caller to try reading the log again anyway.
	logForceCheckPeriod = 1 * time.Second
)
    60  
var (
	// eol is the end-of-line sign in the log. ReadLogs reads the file up to
	// and including this byte for every line.
	eol = []byte{'\n'}
	// delimiter is the delimiter for timestamp and stream type in log line.
	// It is also written after the timestamp prefix added by logWriter.write.
	delimiter = []byte{' '}
	// tagDelimiter is the delimiter for log tags.
	tagDelimiter = []byte(runtimeapi.LogTagDelimiter)
)
    69  
// logMessage is the CRI internal log type. It holds one parsed log line:
// the timestamp recorded for the line, the stream it belongs to, and the
// raw log content. The parse functions fill it in and logWriter.write
// consumes it.
type logMessage struct {
	timestamp time.Time
	stream    runtimeapi.LogStreamType
	log       []byte
}
    76  
    77  // reset resets the log to nil.
    78  func (l *logMessage) reset() {
    79  	l.timestamp = time.Time{}
    80  	l.stream = ""
    81  	l.log = nil
    82  }
    83  
// LogOptions is the CRI internal type of all log options.
//
//   - tail: number of trailing lines to read; NewLogOptions uses -1 for "read all logs".
//   - bytes: maximum number of bytes to write; NewLogOptions uses -1 for "read all logs".
//   - since: lines whose timestamp is before this time are skipped by the writer.
//   - follow: whether ReadLogs keeps waiting for new content after reaching EOF.
//   - timestamp: whether each output line is prefixed with its timestamp.
type LogOptions struct {
	tail      int64
	bytes     int64
	since     time.Time
	follow    bool
	timestamp bool
}
    92  
    93  // NewLogOptions convert the v1.PodLogOptions to CRI internal LogOptions.
    94  func NewLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *LogOptions {
    95  	opts := &LogOptions{
    96  		tail:      -1, // -1 by default which means read all logs.
    97  		bytes:     -1, // -1 by default which means read all logs.
    98  		follow:    apiOpts.Follow,
    99  		timestamp: apiOpts.Timestamps,
   100  	}
   101  	if apiOpts.TailLines != nil {
   102  		opts.tail = *apiOpts.TailLines
   103  	}
   104  	if apiOpts.LimitBytes != nil {
   105  		opts.bytes = *apiOpts.LimitBytes
   106  	}
   107  	if apiOpts.SinceSeconds != nil {
   108  		opts.since = now.Add(-time.Duration(*apiOpts.SinceSeconds) * time.Second)
   109  	}
   110  	if apiOpts.SinceTime != nil && apiOpts.SinceTime.After(opts.since) {
   111  		opts.since = apiOpts.SinceTime.Time
   112  	}
   113  	return opts
   114  }
   115  
// parseFunc is a function parsing one log line to the internal log type.
// Notice that the caller must make sure logMessage is not nil.
type parseFunc func([]byte, *logMessage) error

// parseFuncs lists the supported log formats, in the order getParseFunc
// tries them against a sample line.
var parseFuncs = []parseFunc{
	parseCRILog,        // CRI log format parse function
	parseDockerJSONLog, // Docker JSON log format parse function
}
   124  
   125  // parseCRILog parses logs in CRI log format. CRI Log format example:
   126  //
   127  //	2016-10-06T00:17:09.669794202Z stdout P log content 1
   128  //	2016-10-06T00:17:09.669794203Z stderr F log content 2
   129  func parseCRILog(log []byte, msg *logMessage) error {
   130  	var err error
   131  	// Parse timestamp
   132  	idx := bytes.Index(log, delimiter)
   133  	if idx < 0 {
   134  		return fmt.Errorf("timestamp is not found")
   135  	}
   136  	msg.timestamp, err = time.Parse(timeFormatIn, string(log[:idx]))
   137  	if err != nil {
   138  		return fmt.Errorf("unexpected timestamp format %q: %v", timeFormatIn, err)
   139  	}
   140  
   141  	// Parse stream type
   142  	log = log[idx+1:]
   143  	idx = bytes.Index(log, delimiter)
   144  	if idx < 0 {
   145  		return fmt.Errorf("stream type is not found")
   146  	}
   147  	msg.stream = runtimeapi.LogStreamType(log[:idx])
   148  	if msg.stream != runtimeapi.Stdout && msg.stream != runtimeapi.Stderr {
   149  		return fmt.Errorf("unexpected stream type %q", msg.stream)
   150  	}
   151  
   152  	// Parse log tag
   153  	log = log[idx+1:]
   154  	idx = bytes.Index(log, delimiter)
   155  	if idx < 0 {
   156  		return fmt.Errorf("log tag is not found")
   157  	}
   158  	// Keep this forward compatible.
   159  	tags := bytes.Split(log[:idx], tagDelimiter)
   160  	partial := runtimeapi.LogTag(tags[0]) == runtimeapi.LogTagPartial
   161  	// Trim the tailing new line if this is a partial line.
   162  	if partial && len(log) > 0 && log[len(log)-1] == '\n' {
   163  		log = log[:len(log)-1]
   164  	}
   165  
   166  	// Get log content
   167  	msg.log = log[idx+1:]
   168  
   169  	return nil
   170  }
   171  
// jsonLog is a log message, typically a single entry from a given log stream.
// Since the data structure is originally from docker, we should be careful
// with any changes to jsonLog.
type jsonLog struct {
	// Log is the log message
	Log string `json:"log,omitempty"`
	// Stream is the log source
	Stream string `json:"stream,omitempty"`
	// Created is the created timestamp of log
	Created time.Time `json:"time"`
}
   183  
   184  // parseDockerJSONLog parses logs in Docker JSON log format. Docker JSON log format
   185  // example:
   186  //
   187  //	{"log":"content 1","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}
   188  //	{"log":"content 2","stream":"stderr","time":"2016-10-20T18:39:20.57606444Z"}
   189  func parseDockerJSONLog(log []byte, msg *logMessage) error {
   190  	var l = &jsonLog{}
   191  
   192  	// TODO: JSON decoding is fairly expensive, we should evaluate this.
   193  	if err := json.Unmarshal(log, l); err != nil {
   194  		return fmt.Errorf("failed with %v to unmarshal log %q", err, l)
   195  	}
   196  	msg.timestamp = l.Created
   197  	msg.stream = runtimeapi.LogStreamType(l.Stream)
   198  	msg.log = []byte(l.Log)
   199  	return nil
   200  }
   201  
   202  // getParseFunc returns proper parse function based on the sample log line passed in.
   203  func getParseFunc(log []byte) (parseFunc, error) {
   204  	for _, p := range parseFuncs {
   205  		if err := p(log, &logMessage{}); err == nil {
   206  			return p, nil
   207  		}
   208  	}
   209  	return nil, fmt.Errorf("unsupported log format: %q", log)
   210  }
   211  
// logWriter controls the writing into the stream based on the log options.
// stdout and stderr receive the lines of the matching stream; remain is the
// number of bytes that may still be written before the byte limit is hit.
type logWriter struct {
	stdout io.Writer
	stderr io.Writer
	opts   *LogOptions
	remain int64
}
   219  
// errMaximumWrite is returned when all bytes have been written, i.e. the
// writer's remaining byte budget has dropped to zero.
var errMaximumWrite = errors.New("maximum write")

// errShortWrite is returned when the message is not fully written, i.e. the
// underlying stream accepted fewer bytes than were handed to it.
var errShortWrite = errors.New("short write")
   225  
   226  func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogOptions) *logWriter {
   227  	w := &logWriter{
   228  		stdout: stdout,
   229  		stderr: stderr,
   230  		opts:   opts,
   231  		remain: math.MaxInt64, // initialize it as infinity
   232  	}
   233  	if opts.bytes >= 0 {
   234  		w.remain = opts.bytes
   235  	}
   236  	return w
   237  }
   238  
   239  // writeLogs writes logs into stdout, stderr.
   240  func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
   241  	if msg.timestamp.Before(w.opts.since) {
   242  		// Skip the line because it's older than since
   243  		return nil
   244  	}
   245  	line := msg.log
   246  	if w.opts.timestamp && addPrefix {
   247  		prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0])
   248  		line = append(prefix, line...)
   249  	}
   250  	// If the line is longer than the remaining bytes, cut it.
   251  	if int64(len(line)) > w.remain {
   252  		line = line[:w.remain]
   253  	}
   254  	// Get the proper stream to write to.
   255  	var stream io.Writer
   256  	switch msg.stream {
   257  	case runtimeapi.Stdout:
   258  		stream = w.stdout
   259  	case runtimeapi.Stderr:
   260  		stream = w.stderr
   261  	default:
   262  		return fmt.Errorf("unexpected stream type %q", msg.stream)
   263  	}
   264  	n, err := stream.Write(line)
   265  	w.remain -= int64(n)
   266  	if err != nil {
   267  		return err
   268  	}
   269  	// If the line has not been fully written, return errShortWrite
   270  	if n < len(line) {
   271  		return errShortWrite
   272  	}
   273  	// If there are no more bytes left, return errMaximumWrite
   274  	if w.remain <= 0 {
   275  		return errMaximumWrite
   276  	}
   277  	return nil
   278  }
   279  
   280  // ReadLogs read the container log and redirect into stdout and stderr.
   281  // Note that containerID is only needed when following the log, or else
   282  // just pass in empty string "".
   283  func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
   284  	// fsnotify has different behavior for symlinks in different platform,
   285  	// for example it follows symlink on Linux, but not on Windows,
   286  	// so we explicitly resolve symlinks before reading the logs.
   287  	// There shouldn't be security issue because the container log
   288  	// path is owned by kubelet and the container runtime.
   289  	evaluated, err := filepath.EvalSymlinks(path)
   290  	if err != nil {
   291  		return fmt.Errorf("failed to try resolving symlinks in path %q: %v", path, err)
   292  	}
   293  	path = evaluated
   294  	f, err := os.Open(path)
   295  	if err != nil {
   296  		return fmt.Errorf("failed to open log file %q: %v", path, err)
   297  	}
   298  	defer f.Close()
   299  
   300  	// Search start point based on tail line.
   301  	start, err := tail.FindTailLineStartIndex(f, opts.tail)
   302  	if err != nil {
   303  		return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
   304  	}
   305  	if _, err := f.Seek(start, io.SeekStart); err != nil {
   306  		return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err)
   307  	}
   308  
   309  	limitedMode := (opts.tail >= 0) && (!opts.follow)
   310  	limitedNum := opts.tail
   311  	// Start parsing the logs.
   312  	r := bufio.NewReader(f)
   313  	// Do not create watcher here because it is not needed if `Follow` is false.
   314  	var watcher *fsnotify.Watcher
   315  	var parse parseFunc
   316  	var stop bool
   317  	isNewLine := true
   318  	found := true
   319  	writer := newLogWriter(stdout, stderr, opts)
   320  	msg := &logMessage{}
   321  	baseName := filepath.Base(path)
   322  	dir := filepath.Dir(path)
   323  	for {
   324  		if stop || (limitedMode && limitedNum == 0) {
   325  			klog.V(2).InfoS("Finished parsing log file", "path", path)
   326  			return nil
   327  		}
   328  		l, err := r.ReadBytes(eol[0])
   329  		if err != nil {
   330  			if err != io.EOF { // This is an real error
   331  				return fmt.Errorf("failed to read log file %q: %v", path, err)
   332  			}
   333  			if opts.follow {
   334  				// The container is not running, we got to the end of the log.
   335  				if !found {
   336  					return nil
   337  				}
   338  				// Reset seek so that if this is an incomplete line,
   339  				// it will be read again.
   340  				if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
   341  					return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
   342  				}
   343  				if watcher == nil {
   344  					// Initialize the watcher if it has not been initialized yet.
   345  					if watcher, err = fsnotify.NewWatcher(); err != nil {
   346  						return fmt.Errorf("failed to create fsnotify watcher: %v", err)
   347  					}
   348  					defer watcher.Close()
   349  					if err := watcher.Add(dir); err != nil {
   350  						return fmt.Errorf("failed to watch directory %q: %w", dir, err)
   351  					}
   352  					// If we just created the watcher, try again to read as we might have missed
   353  					// the event.
   354  					continue
   355  				}
   356  				var recreated bool
   357  				// Wait until the next log change.
   358  				found, recreated, err = waitLogs(ctx, containerID, baseName, watcher, runtimeService)
   359  				if err != nil {
   360  					return err
   361  				}
   362  				if recreated {
   363  					newF, err := os.Open(path)
   364  					if err != nil {
   365  						if os.IsNotExist(err) {
   366  							continue
   367  						}
   368  						return fmt.Errorf("failed to open log file %q: %v", path, err)
   369  					}
   370  					defer newF.Close()
   371  					f.Close()
   372  					f = newF
   373  					r = bufio.NewReader(f)
   374  				}
   375  				// If the container exited consume data until the next EOF
   376  				continue
   377  			}
   378  			// Should stop after writing the remaining content.
   379  			stop = true
   380  			if len(l) == 0 {
   381  				continue
   382  			}
   383  			klog.InfoS("Incomplete line in log file", "path", path, "line", l)
   384  		}
   385  		if parse == nil {
   386  			// Initialize the log parsing function.
   387  			parse, err = getParseFunc(l)
   388  			if err != nil {
   389  				return fmt.Errorf("failed to get parse function: %v", err)
   390  			}
   391  		}
   392  		// Parse the log line.
   393  		msg.reset()
   394  		if err := parse(l, msg); err != nil {
   395  			klog.ErrorS(err, "Failed when parsing line in log file", "path", path, "line", l)
   396  			continue
   397  		}
   398  		// Write the log line into the stream.
   399  		if err := writer.write(msg, isNewLine); err != nil {
   400  			if err == errMaximumWrite {
   401  				klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
   402  				return nil
   403  			}
   404  			klog.ErrorS(err, "Failed when writing line to log file", "path", path, "line", msg)
   405  			return err
   406  		}
   407  		if limitedMode {
   408  			limitedNum--
   409  		}
   410  		if len(msg.log) > 0 {
   411  			isNewLine = msg.log[len(msg.log)-1] == eol[0]
   412  		} else {
   413  			isNewLine = true
   414  		}
   415  	}
   416  }
   417  
   418  func isContainerRunning(ctx context.Context, id string, r internalapi.RuntimeService) (bool, error) {
   419  	resp, err := r.ContainerStatus(ctx, id, false)
   420  	if err != nil {
   421  		return false, err
   422  	}
   423  	status := resp.GetStatus()
   424  	if status == nil {
   425  		return false, remote.ErrContainerStatusNil
   426  	}
   427  	// Only keep following container log when it is running.
   428  	if status.State != runtimeapi.ContainerState_CONTAINER_RUNNING {
   429  		klog.V(5).InfoS("Container is not running", "containerId", id, "state", status.State)
   430  		// Do not return error because it's normal that the container stops
   431  		// during waiting.
   432  		return false, nil
   433  	}
   434  	return true, nil
   435  }
   436  
   437  // waitLogs wait for the next log write. It returns two booleans and an error. The first boolean
   438  // indicates whether a new log is found; the second boolean if the log file was recreated;
   439  // the error is error happens during waiting new logs.
   440  func waitLogs(ctx context.Context, id string, logName string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
   441  	// no need to wait if the pod is not running
   442  	if running, err := isContainerRunning(ctx, id, runtimeService); !running {
   443  		return false, false, err
   444  	}
   445  	errRetry := 5
   446  	for {
   447  		select {
   448  		case <-ctx.Done():
   449  			return false, false, fmt.Errorf("context cancelled")
   450  		case e := <-w.Events:
   451  			switch e.Op {
   452  			case fsnotify.Write, fsnotify.Rename, fsnotify.Remove, fsnotify.Chmod:
   453  				return true, false, nil
   454  			case fsnotify.Create:
   455  				return true, filepath.Base(e.Name) == logName, nil
   456  			default:
   457  				klog.ErrorS(nil, "Received unexpected fsnotify event, retrying", "event", e)
   458  			}
   459  		case err := <-w.Errors:
   460  			klog.ErrorS(err, "Received fsnotify watch error, retrying unless no more retries left", "retries", errRetry)
   461  			if errRetry == 0 {
   462  				return false, false, err
   463  			}
   464  			errRetry--
   465  		case <-time.After(logForceCheckPeriod):
   466  			return true, false, nil
   467  		}
   468  	}
   469  }
   470  

View as plain text