...

Source file src/k8s.io/kubernetes/pkg/kubelet/kubelet_server_journal.go

Documentation: k8s.io/kubernetes/pkg/kubelet

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"compress/gzip"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  	"net/http"
    26  	"net/url"
    27  	"os"
    28  	"os/exec"
    29  	"reflect"
    30  	"regexp"
    31  	"regexp/syntax"
    32  	"runtime"
    33  	"strconv"
    34  	"strings"
    35  	"time"
    36  
    37  	securejoin "github.com/cyphar/filepath-securejoin"
    38  
    39  	utilvalidation "k8s.io/apimachinery/pkg/util/validation"
    40  	"k8s.io/apimachinery/pkg/util/validation/field"
    41  )
    42  
// Limits and fixed locations used by the node log query code in this file:
//   - dateLayout is a time layout string; it is not referenced in this part of the file
//     (presumably used by the OS specific logging command builders - TODO confirm).
//   - maxTailLines is the upper bound validate accepts for the tailLines option.
//   - maxServiceLength is the longest service name safeServiceName accepts, in bytes.
//   - maxServices is the largest number of services validateServices accepts in one query.
//   - nodeLogDir is the base directory under which log file queries are resolved.
const (
	dateLayout       = "2006-1-2 15:4:5"
	maxTailLines     = 100000
	maxServiceLength = 256
	maxServices      = 4
	nodeLogDir       = "/var/log/"
)
    50  
var (
	// journal is the package-level journalServer value; journalServer carries no state.
	journal = journalServer{}
	// The set of known safe characters to pass to journalctl / GetWinEvent flags - only add to this list if the
	// character cannot be used to create invalid sequences. This is intended as a broad defense against malformed
	// input that could cause an escape.
	// The expression matches any run of characters outside that safe set, so a match means the input is unsafe.
	reServiceNameUnsafeCharacters = regexp.MustCompile(`[^a-zA-Z\-_.:0-9@]+`)
)
    58  
    59  // journalServer returns text output from the OS specific service logger to view
    60  // from the client. It runs with the privileges of the calling  process
    61  // (the kubelet) and should only be allowed to be invoked by a root user.
    62  type journalServer struct{}
    63  
    64  // ServeHTTP translates HTTP query parameters into arguments to be passed
    65  // to journalctl on the current system. It supports content-encoding of
    66  // gzip to reduce total content size.
    67  func (journalServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    68  	var out io.Writer = w
    69  
    70  	nlq, errs := newNodeLogQuery(req.URL.Query())
    71  	if len(errs) > 0 {
    72  		http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
    73  		return
    74  	}
    75  
    76  	// TODO: Also set a response header that indicates how the request's query was resolved,
    77  	// e.g. "kube-log-source: journal://foobar?arg1=value" or "kube-log-source: file:///var/log/foobar.log"
    78  	w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
    79  	if req.Header.Get("Accept-Encoding") == "gzip" {
    80  		w.Header().Set("Content-Encoding", "gzip")
    81  
    82  		gz, err := gzip.NewWriterLevel(out, gzip.BestSpeed)
    83  		if err != nil {
    84  			fmt.Fprintf(w, "\nfailed to get gzip writer: %v\n", err)
    85  			return
    86  		}
    87  		defer gz.Close()
    88  		out = gz
    89  	}
    90  	nlq.Copy(out)
    91  }
    92  
// nodeLogQuery encapsulates the log query request. It is built from URL query
// values by newNodeLogQuery and checked by its validate method.
type nodeLogQuery struct {
	// Services are the list of services to be queried. newNodeLogQuery puts every non-blank "query" value
	// that contains neither '/' nor '\' here.
	Services []string
	// Files are the list of files to be queried. newNodeLogQuery puts every "query" value that contains
	// '/' or '\' here; validate accepts at most one entry and resolves it under nodeLogDir.
	Files []string
	// options are the optional filters of the request; validate rejects them in combination with a file query.
	options
}
   101  
