...

Source file src/k8s.io/kubectl/pkg/cmd/logs/logs.go

Documentation: k8s.io/kubectl/pkg/cmd/logs

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package logs
    18  
    19  import (
    20  	"bufio"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  	"regexp"
    26  	"sync"
    27  	"time"
    28  
    29  	"github.com/spf13/cobra"
    30  
    31  	corev1 "k8s.io/api/core/v1"
    32  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	"k8s.io/cli-runtime/pkg/genericclioptions"
    36  	"k8s.io/cli-runtime/pkg/genericiooptions"
    37  	"k8s.io/client-go/rest"
    38  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    39  	"k8s.io/kubectl/pkg/polymorphichelpers"
    40  	"k8s.io/kubectl/pkg/scheme"
    41  	"k8s.io/kubectl/pkg/util"
    42  	"k8s.io/kubectl/pkg/util/completion"
    43  	"k8s.io/kubectl/pkg/util/i18n"
    44  	"k8s.io/kubectl/pkg/util/templates"
    45  )
    46  
    47  const (
    48  	logsUsageStr = "logs [-f] [-p] (POD | TYPE/NAME) [-c CONTAINER]"
    49  )
    50  
    51  var (
    52  	logsLong = templates.LongDesc(i18n.T(`
    53  		Print the logs for a container in a pod or specified resource. 
    54  		If the pod has only one container, the container name is optional.`))
    55  
    56  	logsExample = templates.Examples(i18n.T(`
    57  		# Return snapshot logs from pod nginx with only one container
    58  		kubectl logs nginx
    59  
    60  		# Return snapshot logs from pod nginx with multi containers
    61  		kubectl logs nginx --all-containers=true
    62  
    63  		# Return snapshot logs from all containers in pods defined by label app=nginx
    64  		kubectl logs -l app=nginx --all-containers=true
    65  
    66  		# Return snapshot of previous terminated ruby container logs from pod web-1
    67  		kubectl logs -p -c ruby web-1
    68  
    69  		# Begin streaming the logs of the ruby container in pod web-1
    70  		kubectl logs -f -c ruby web-1
    71  
    72  		# Begin streaming the logs from all containers in pods defined by label app=nginx
    73  		kubectl logs -f -l app=nginx --all-containers=true
    74  
    75  		# Display only the most recent 20 lines of output in pod nginx
    76  		kubectl logs --tail=20 nginx
    77  
    78  		# Show all logs from pod nginx written in the last hour
    79  		kubectl logs --since=1h nginx
    80  
    81  		# Show logs from a kubelet with an expired serving certificate
    82  		kubectl logs --insecure-skip-tls-verify-backend nginx
    83  
    84  		# Return snapshot logs from first container of a job named hello
    85  		kubectl logs job/hello
    86  
    87  		# Return snapshot logs from container nginx-1 of a deployment named nginx
    88  		kubectl logs deployment/nginx -c nginx-1`))
    89  
    90  	selectorTail    int64 = 10
    91  	logsUsageErrStr       = fmt.Sprintf("expected '%s'.\nPOD or TYPE/NAME is a required argument for the logs command", logsUsageStr)
    92  )
    93  
    94  const (
    95  	defaultPodLogsTimeout = 20 * time.Second
    96  )
    97  
    98  type LogsOptions struct {
    99  	Namespace     string
   100  	ResourceArg   string
   101  	AllContainers bool
   102  	Options       runtime.Object
   103  	Resources     []string
   104  
   105  	ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error
   106  
   107  	// PodLogOptions
   108  	SinceTime                    string
   109  	SinceSeconds                 time.Duration
   110  	Follow                       bool
   111  	Previous                     bool
   112  	Timestamps                   bool
   113  	IgnoreLogErrors              bool
   114  	LimitBytes                   int64
   115  	Tail                         int64
   116  	Container                    string
   117  	InsecureSkipTLSVerifyBackend bool
   118  
   119  	// whether or not a container name was given via --container
   120  	ContainerNameSpecified bool
   121  	Selector               string
   122  	MaxFollowConcurrency   int
   123  	Prefix                 bool
   124  
   125  	Object           runtime.Object
   126  	GetPodTimeout    time.Duration
   127  	RESTClientGetter genericclioptions.RESTClientGetter
   128  	LogsForObject    polymorphichelpers.LogsForObjectFunc
   129  
   130  	genericiooptions.IOStreams
   131  
   132  	TailSpecified bool
   133  
   134  	containerNameFromRefSpecRegexp *regexp.Regexp
   135  }
   136  
   137  func NewLogsOptions(streams genericiooptions.IOStreams, allContainers bool) *LogsOptions {
   138  	return &LogsOptions{
   139  		IOStreams:            streams,
   140  		AllContainers:        allContainers,
   141  		Tail:                 -1,
   142  		MaxFollowConcurrency: 5,
   143  
   144  		containerNameFromRefSpecRegexp: regexp.MustCompile(`spec\.(?:initContainers|containers|ephemeralContainers){(.+)}`),
   145  	}
   146  }
   147  
   148  // NewCmdLogs creates a new pod logs command
   149  func NewCmdLogs(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
   150  	o := NewLogsOptions(streams, false)
   151  
   152  	cmd := &cobra.Command{
   153  		Use:                   logsUsageStr,
   154  		DisableFlagsInUseLine: true,
   155  		Short:                 i18n.T("Print the logs for a container in a pod"),
   156  		Long:                  logsLong,
   157  		Example:               logsExample,
   158  		ValidArgsFunction:     completion.PodResourceNameAndContainerCompletionFunc(f),
   159  		Run: func(cmd *cobra.Command, args []string) {
   160  			cmdutil.CheckErr(o.Complete(f, cmd, args))
   161  			cmdutil.CheckErr(o.Validate())
   162  			cmdutil.CheckErr(o.RunLogs())
   163  		},
   164  	}
   165  	o.AddFlags(cmd)
   166  	return cmd
   167  }
   168  
   169  func (o *LogsOptions) AddFlags(cmd *cobra.Command) {
   170  	cmd.Flags().BoolVar(&o.AllContainers, "all-containers", o.AllContainers, "Get all containers' logs in the pod(s).")
   171  	cmd.Flags().BoolVarP(&o.Follow, "follow", "f", o.Follow, "Specify if the logs should be streamed.")
   172  	cmd.Flags().BoolVar(&o.Timestamps, "timestamps", o.Timestamps, "Include timestamps on each line in the log output")
   173  	cmd.Flags().Int64Var(&o.LimitBytes, "limit-bytes", o.LimitBytes, "Maximum bytes of logs to return. Defaults to no limit.")
   174  	cmd.Flags().BoolVarP(&o.Previous, "previous", "p", o.Previous, "If true, print the logs for the previous instance of the container in a pod if it exists.")
   175  	cmd.Flags().Int64Var(&o.Tail, "tail", o.Tail, "Lines of recent log file to display. Defaults to -1 with no selector, showing all log lines otherwise 10, if a selector is provided.")
   176  	cmd.Flags().BoolVar(&o.IgnoreLogErrors, "ignore-errors", o.IgnoreLogErrors, "If watching / following pod logs, allow for any errors that occur to be non-fatal")
   177  	cmd.Flags().StringVar(&o.SinceTime, "since-time", o.SinceTime, i18n.T("Only return logs after a specific date (RFC3339). Defaults to all logs. Only one of since-time / since may be used."))
   178  	cmd.Flags().DurationVar(&o.SinceSeconds, "since", o.SinceSeconds, "Only return logs newer than a relative duration like 5s, 2m, or 3h. Defaults to all logs. Only one of since-time / since may be used.")
   179  	cmd.Flags().StringVarP(&o.Container, "container", "c", o.Container, "Print the logs of this container")
   180  	cmd.Flags().BoolVar(&o.InsecureSkipTLSVerifyBackend, "insecure-skip-tls-verify-backend", o.InsecureSkipTLSVerifyBackend,
   181  		"Skip verifying the identity of the kubelet that logs are requested from.  In theory, an attacker could provide invalid log content back. You might want to use this if your kubelet serving certificates have expired.")
   182  	cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodLogsTimeout)
   183  	cmdutil.AddLabelSelectorFlagVar(cmd, &o.Selector)
   184  	cmd.Flags().IntVar(&o.MaxFollowConcurrency, "max-log-requests", o.MaxFollowConcurrency, "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5.")
   185  	cmd.Flags().BoolVar(&o.Prefix, "prefix", o.Prefix, "Prefix each log line with the log source (pod name and container name)")
   186  }
   187  
   188  func (o *LogsOptions) ToLogOptions() (*corev1.PodLogOptions, error) {
   189  	logOptions := &corev1.PodLogOptions{
   190  		Container:                    o.Container,
   191  		Follow:                       o.Follow,
   192  		Previous:                     o.Previous,
   193  		Timestamps:                   o.Timestamps,
   194  		InsecureSkipTLSVerifyBackend: o.InsecureSkipTLSVerifyBackend,
   195  	}
   196  
   197  	if len(o.SinceTime) > 0 {
   198  		t, err := util.ParseRFC3339(o.SinceTime, metav1.Now)
   199  		if err != nil {
   200  			return nil, err
   201  		}
   202  
   203  		logOptions.SinceTime = &t
   204  	}
   205  
   206  	if o.LimitBytes != 0 {
   207  		logOptions.LimitBytes = &o.LimitBytes
   208  	}
   209  
   210  	if o.SinceSeconds != 0 {
   211  		// round up to the nearest second
   212  		sec := int64(o.SinceSeconds.Round(time.Second).Seconds())
   213  		logOptions.SinceSeconds = &sec
   214  	}
   215  
   216  	if len(o.Selector) > 0 && o.Tail == -1 && !o.TailSpecified {
   217  		logOptions.TailLines = &selectorTail
   218  	} else if o.Tail != -1 {
   219  		logOptions.TailLines = &o.Tail
   220  	}
   221  
   222  	return logOptions, nil
   223  }
   224  
   225  func (o *LogsOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
   226  	o.ContainerNameSpecified = cmd.Flag("container").Changed
   227  	o.TailSpecified = cmd.Flag("tail").Changed
   228  	o.Resources = args
   229  
   230  	switch len(args) {
   231  	case 0:
   232  		if len(o.Selector) == 0 {
   233  			return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
   234  		}
   235  	case 1:
   236  		o.ResourceArg = args[0]
   237  		if len(o.Selector) != 0 {
   238  			return cmdutil.UsageErrorf(cmd, "only a selector (-l) or a POD name is allowed")
   239  		}
   240  	case 2:
   241  		o.ResourceArg = args[0]
   242  		o.Container = args[1]
   243  	default:
   244  		return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
   245  	}
   246  	var err error
   247  	o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
   248  	if err != nil {
   249  		return err
   250  	}
   251  
   252  	o.ConsumeRequestFn = DefaultConsumeRequest
   253  
   254  	o.GetPodTimeout, err = cmdutil.GetPodRunningTimeoutFlag(cmd)
   255  	if err != nil {
   256  		return err
   257  	}
   258  
   259  	o.Options, err = o.ToLogOptions()
   260  	if err != nil {
   261  		return err
   262  	}
   263  
   264  	o.RESTClientGetter = f
   265  	o.LogsForObject = polymorphichelpers.LogsForObjectFn
   266  
   267  	if o.Object == nil {
   268  		builder := f.NewBuilder().
   269  			WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
   270  			NamespaceParam(o.Namespace).DefaultNamespace().
   271  			SingleResourceType()
   272  		if o.ResourceArg != "" {
   273  			builder.ResourceNames("pods", o.ResourceArg)
   274  		}
   275  		if o.Selector != "" {
   276  			builder.ResourceTypes("pods").LabelSelectorParam(o.Selector)
   277  		}
   278  		infos, err := builder.Do().Infos()
   279  		if err != nil {
   280  			if apierrors.IsNotFound(err) {
   281  				err = fmt.Errorf("error from server (NotFound): %w in namespace %q", err, o.Namespace)
   282  			}
   283  			return err
   284  		}
   285  		if o.Selector == "" && len(infos) != 1 {
   286  			return errors.New("expected a resource")
   287  		}
   288  		o.Object = infos[0].Object
   289  		if o.Selector != "" && len(o.Object.(*corev1.PodList).Items) == 0 {
   290  			fmt.Fprintf(o.ErrOut, "No resources found in %s namespace.\n", o.Namespace)
   291  		}
   292  	}
   293  
   294  	return nil
   295  }
   296  
   297  func (o LogsOptions) Validate() error {
   298  	if len(o.SinceTime) > 0 && o.SinceSeconds != 0 {
   299  		return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified")
   300  	}
   301  
   302  	logsOptions, ok := o.Options.(*corev1.PodLogOptions)
   303  	if !ok {
   304  		return errors.New("unexpected logs options object")
   305  	}
   306  	if o.AllContainers && len(logsOptions.Container) > 0 {
   307  		return fmt.Errorf("--all-containers=true should not be specified with container name %s", logsOptions.Container)
   308  	}
   309  
   310  	if o.ContainerNameSpecified && len(o.Resources) == 2 {
   311  		return fmt.Errorf("only one of -c or an inline [CONTAINER] arg is allowed")
   312  	}
   313  
   314  	if o.LimitBytes < 0 {
   315  		return fmt.Errorf("--limit-bytes must be greater than 0")
   316  	}
   317  
   318  	if logsOptions.SinceSeconds != nil && *logsOptions.SinceSeconds < int64(0) {
   319  		return fmt.Errorf("--since must be greater than 0")
   320  	}
   321  
   322  	if logsOptions.TailLines != nil && *logsOptions.TailLines < -1 {
   323  		return fmt.Errorf("--tail must be greater than or equal to -1")
   324  	}
   325  
   326  	return nil
   327  }
   328  
   329  // RunLogs retrieves a pod log
   330  func (o LogsOptions) RunLogs() error {
   331  	requests, err := o.LogsForObject(o.RESTClientGetter, o.Object, o.Options, o.GetPodTimeout, o.AllContainers)
   332  	if err != nil {
   333  		return err
   334  	}
   335  
   336  	if o.Follow && len(requests) > 1 {
   337  		if len(requests) > o.MaxFollowConcurrency {
   338  			return fmt.Errorf(
   339  				"you are attempting to follow %d log streams, but maximum allowed concurrency is %d, use --max-log-requests to increase the limit",
   340  				len(requests), o.MaxFollowConcurrency,
   341  			)
   342  		}
   343  
   344  		return o.parallelConsumeRequest(requests)
   345  	}
   346  
   347  	return o.sequentialConsumeRequest(requests)
   348  }
   349  
   350  func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
   351  	reader, writer := io.Pipe()
   352  	wg := &sync.WaitGroup{}
   353  	wg.Add(len(requests))
   354  	for objRef, request := range requests {
   355  		go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
   356  			defer wg.Done()
   357  			out := o.addPrefixIfNeeded(objRef, writer)
   358  			if err := o.ConsumeRequestFn(request, out); err != nil {
   359  				if !o.IgnoreLogErrors {
   360  					writer.CloseWithError(err)
   361  
   362  					// It's important to return here to propagate the error via the pipe
   363  					return
   364  				}
   365  
   366  				fmt.Fprintf(writer, "error: %v\n", err)
   367  			}
   368  
   369  		}(objRef, request)
   370  	}
   371  
   372  	go func() {
   373  		wg.Wait()
   374  		writer.Close()
   375  	}()
   376  
   377  	_, err := io.Copy(o.Out, reader)
   378  	return err
   379  }
   380  
   381  func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
   382  	for objRef, request := range requests {
   383  		out := o.addPrefixIfNeeded(objRef, o.Out)
   384  		if err := o.ConsumeRequestFn(request, out); err != nil {
   385  			if !o.IgnoreLogErrors {
   386  				return err
   387  			}
   388  
   389  			fmt.Fprintf(o.Out, "error: %v\n", err)
   390  		}
   391  	}
   392  
   393  	return nil
   394  }
   395  
   396  func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Writer) io.Writer {
   397  	if !o.Prefix || ref.FieldPath == "" || ref.Name == "" {
   398  		return writer
   399  	}
   400  
   401  	// We rely on ref.FieldPath to contain a reference to a container
   402  	// including a container name (not an index) so we can get a container name
   403  	// without making an extra API request.
   404  	var containerName string
   405  	containerNameMatches := o.containerNameFromRefSpecRegexp.FindStringSubmatch(ref.FieldPath)
   406  	if len(containerNameMatches) == 2 {
   407  		containerName = containerNameMatches[1]
   408  	}
   409  
   410  	prefix := fmt.Sprintf("[pod/%s/%s] ", ref.Name, containerName)
   411  	return &prefixingWriter{
   412  		prefix: []byte(prefix),
   413  		writer: writer,
   414  	}
   415  }
   416  
   417  // DefaultConsumeRequest reads the data from request and writes into
   418  // the out writer. It buffers data from requests until the newline or io.EOF
   419  // occurs in the data, so it doesn't interleave logs sub-line
   420  // when running concurrently.
   421  //
   422  // A successful read returns err == nil, not err == io.EOF.
   423  // Because the function is defined to read from request until io.EOF, it does
   424  // not treat an io.EOF as an error to be reported.
   425  func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
   426  	readCloser, err := request.Stream(context.TODO())
   427  	if err != nil {
   428  		return err
   429  	}
   430  	defer readCloser.Close()
   431  
   432  	r := bufio.NewReader(readCloser)
   433  	for {
   434  		bytes, err := r.ReadBytes('\n')
   435  		if _, err := out.Write(bytes); err != nil {
   436  			return err
   437  		}
   438  
   439  		if err != nil {
   440  			if err != io.EOF {
   441  				return err
   442  			}
   443  			return nil
   444  		}
   445  	}
   446  }
   447  
   448  type prefixingWriter struct {
   449  	prefix []byte
   450  	writer io.Writer
   451  }
   452  
   453  func (pw *prefixingWriter) Write(p []byte) (int, error) {
   454  	if len(p) == 0 {
   455  		return 0, nil
   456  	}
   457  
   458  	// Perform an "atomic" write of a prefix and p to make sure that it doesn't interleave
   459  	// sub-line when used concurrently with io.PipeWrite.
   460  	n, err := pw.writer.Write(append(pw.prefix, p...))
   461  	if n > len(p) {
   462  		// To comply with the io.Writer interface requirements we must
   463  		// return a number of bytes written from p (0 <= n <= len(p)),
   464  		// so we are ignoring the length of the prefix here.
   465  		return len(p), err
   466  	}
   467  	return n, err
   468  }
   469  

View as plain text