    17  package logs
    19  import (
    20  	"bufio"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  	"regexp"
    26  	"sync"
    27  	"time"
    29  	"github.com/spf13/cobra"
    31  	corev1 "k8s.io/api/core/v1"
    32  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	"k8s.io/cli-runtime/pkg/genericclioptions"
    36  	"k8s.io/cli-runtime/pkg/genericiooptions"
    37  	"k8s.io/client-go/rest"
    38  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    39  	"k8s.io/kubectl/pkg/polymorphichelpers"
    40  	"k8s.io/kubectl/pkg/scheme"
    41  	"k8s.io/kubectl/pkg/util"
    42  	"k8s.io/kubectl/pkg/util/completion"
    43  	"k8s.io/kubectl/pkg/util/i18n"
    44  	"k8s.io/kubectl/pkg/util/templates"
    45  )
    47  const (
    48  	logsUsageStr = "logs [-f] [-p] (POD | TYPE/NAME) [-c CONTAINER]"
    49  )
    51  var (
    52  	logsLong = templates.LongDesc(i18n.T(`
    53  		Print the logs for a container in a pod or specified resource. 
    54  		If the pod has only one container, the container name is optional.`))
    56  	logsExample = templates.Examples(i18n.T(`
    57  		# Return snapshot logs from pod nginx with only one container
    58  		kubectl logs nginx
    60  		# Return snapshot logs from pod nginx with multi containers
    61  		kubectl logs nginx --all-containers=true
    63  		# Return snapshot logs from all containers in pods defined by label app=nginx
    64  		kubectl logs -l app=nginx --all-containers=true
    66  		# Return snapshot of previous terminated ruby container logs from pod web-1
    67  		kubectl logs -p -c ruby web-1
    69  		# Begin streaming the logs of the ruby container in pod web-1
    70  		kubectl logs -f -c ruby web-1
    72  		# Begin streaming the logs from all containers in pods defined by label app=nginx
    73  		kubectl logs -f -l app=nginx --all-containers=true
    75  		# Display only the most recent 20 lines of output in pod nginx
    76  		kubectl logs --tail=20 nginx
    78  		# Show all logs from pod nginx written in the last hour
    79  		kubectl logs --since=1h nginx
    81  		# Show logs from a kubelet with an expired serving certificate
    82  		kubectl logs --insecure-skip-tls-verify-backend nginx
    84  		# Return snapshot logs from first container of a job named hello
    85  		kubectl logs job/hello
    87  		# Return snapshot logs from container nginx-1 of a deployment named nginx
    88  		kubectl logs deployment/nginx -c nginx-1`))
    90  	selectorTail    int64 = 10
    91  	logsUsageErrStr       = fmt.Sprintf("expected '%s'.\nPOD or TYPE/NAME is a required argument for the logs command", logsUsageStr)
    92  )
    94  const (
    95  	defaultPodLogsTimeout = 20 * time.Second
    96  )