// options encapsulates the query options for services. All fields are optional; a nil pointer
// (or an empty Pattern) means the option was not supplied.
type options struct {
	// SinceTime is an RFC3339 timestamp from which to show logs.
	SinceTime *time.Time
	// UntilTime is an RFC3339 timestamp until which to show logs.
	// validate rejects a SinceTime that is after UntilTime.
	UntilTime *time.Time
	// TailLines is used to retrieve the specified number of lines (not more than 100k) from the end of the log.
	// Support for this is implementation specific and only available for service logs.
	TailLines *int
	// Boot shows messages from a specific boot. Allowed values are less than 1. Passing an invalid boot offset will
	// fail retrieving logs and return an error. Support for this is implementation specific; validate rejects the
	// option when running on Windows.
	Boot *int
	// Pattern filters log entries by the provided regex pattern. On Linux nodes, this pattern will be read as a
	// PCRE2 regex, on Windows nodes it will be read as a PowerShell regex. Support for this is implementation specific.
	// validate additionally requires the pattern to parse with Go's regexp/syntax package using Perl flags.
	Pattern string
}
   118  
   119  // newNodeLogQuery parses query values and converts all known options into nodeLogQuery
   120  func newNodeLogQuery(query url.Values) (*nodeLogQuery, field.ErrorList) {
   121  	allErrs := field.ErrorList{}
   122  	var nlq nodeLogQuery
   123  	var err error
   124  
   125  	queries, ok := query["query"]
   126  	if len(queries) > 0 {
   127  		for _, q := range queries {
   128  			// The presence of / or \ is a hint that the query is for a log file. If the query is for foo.log without a
   129  			// slash prefix, the heuristics will still return the file contents.
   130  			if strings.ContainsAny(q, `/\`) {
   131  				nlq.Files = append(nlq.Files, q)
   132  			} else if strings.TrimSpace(q) != "" { // Prevent queries with just spaces
   133  				nlq.Services = append(nlq.Services, q)
   134  			}
   135  		}
   136  	}
   137  
   138  	// Prevent specifying  an empty or blank space query.
   139  	// Example: kubectl get --raw /api/v1/nodes/$node/proxy/logs?query="   "
   140  	if ok && (len(nlq.Files) == 0 && len(nlq.Services) == 0) {
   141  		allErrs = append(allErrs, field.Invalid(field.NewPath("query"), queries, "query cannot be empty"))
   142  	}
   143  
   144  	var sinceTime time.Time
   145  	sinceTimeValue := query.Get("sinceTime")
   146  	if len(sinceTimeValue) > 0 {
   147  		sinceTime, err = time.Parse(time.RFC3339, sinceTimeValue)
   148  		if err != nil {
   149  			allErrs = append(allErrs, field.Invalid(field.NewPath("sinceTime"), sinceTimeValue, "invalid time format"))
   150  		} else {
   151  			nlq.SinceTime = &sinceTime
   152  		}
   153  	}
   154  
   155  	var untilTime time.Time
   156  	untilTimeValue := query.Get("untilTime")
   157  	if len(untilTimeValue) > 0 {
   158  		untilTime, err = time.Parse(time.RFC3339, untilTimeValue)
   159  		if err != nil {
   160  			allErrs = append(allErrs, field.Invalid(field.NewPath("untilTime"), untilTimeValue, "invalid time format"))
   161  		} else {
   162  			nlq.UntilTime = &untilTime
   163  		}
   164  	}
   165  
   166  	var boot int
   167  	bootValue := query.Get("boot")
   168  	if len(bootValue) > 0 {
   169  		boot, err = strconv.Atoi(bootValue)
   170  		if err != nil {
   171  			allErrs = append(allErrs, field.Invalid(field.NewPath("boot"), bootValue, err.Error()))
   172  		} else {
   173  			nlq.Boot = &boot
   174  		}
   175  	}
   176  
   177  	var tailLines int
   178  	tailLinesValue := query.Get("tailLines")
   179  	if len(tailLinesValue) > 0 {
   180  		tailLines, err = strconv.Atoi(tailLinesValue)
   181  		if err != nil {
   182  			allErrs = append(allErrs, field.Invalid(field.NewPath("tailLines"), tailLinesValue, err.Error()))
   183  		} else {
   184  			nlq.TailLines = &tailLines
   185  		}
   186  	}
   187  
   188  	pattern := query.Get("pattern")
   189  	if len(pattern) > 0 {
   190  		nlq.Pattern = pattern
   191  	}
   192  
   193  	if len(allErrs) > 0 {
   194  		return nil, allErrs
   195  	}
   196  
   197  	if reflect.DeepEqual(nlq, nodeLogQuery{}) {
   198  		return nil, allErrs
   199  	}
   200  
   201  	return &nlq, allErrs
   202  }
   203  
   204  func validateServices(services []string) field.ErrorList {
   205  	allErrs := field.ErrorList{}
   206  
   207  	for _, s := range services {
   208  		if err := safeServiceName(s); err != nil {
   209  			allErrs = append(allErrs, field.Invalid(field.NewPath("query"), s, err.Error()))
   210  		}
   211  	}
   212  
   213  	if len(services) > maxServices {
   214  		allErrs = append(allErrs, field.TooMany(field.NewPath("query"), len(services), maxServices))
   215  	}
   216  	return allErrs
   217  }
   218  
   219  func (n *nodeLogQuery) validate() field.ErrorList {
   220  	allErrs := validateServices(n.Services)
   221  	switch {
   222  	case len(n.Files) == 0 && len(n.Services) == 0:
   223  		allErrs = append(allErrs, field.Required(field.NewPath("query"), "cannot be empty with options"))
   224  	case len(n.Files) > 0 && len(n.Services) > 0:
   225  		allErrs = append(allErrs, field.Invalid(field.NewPath("query"), fmt.Sprintf("%v, %v", n.Files, n.Services),
   226  			"cannot specify a file and service"))
   227  	case len(n.Files) > 1:
   228  		allErrs = append(allErrs, field.Invalid(field.NewPath("query"), n.Files, "cannot specify more than one file"))
   229  	case len(n.Files) == 1 && n.options != (options{}):
   230  		allErrs = append(allErrs, field.Invalid(field.NewPath("query"), n.Files, "cannot specify file with options"))
   231  	case len(n.Files) == 1:
   232  		if fullLogFilename, err := securejoin.SecureJoin(nodeLogDir, n.Files[0]); err != nil {
   233  			allErrs = append(allErrs, field.Invalid(field.NewPath("query"), n.Files, err.Error()))
   234  		} else if _, err := os.Stat(fullLogFilename); err != nil {
   235  			allErrs = append(allErrs, field.Invalid(field.NewPath("query"), n.Files, err.Error()))
   236  		}
   237  	}
   238  
   239  	if n.SinceTime != nil && n.UntilTime != nil && (n.SinceTime.After(*n.UntilTime)) {
   240  		allErrs = append(allErrs, field.Invalid(field.NewPath("untilTime"), n.UntilTime, "must be after `sinceTime`"))
   241  	}
   242  
   243  	if n.Boot != nil && runtime.GOOS == "windows" {
   244  		allErrs = append(allErrs, field.Invalid(field.NewPath("boot"), *n.Boot, "boot is not supported on Windows"))
   245  	}
   246  
   247  	if n.Boot != nil && *n.Boot > 0 {
   248  		allErrs = append(allErrs, field.Invalid(field.NewPath("boot"), *n.Boot, "must be less than 1"))
   249  	}
   250  
   251  	if n.TailLines != nil {
   252  		if err := utilvalidation.IsInRange((int)(*n.TailLines), 0, maxTailLines); err != nil {
   253  			allErrs = append(allErrs, field.Invalid(field.NewPath("tailLines"), *n.TailLines, err[0]))
   254  		}
   255  	}
   256  
   257  	if _, err := syntax.Parse(n.Pattern, syntax.Perl); err != nil {
   258  		allErrs = append(allErrs, field.Invalid(field.NewPath("pattern"), n.Pattern, err.Error()))
   259  	}
   260  
   261  	return allErrs
   262  }
   263  
   264  // Copy streams the contents of the OS specific logging command executed  with the current args to the provided
   265  // writer. If an error occurs a line is written to the output.
   266  func (n *nodeLogQuery) Copy(w io.Writer) {
   267  	// set the deadline to the maximum across both runs
   268  	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
   269  	defer cancel()
   270  	boot := 0
   271  	if n.Boot != nil {
   272  		boot = *n.Boot
   273  	}
   274  	n.copyForBoot(ctx, w, boot)
   275  }
   276  
   277  // copyForBoot invokes the OS specific logging command with the  provided args
   278  func (n *nodeLogQuery) copyForBoot(ctx context.Context, w io.Writer, previousBoot int) {
   279  	if ctx.Err() != nil {
   280  		return
   281  	}
   282  	nativeLoggers, fileLoggers := n.splitNativeVsFileLoggers(ctx)
   283  	if len(nativeLoggers) > 0 {
   284  		n.copyServiceLogs(ctx, w, nativeLoggers, previousBoot)
   285  	}
   286  
   287  	if len(fileLoggers) > 0 && n.options != (options{}) {
   288  		fmt.Fprintf(w, "\noptions present and query resolved to log files for %v\ntry without specifying options\n",
   289  			fileLoggers)
   290  		return
   291  	}
   292  
   293  	if len(fileLoggers) > 0 {
   294  		copyFileLogs(ctx, w, fileLoggers)
   295  	}
   296  }
   297  
   298  // splitNativeVsFileLoggers checks if each service logs to native OS logs or to a file and returns a list of services
   299  // that log natively vs maybe to a file
   300  func (n *nodeLogQuery) splitNativeVsFileLoggers(ctx context.Context) ([]string, []string) {
   301  	var nativeLoggers []string
   302  	var fileLoggers []string
   303  
   304  	for _, service := range n.Services {
   305  		// Check the journalctl output to figure if the service is using journald or not. This is not needed in the
   306  		// Get-WinEvent case as the command returns an error if a service is not logging to the Application provider.
   307  		if checkForNativeLogger(ctx, service) {
   308  			nativeLoggers = append(nativeLoggers, service)
   309  		} else {
   310  			fileLoggers = append(fileLoggers, service)
   311  		}
   312  	}
   313  	return nativeLoggers, fileLoggers
   314  }
   315  
   316  // copyServiceLogs invokes journalctl or Get-WinEvent with the provided args. Note that
   317  // services are explicitly passed here to account for the heuristics.
   318  func (n *nodeLogQuery) copyServiceLogs(ctx context.Context, w io.Writer, services []string, previousBoot int) {
   319  	cmdStr, args, err := getLoggingCmd(n, services)
   320  	if err != nil {
   321  		fmt.Fprintf(w, "\nfailed to get logging cmd: %v\n", err)
   322  		return
   323  	}
   324  	cmd := exec.CommandContext(ctx, cmdStr, args...)
   325  	cmd.Stdout = w
   326  	cmd.Stderr = w
   327  
   328  	if err := cmd.Run(); err != nil {
   329  		if _, ok := err.(*exec.ExitError); ok {
   330  			return
   331  		}
   332  		if previousBoot == 0 {
   333  			fmt.Fprintf(w, "\nerror: journal output not available\n")
   334  		}
   335  	}
   336  }
   337  
   338  // copyFileLogs loops over all the services and attempts to collect the file logs of each service
   339  func copyFileLogs(ctx context.Context, w io.Writer, services []string) {
   340  	if ctx.Err() != nil {
   341  		fmt.Fprintf(w, "\ncontext error: %v\n", ctx.Err())
   342  		return
   343  	}
   344  
   345  	for _, service := range services {
   346  		heuristicsCopyFileLogs(ctx, w, service)
   347  	}
   348  }
   349  
   350  // heuristicsCopyFileLogs attempts to collect logs from either
   351  // /var/log/service
   352  // /var/log/service.log or
   353  // /var/log/service/service.log or
   354  // in that order stopping on first success.
   355  func heuristicsCopyFileLogs(ctx context.Context, w io.Writer, service string) {
   356  	logFileNames := [3]string{
   357  		service,
   358  		fmt.Sprintf("%s.log", service),
   359  		fmt.Sprintf("%s/%s.log", service, service),
   360  	}
   361  
   362  	var err error
   363  	for _, logFileName := range logFileNames {
   364  		var logFile string
   365  		logFile, err = securejoin.SecureJoin(nodeLogDir, logFileName)
   366  		if err != nil {
   367  			break
   368  		}
   369  		err = heuristicsCopyFileLog(ctx, w, logFile)
   370  		if err == nil {
   371  			break
   372  		} else if errors.Is(err, os.ErrNotExist) {
   373  			continue
   374  		} else {
   375  			break
   376  		}
   377  	}
   378  
   379  	if err != nil {
   380  		// If the last error was file not found it implies that no log file was found for the service
   381  		if errors.Is(err, os.ErrNotExist) {
   382  			fmt.Fprintf(w, "\nlog not found for %s\n", service)
   383  			return
   384  		}
   385  		fmt.Fprintf(w, "\nerror getting log for %s: %v\n", service, err)
   386  	}
   387  }
   388  
   389  // readerCtx is the interface that wraps io.Reader with a context
   390  type readerCtx struct {
   391  	ctx context.Context
   392  	io.Reader
   393  }
   394  
   395  func (r *readerCtx) Read(p []byte) (n int, err error) {
   396  	if err := r.ctx.Err(); err != nil {
   397  		return 0, err
   398  	}
   399  	return r.Reader.Read(p)
   400  }
   401  
   402  // newReaderCtx gets a context-aware io.Reader
   403  func newReaderCtx(ctx context.Context, r io.Reader) io.Reader {
   404  	return &readerCtx{
   405  		ctx:    ctx,
   406  		Reader: r,
   407  	}
   408  }
   409  
   410  // heuristicsCopyFileLog returns the contents of the given logFile
   411  func heuristicsCopyFileLog(ctx context.Context, w io.Writer, logFile string) error {
   412  	fInfo, err := os.Stat(logFile)
   413  	if err != nil {
   414  		return err
   415  	}
   416  	// This is to account for the heuristics where logs for service foo
   417  	// could be in /var/log/foo/
   418  	if fInfo.IsDir() {
   419  		return os.ErrNotExist
   420  	}
   421  
   422  	f, err := os.Open(logFile)
   423  	if err != nil {
   424  		return err
   425  	}
   426  	defer f.Close()
   427  
   428  	if _, err := io.Copy(w, newReaderCtx(ctx, f)); err != nil {
   429  		return err
   430  	}
   431  	return nil
   432  }
   433  
   434  func safeServiceName(s string) error {
   435  	// Max length of a service name is 256 across supported OSes
   436  	if len(s) > maxServiceLength {
   437  		return fmt.Errorf("length must be less than 100")
   438  	}
   439  
   440  	if reServiceNameUnsafeCharacters.MatchString(s) {
   441  		return fmt.Errorf("input contains unsupported characters")
   442  	}
   443  	return nil
   444  }
   445  

View as plain text