// LogsOptions holds the flag values, positional arguments and resolved
// dependencies used by the logs command. It is populated by AddFlags and
// Complete, checked by Validate and consumed by RunLogs.
type LogsOptions struct {
	// Namespace is the namespace resolved from the kubeconfig loader in Complete.
	Namespace string
	// ResourceArg is the first positional argument (POD or TYPE/NAME).
	ResourceArg string
	// AllContainers is bound to --all-containers.
	AllContainers bool
	// Options is the log options object built by ToLogOptions in Complete;
	// Validate expects it to be a *corev1.PodLogOptions.
	Options runtime.Object
	// Resources holds the raw positional arguments.
	Resources []string

	// ConsumeRequestFn copies a single log request into a writer. Complete
	// sets it to DefaultConsumeRequest.
	ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error

	// Flag values that ToLogOptions translates into a corev1.PodLogOptions.
	// IgnoreLogErrors is the exception: it is only consulted while consuming
	// requests, where it makes stream errors non-fatal.
	SinceTime                    string
	SinceSeconds                 time.Duration
	Follow                       bool
	Previous                     bool
	Timestamps                   bool
	IgnoreLogErrors              bool
	LimitBytes                   int64
	Tail                         int64
	Container                    string
	InsecureSkipTLSVerifyBackend bool

	// ContainerNameSpecified records whether a container name was given via
	// --container.
	ContainerNameSpecified bool
	// Selector is the label selector used to pick pods.
	Selector string
	// MaxFollowConcurrency is bound to --max-log-requests and caps the number
	// of log streams RunLogs is willing to follow at once.
	MaxFollowConcurrency int
	// Prefix is bound to --prefix; when set, output lines are prefixed with
	// their pod and container name.
	Prefix bool

	// Object is the object whose logs are requested. Complete resolves it via
	// the resource builder only when it is nil.
	Object runtime.Object
	// GetPodTimeout is read from the pod running timeout flag in Complete.
	GetPodTimeout time.Duration
	// RESTClientGetter and LogsForObject are passed to / used for building
	// the log requests in RunLogs.
	RESTClientGetter genericclioptions.RESTClientGetter
	LogsForObject    polymorphichelpers.LogsForObjectFunc

	genericiooptions.IOStreams

	// TailSpecified records whether --tail was set explicitly.
	TailSpecified bool

	// containerNameFromRefSpecRegexp extracts the container name from an
	// ObjectReference field path when building line prefixes.
	containerNameFromRefSpecRegexp *regexp.Regexp
}
   137  func NewLogsOptions(streams genericiooptions.IOStreams, allContainers bool) *LogsOptions {
   138  	return &LogsOptions{
   139  		IOStreams:            streams,
   140  		AllContainers:        allContainers,
   141  		Tail:                 -1,
   142  		MaxFollowConcurrency: 5,
   144  		containerNameFromRefSpecRegexp: regexp.MustCompile(`spec\.(?:initContainers|containers|ephemeralContainers){(.+)}`),
   145  	}
   146  }
   148  // NewCmdLogs creates a new pod logs command
   149  func NewCmdLogs(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
   150  	o := NewLogsOptions(streams, false)
   152  	cmd := &cobra.Command{
   153  		Use:                   logsUsageStr,
   154  		DisableFlagsInUseLine: true,
   155  		Short:                 i18n.T("Print the logs for a container in a pod"),
   156  		Long:                  logsLong,
   157  		Example:               logsExample,
   158  		ValidArgsFunction:     completion.PodResourceNameAndContainerCompletionFunc(f),
   159  		Run: func(cmd *cobra.Command, args []string) {
   160  			cmdutil.CheckErr(o.Complete(f, cmd, args))
   161  			cmdutil.CheckErr(o.Validate())
   162  			cmdutil.CheckErr(o.RunLogs())
   163  		},
   164  	}
   165  	o.AddFlags(cmd)
   166  	return cmd
   167  }
   169  func (o *LogsOptions) AddFlags(cmd *cobra.Command) {
   170  	cmd.Flags().BoolVar(&o.AllContainers, "all-containers", o.AllContainers, "Get all containers' logs in the pod(s).")
   171  	cmd.Flags().BoolVarP(&o.Follow, "follow", "f", o.Follow, "Specify if the logs should be streamed.")
   172  	cmd.Flags().BoolVar(&o.Timestamps, "timestamps", o.Timestamps, "Include timestamps on each line in the log output")
   173  	cmd.Flags().Int64Var(&o.LimitBytes, "limit-bytes", o.LimitBytes, "Maximum bytes of logs to return. Defaults to no limit.")
   174  	cmd.Flags().BoolVarP(&o.Previous, "previous", "p", o.Previous, "If true, print the logs for the previous instance of the container in a pod if it exists.")
   175  	cmd.Flags().Int64Var(&o.Tail, "tail", o.Tail, "Lines of recent log file to display. Defaults to -1 with no selector, showing all log lines otherwise 10, if a selector is provided.")
   176  	cmd.Flags().BoolVar(&o.IgnoreLogErrors, "ignore-errors", o.IgnoreLogErrors, "If watching / following pod logs, allow for any errors that occur to be non-fatal")
   177  	cmd.Flags().StringVar(&o.SinceTime, "since-time", o.SinceTime, i18n.T("Only return logs after a specific date (RFC3339). Defaults to all logs. Only one of since-time / since may be used."))
   178  	cmd.Flags().DurationVar(&o.SinceSeconds, "since", o.SinceSeconds, "Only return logs newer than a relative duration like 5s, 2m, or 3h. Defaults to all logs. Only one of since-time / since may be used.")
   179  	cmd.Flags().StringVarP(&o.Container, "container", "c", o.Container, "Print the logs of this container")
   180  	cmd.Flags().BoolVar(&o.InsecureSkipTLSVerifyBackend, "insecure-skip-tls-verify-backend", o.InsecureSkipTLSVerifyBackend,
   181  		"Skip verifying the identity of the kubelet that logs are requested from.  In theory, an attacker could provide invalid log content back. You might want to use this if your kubelet serving certificates have expired.")
   182  	cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodLogsTimeout)
   183  	cmdutil.AddLabelSelectorFlagVar(cmd, &o.Selector)
   184  	cmd.Flags().IntVar(&o.MaxFollowConcurrency, "max-log-requests", o.MaxFollowConcurrency, "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5.")
   185  	cmd.Flags().BoolVar(&o.Prefix, "prefix", o.Prefix, "Prefix each log line with the log source (pod name and container name)")
   186  }
   188  func (o *LogsOptions) ToLogOptions() (*corev1.PodLogOptions, error) {
   189  	logOptions := &corev1.PodLogOptions{
   190  		Container:                    o.Container,
   191  		Follow:                       o.Follow,
   192  		Previous:                     o.Previous,
   193  		Timestamps:                   o.Timestamps,
   194  		InsecureSkipTLSVerifyBackend: o.InsecureSkipTLSVerifyBackend,
   195  	}
   197  	if len(o.SinceTime) > 0 {
   198  		t, err := util.ParseRFC3339(o.SinceTime, metav1.Now)
   199  		if err != nil {
   200  			return nil, err
   201  		}
   203  		logOptions.SinceTime = &t
   204  	}
   206  	if o.LimitBytes != 0 {
   207  		logOptions.LimitBytes = &o.LimitBytes
   208  	}
   210  	if o.SinceSeconds != 0 {
   211  		// round up to the nearest second
   212  		sec := int64(o.SinceSeconds.Round(time.Second).Seconds())
   213  		logOptions.SinceSeconds = &sec
   214  	}
   216  	if len(o.Selector) > 0 && o.Tail == -1 && !o.TailSpecified {
   217  		logOptions.TailLines = &selectorTail
   218  	} else if o.Tail != -1 {
   219  		logOptions.TailLines = &o.Tail
   220  	}
   222  	return logOptions, nil
   223  }
   225  func (o *LogsOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
   226  	o.ContainerNameSpecified = cmd.Flag("container").Changed
   227  	o.TailSpecified = cmd.Flag("tail").Changed
   228  	o.Resources = args
   230  	switch len(args) {
   231  	case 0:
   232  		if len(o.Selector) == 0 {
   233  			return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
   234  		}
   235  	case 1:
   236  		o.ResourceArg = args[0]
   237  		if len(o.Selector) != 0 {
   238  			return cmdutil.UsageErrorf(cmd, "only a selector (-l) or a POD name is allowed")
   239  		}
   240  	case 2:
   241  		o.ResourceArg = args[0]
   242  		o.Container = args[1]
   243  	default:
   244  		return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
   245  	}
   246  	var err error
   247  	o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
   248  	if err != nil {
   249  		return err
   250  	}
   252  	o.ConsumeRequestFn = DefaultConsumeRequest
   254  	o.GetPodTimeout, err = cmdutil.GetPodRunningTimeoutFlag(cmd)
   255  	if err != nil {
   256  		return err
   257  	}
   259  	o.Options, err = o.ToLogOptions()
   260  	if err != nil {
   261  		return err
   262  	}
   264  	o.RESTClientGetter = f
   265  	o.LogsForObject = polymorphichelpers.LogsForObjectFn
   267  	if o.Object == nil {
   268  		builder := f.NewBuilder().
   269  			WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
   270  			NamespaceParam(o.Namespace).DefaultNamespace().
   271  			SingleResourceType()
   272  		if o.ResourceArg != "" {
   273  			builder.ResourceNames("pods", o.ResourceArg)
   274  		}
   275  		if o.Selector != "" {
   276  			builder.ResourceTypes("pods").LabelSelectorParam(o.Selector)
   277  		}
   278  		infos, err := builder.Do().Infos()
   279  		if err != nil {
   280  			if apierrors.IsNotFound(err) {
   281  				err = fmt.Errorf("error from server (NotFound): %w in namespace %q", err, o.Namespace)
   282  			}
   283  			return err
   284  		}
   285  		if o.Selector == "" && len(infos) != 1 {
   286  			return errors.New("expected a resource")
   287  		}
   288  		o.Object = infos[0].Object
   289  		if o.Selector != "" && len(o.Object.(*corev1.PodList).Items) == 0 {
   290  			fmt.Fprintf(o.ErrOut, "No resources found in %s namespace.\n", o.Namespace)
   291  		}
   292  	}
   294  	return nil
   295  }
   297  func (o LogsOptions) Validate() error {
   298  	if len(o.SinceTime) > 0 && o.SinceSeconds != 0 {
   299  		return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified")
   300  	}
   302  	logsOptions, ok := o.Options.(*corev1.PodLogOptions)
   303  	if !ok {
   304  		return errors.New("unexpected logs options object")
   305  	}
   306  	if o.AllContainers && len(logsOptions.Container) > 0 {
   307  		return fmt.Errorf("--all-containers=true should not be specified with container name %s", logsOptions.Container)
   308  	}
   310  	if o.ContainerNameSpecified && len(o.Resources) == 2 {
   311  		return fmt.Errorf("only one of -c or an inline [CONTAINER] arg is allowed")
   312  	}
   314  	if o.LimitBytes < 0 {
   315  		return fmt.Errorf("--limit-bytes must be greater than 0")
   316  	}
   318  	if logsOptions.SinceSeconds != nil && *logsOptions.SinceSeconds < int64(0) {
   319  		return fmt.Errorf("--since must be greater than 0")
   320  	}
   322  	if logsOptions.TailLines != nil && *logsOptions.TailLines < -1 {
   323  		return fmt.Errorf("--tail must be greater than or equal to -1")
   324  	}
   326  	return nil
   327  }
   329  // RunLogs retrieves a pod log
   330  func (o LogsOptions) RunLogs() error {
   331  	requests, err := o.LogsForObject(o.RESTClientGetter, o.Object, o.Options, o.GetPodTimeout, o.AllContainers)
   332  	if err != nil {
   333  		return err
   334  	}
   336  	if o.Follow && len(requests) > 1 {
   337  		if len(requests) > o.MaxFollowConcurrency {
   338  			return fmt.Errorf(
   339  				"you are attempting to follow %d log streams, but maximum allowed concurrency is %d, use --max-log-requests to increase the limit",
   340  				len(requests), o.MaxFollowConcurrency,
   341  			)
   342  		}
   344  		return o.parallelConsumeRequest(requests)
   345  	}
   347  	return o.sequentialConsumeRequest(requests)
   348  }
   350  func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
   351  	reader, writer := io.Pipe()
   352  	wg := &sync.WaitGroup{}
   353  	wg.Add(len(requests))
   354  	for objRef, request := range requests {
   355  		go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
   356  			defer wg.Done()
   357  			out := o.addPrefixIfNeeded(objRef, writer)
   358  			if err := o.ConsumeRequestFn(request, out); err != nil {
   359  				if !o.IgnoreLogErrors {
   360  					writer.CloseWithError(err)
   362  					// It's important to return here to propagate the error via the pipe
   363  					return
   364  				}
   366  				fmt.Fprintf(writer, "error: %v\n", err)
   367  			}
   369  		}(objRef, request)
   370  	}
   372  	go func() {
   373  		wg.Wait()
   374  		writer.Close()
   375  	}()
   377  	_, err := io.Copy(o.Out, reader)
   378  	return err
   379  }
   381  func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
   382  	for objRef, request := range requests {
   383  		out := o.addPrefixIfNeeded(objRef, o.Out)
   384  		if err := o.ConsumeRequestFn(request, out); err != nil {
   385  			if !o.IgnoreLogErrors {
   386  				return err
   387  			}
   389  			fmt.Fprintf(o.Out, "error: %v\n", err)
   390  		}
   391  	}
   393  	return nil
   394  }
   396  func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Writer) io.Writer {
   397  	if !o.Prefix || ref.FieldPath == "" || ref.Name == "" {
   398  		return writer
   399  	}
   401  	// We rely on ref.FieldPath to contain a reference to a container
   402  	// including a container name (not an index) so we can get a container name
   403  	// without making an extra API request.
   404  	var containerName string
   405  	containerNameMatches := o.containerNameFromRefSpecRegexp.FindStringSubmatch(ref.FieldPath)
   406  	if len(containerNameMatches) == 2 {
   407  		containerName = containerNameMatches[1]
   408  	}
   410  	prefix := fmt.Sprintf("[pod/%s/%s] ", ref.Name, containerName)
   411  	return &prefixingWriter{
   412  		prefix: []byte(prefix),
   413  		writer: writer,
   414  	}
   415  }
   417  // DefaultConsumeRequest reads the data from request and writes into
   418  // the out writer. It buffers data from requests until the newline or io.EOF
   419  // occurs in the data, so it doesn't interleave logs sub-line
   420  // when running concurrently.
   421  //
   422  // A successful read returns err == nil, not err == io.EOF.
   423  // Because the function is defined to read from request until io.EOF, it does
   424  // not treat an io.EOF as an error to be reported.
   425  func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
   426  	readCloser, err := request.Stream(context.TODO())
   427  	if err != nil {
   428  		return err
   429  	}
   430  	defer readCloser.Close()
   432  	r := bufio.NewReader(readCloser)
   433  	for {
   434  		bytes, err := r.ReadBytes('\n')
   435  		if _, err := out.Write(bytes); err != nil {
   436  			return err
   437  		}
   439  		if err != nil {
   440  			if err != io.EOF {
   441  				return err
   442  			}
   443  			return nil
   444  		}
   445  	}
   446  }
   448  type prefixingWriter struct {
   449  	prefix []byte
   450  	writer io.Writer
   451  }
   453  func (pw *prefixingWriter) Write(p []byte) (int, error) {
   454  	if len(p) == 0 {
   455  		return 0, nil
   456  	}
   458  	// Perform an "atomic" write of a prefix and p to make sure that it doesn't interleave
   459  	// sub-line when used concurrently with io.PipeWrite.
   460  	n, err := pw.writer.Write(append(pw.prefix, p...))
   461  	if n > len(p) {
   462  		// To comply with the io.Writer interface requirements we must
   463  		// return a number of bytes written from p (0 <= n <= len(p)),
   464  		// so we are ignoring the length of the prefix here.
   465  		return len(p), err
   466  	}
   467  	return n, err
   468  }

