1
16
17 package logs
18
19 import (
20 "bufio"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "regexp"
26 "sync"
27 "time"
28
29 "github.com/spf13/cobra"
30
31 corev1 "k8s.io/api/core/v1"
32 apierrors "k8s.io/apimachinery/pkg/api/errors"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/runtime"
35 "k8s.io/cli-runtime/pkg/genericclioptions"
36 "k8s.io/cli-runtime/pkg/genericiooptions"
37 "k8s.io/client-go/rest"
38 cmdutil "k8s.io/kubectl/pkg/cmd/util"
39 "k8s.io/kubectl/pkg/polymorphichelpers"
40 "k8s.io/kubectl/pkg/scheme"
41 "k8s.io/kubectl/pkg/util"
42 "k8s.io/kubectl/pkg/util/completion"
43 "k8s.io/kubectl/pkg/util/i18n"
44 "k8s.io/kubectl/pkg/util/templates"
45 )
46
// logsUsageStr is the one-line usage synopsis of the logs command. It is used
// as the cobra "Use" string and embedded in the usage error message.
const logsUsageStr = "logs [-f] [-p] (POD | TYPE/NAME) [-c CONTAINER]"
50
51 var (
52 logsLong = templates.LongDesc(i18n.T(`
53 Print the logs for a container in a pod or specified resource.
54 If the pod has only one container, the container name is optional.`))
55
56 logsExample = templates.Examples(i18n.T(`
57 # Return snapshot logs from pod nginx with only one container
58 kubectl logs nginx
59
60 # Return snapshot logs from pod nginx with multi containers
61 kubectl logs nginx --all-containers=true
62
63 # Return snapshot logs from all containers in pods defined by label app=nginx
64 kubectl logs -l app=nginx --all-containers=true
65
66 # Return snapshot of previous terminated ruby container logs from pod web-1
67 kubectl logs -p -c ruby web-1
68
69 # Begin streaming the logs of the ruby container in pod web-1
70 kubectl logs -f -c ruby web-1
71
72 # Begin streaming the logs from all containers in pods defined by label app=nginx
73 kubectl logs -f -l app=nginx --all-containers=true
74
75 # Display only the most recent 20 lines of output in pod nginx
76 kubectl logs --tail=20 nginx
77
78 # Show all logs from pod nginx written in the last hour
79 kubectl logs --since=1h nginx
80
81 # Show logs from a kubelet with an expired serving certificate
82 kubectl logs --insecure-skip-tls-verify-backend nginx
83
84 # Return snapshot logs from first container of a job named hello
85 kubectl logs job/hello
86
87 # Return snapshot logs from container nginx-1 of a deployment named nginx
88 kubectl logs deployment/nginx -c nginx-1`))
89
90 selectorTail int64 = 10
91 logsUsageErrStr = fmt.Sprintf("expected '%s'.\nPOD or TYPE/NAME is a required argument for the logs command", logsUsageStr)
92 )
93
// defaultPodLogsTimeout is the default value registered for the pod running
// timeout flag of the logs command.
const defaultPodLogsTimeout = 20 * time.Second
97
98 type LogsOptions struct {
99 Namespace string
100 ResourceArg string
101 AllContainers bool
102 Options runtime.Object
103 Resources []string
104
105 ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error
106
107
108 SinceTime string
109 SinceSeconds time.Duration
110 Follow bool
111 Previous bool
112 Timestamps bool
113 IgnoreLogErrors bool
114 LimitBytes int64
115 Tail int64
116 Container string
117 InsecureSkipTLSVerifyBackend bool
118
119
120 ContainerNameSpecified bool
121 Selector string
122 MaxFollowConcurrency int
123 Prefix bool
124
125 Object runtime.Object
126 GetPodTimeout time.Duration
127 RESTClientGetter genericclioptions.RESTClientGetter
128 LogsForObject polymorphichelpers.LogsForObjectFunc
129
130 genericiooptions.IOStreams
131
132 TailSpecified bool
133
134 containerNameFromRefSpecRegexp *regexp.Regexp
135 }
136
137 func NewLogsOptions(streams genericiooptions.IOStreams, allContainers bool) *LogsOptions {
138 return &LogsOptions{
139 IOStreams: streams,
140 AllContainers: allContainers,
141 Tail: -1,
142 MaxFollowConcurrency: 5,
143
144 containerNameFromRefSpecRegexp: regexp.MustCompile(`spec\.(?:initContainers|containers|ephemeralContainers){(.+)}`),
145 }
146 }
147
148
149 func NewCmdLogs(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
150 o := NewLogsOptions(streams, false)
151
152 cmd := &cobra.Command{
153 Use: logsUsageStr,
154 DisableFlagsInUseLine: true,
155 Short: i18n.T("Print the logs for a container in a pod"),
156 Long: logsLong,
157 Example: logsExample,
158 ValidArgsFunction: completion.PodResourceNameAndContainerCompletionFunc(f),
159 Run: func(cmd *cobra.Command, args []string) {
160 cmdutil.CheckErr(o.Complete(f, cmd, args))
161 cmdutil.CheckErr(o.Validate())
162 cmdutil.CheckErr(o.RunLogs())
163 },
164 }
165 o.AddFlags(cmd)
166 return cmd
167 }
168
169 func (o *LogsOptions) AddFlags(cmd *cobra.Command) {
170 cmd.Flags().BoolVar(&o.AllContainers, "all-containers", o.AllContainers, "Get all containers' logs in the pod(s).")
171 cmd.Flags().BoolVarP(&o.Follow, "follow", "f", o.Follow, "Specify if the logs should be streamed.")
172 cmd.Flags().BoolVar(&o.Timestamps, "timestamps", o.Timestamps, "Include timestamps on each line in the log output")
173 cmd.Flags().Int64Var(&o.LimitBytes, "limit-bytes", o.LimitBytes, "Maximum bytes of logs to return. Defaults to no limit.")
174 cmd.Flags().BoolVarP(&o.Previous, "previous", "p", o.Previous, "If true, print the logs for the previous instance of the container in a pod if it exists.")
175 cmd.Flags().Int64Var(&o.Tail, "tail", o.Tail, "Lines of recent log file to display. Defaults to -1 with no selector, showing all log lines otherwise 10, if a selector is provided.")
176 cmd.Flags().BoolVar(&o.IgnoreLogErrors, "ignore-errors", o.IgnoreLogErrors, "If watching / following pod logs, allow for any errors that occur to be non-fatal")
177 cmd.Flags().StringVar(&o.SinceTime, "since-time", o.SinceTime, i18n.T("Only return logs after a specific date (RFC3339). Defaults to all logs. Only one of since-time / since may be used."))
178 cmd.Flags().DurationVar(&o.SinceSeconds, "since", o.SinceSeconds, "Only return logs newer than a relative duration like 5s, 2m, or 3h. Defaults to all logs. Only one of since-time / since may be used.")
179 cmd.Flags().StringVarP(&o.Container, "container", "c", o.Container, "Print the logs of this container")
180 cmd.Flags().BoolVar(&o.InsecureSkipTLSVerifyBackend, "insecure-skip-tls-verify-backend", o.InsecureSkipTLSVerifyBackend,
181 "Skip verifying the identity of the kubelet that logs are requested from. In theory, an attacker could provide invalid log content back. You might want to use this if your kubelet serving certificates have expired.")
182 cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodLogsTimeout)
183 cmdutil.AddLabelSelectorFlagVar(cmd, &o.Selector)
184 cmd.Flags().IntVar(&o.MaxFollowConcurrency, "max-log-requests", o.MaxFollowConcurrency, "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5.")
185 cmd.Flags().BoolVar(&o.Prefix, "prefix", o.Prefix, "Prefix each log line with the log source (pod name and container name)")
186 }
187
188 func (o *LogsOptions) ToLogOptions() (*corev1.PodLogOptions, error) {
189 logOptions := &corev1.PodLogOptions{
190 Container: o.Container,
191 Follow: o.Follow,
192 Previous: o.Previous,
193 Timestamps: o.Timestamps,
194 InsecureSkipTLSVerifyBackend: o.InsecureSkipTLSVerifyBackend,
195 }
196
197 if len(o.SinceTime) > 0 {
198 t, err := util.ParseRFC3339(o.SinceTime, metav1.Now)
199 if err != nil {
200 return nil, err
201 }
202
203 logOptions.SinceTime = &t
204 }
205
206 if o.LimitBytes != 0 {
207 logOptions.LimitBytes = &o.LimitBytes
208 }
209
210 if o.SinceSeconds != 0 {
211
212 sec := int64(o.SinceSeconds.Round(time.Second).Seconds())
213 logOptions.SinceSeconds = &sec
214 }
215
216 if len(o.Selector) > 0 && o.Tail == -1 && !o.TailSpecified {
217 logOptions.TailLines = &selectorTail
218 } else if o.Tail != -1 {
219 logOptions.TailLines = &o.Tail
220 }
221
222 return logOptions, nil
223 }
224
225 func (o *LogsOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
226 o.ContainerNameSpecified = cmd.Flag("container").Changed
227 o.TailSpecified = cmd.Flag("tail").Changed
228 o.Resources = args
229
230 switch len(args) {
231 case 0:
232 if len(o.Selector) == 0 {
233 return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
234 }
235 case 1:
236 o.ResourceArg = args[0]
237 if len(o.Selector) != 0 {
238 return cmdutil.UsageErrorf(cmd, "only a selector (-l) or a POD name is allowed")
239 }
240 case 2:
241 o.ResourceArg = args[0]
242 o.Container = args[1]
243 default:
244 return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
245 }
246 var err error
247 o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
248 if err != nil {
249 return err
250 }
251
252 o.ConsumeRequestFn = DefaultConsumeRequest
253
254 o.GetPodTimeout, err = cmdutil.GetPodRunningTimeoutFlag(cmd)
255 if err != nil {
256 return err
257 }
258
259 o.Options, err = o.ToLogOptions()
260 if err != nil {
261 return err
262 }
263
264 o.RESTClientGetter = f
265 o.LogsForObject = polymorphichelpers.LogsForObjectFn
266
267 if o.Object == nil {
268 builder := f.NewBuilder().
269 WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
270 NamespaceParam(o.Namespace).DefaultNamespace().
271 SingleResourceType()
272 if o.ResourceArg != "" {
273 builder.ResourceNames("pods", o.ResourceArg)
274 }
275 if o.Selector != "" {
276 builder.ResourceTypes("pods").LabelSelectorParam(o.Selector)
277 }
278 infos, err := builder.Do().Infos()
279 if err != nil {
280 if apierrors.IsNotFound(err) {
281 err = fmt.Errorf("error from server (NotFound): %w in namespace %q", err, o.Namespace)
282 }
283 return err
284 }
285 if o.Selector == "" && len(infos) != 1 {
286 return errors.New("expected a resource")
287 }
288 o.Object = infos[0].Object
289 if o.Selector != "" && len(o.Object.(*corev1.PodList).Items) == 0 {
290 fmt.Fprintf(o.ErrOut, "No resources found in %s namespace.\n", o.Namespace)
291 }
292 }
293
294 return nil
295 }
296
297 func (o LogsOptions) Validate() error {
298 if len(o.SinceTime) > 0 && o.SinceSeconds != 0 {
299 return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified")
300 }
301
302 logsOptions, ok := o.Options.(*corev1.PodLogOptions)
303 if !ok {
304 return errors.New("unexpected logs options object")
305 }
306 if o.AllContainers && len(logsOptions.Container) > 0 {
307 return fmt.Errorf("--all-containers=true should not be specified with container name %s", logsOptions.Container)
308 }
309
310 if o.ContainerNameSpecified && len(o.Resources) == 2 {
311 return fmt.Errorf("only one of -c or an inline [CONTAINER] arg is allowed")
312 }
313
314 if o.LimitBytes < 0 {
315 return fmt.Errorf("--limit-bytes must be greater than 0")
316 }
317
318 if logsOptions.SinceSeconds != nil && *logsOptions.SinceSeconds < int64(0) {
319 return fmt.Errorf("--since must be greater than 0")
320 }
321
322 if logsOptions.TailLines != nil && *logsOptions.TailLines < -1 {
323 return fmt.Errorf("--tail must be greater than or equal to -1")
324 }
325
326 return nil
327 }
328
329
330 func (o LogsOptions) RunLogs() error {
331 requests, err := o.LogsForObject(o.RESTClientGetter, o.Object, o.Options, o.GetPodTimeout, o.AllContainers)
332 if err != nil {
333 return err
334 }
335
336 if o.Follow && len(requests) > 1 {
337 if len(requests) > o.MaxFollowConcurrency {
338 return fmt.Errorf(
339 "you are attempting to follow %d log streams, but maximum allowed concurrency is %d, use --max-log-requests to increase the limit",
340 len(requests), o.MaxFollowConcurrency,
341 )
342 }
343
344 return o.parallelConsumeRequest(requests)
345 }
346
347 return o.sequentialConsumeRequest(requests)
348 }
349
350 func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
351 reader, writer := io.Pipe()
352 wg := &sync.WaitGroup{}
353 wg.Add(len(requests))
354 for objRef, request := range requests {
355 go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
356 defer wg.Done()
357 out := o.addPrefixIfNeeded(objRef, writer)
358 if err := o.ConsumeRequestFn(request, out); err != nil {
359 if !o.IgnoreLogErrors {
360 writer.CloseWithError(err)
361
362
363 return
364 }
365
366 fmt.Fprintf(writer, "error: %v\n", err)
367 }
368
369 }(objRef, request)
370 }
371
372 go func() {
373 wg.Wait()
374 writer.Close()
375 }()
376
377 _, err := io.Copy(o.Out, reader)
378 return err
379 }
380
381 func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
382 for objRef, request := range requests {
383 out := o.addPrefixIfNeeded(objRef, o.Out)
384 if err := o.ConsumeRequestFn(request, out); err != nil {
385 if !o.IgnoreLogErrors {
386 return err
387 }
388
389 fmt.Fprintf(o.Out, "error: %v\n", err)
390 }
391 }
392
393 return nil
394 }
395
396 func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Writer) io.Writer {
397 if !o.Prefix || ref.FieldPath == "" || ref.Name == "" {
398 return writer
399 }
400
401
402
403
404 var containerName string
405 containerNameMatches := o.containerNameFromRefSpecRegexp.FindStringSubmatch(ref.FieldPath)
406 if len(containerNameMatches) == 2 {
407 containerName = containerNameMatches[1]
408 }
409
410 prefix := fmt.Sprintf("[pod/%s/%s] ", ref.Name, containerName)
411 return &prefixingWriter{
412 prefix: []byte(prefix),
413 writer: writer,
414 }
415 }
416
417
418
419
420
421
422
423
424
425 func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
426 readCloser, err := request.Stream(context.TODO())
427 if err != nil {
428 return err
429 }
430 defer readCloser.Close()
431
432 r := bufio.NewReader(readCloser)
433 for {
434 bytes, err := r.ReadBytes('\n')
435 if _, err := out.Write(bytes); err != nil {
436 return err
437 }
438
439 if err != nil {
440 if err != io.EOF {
441 return err
442 }
443 return nil
444 }
445 }
446 }
447
// prefixingWriter is an io.Writer that prepends a fixed prefix to every
// non-empty write before forwarding it to the wrapped writer.
type prefixingWriter struct {
	prefix []byte
	writer io.Writer
}

// Write sends prefix+p to the underlying writer in a single Write call, so
// the prefix and the payload cannot be interleaved with other writers'
// output. An empty p is a no-op.
//
// The returned count is the number of bytes of p that were written
// (0 <= n <= len(p)), as io.Writer requires: prefix bytes are never counted,
// including when the underlying write is short.
func (pw *prefixingWriter) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	// Build the line in a fresh buffer rather than appending to pw.prefix,
	// which could write into the prefix slice's spare capacity.
	buf := make([]byte, 0, len(pw.prefix)+len(p))
	buf = append(buf, pw.prefix...)
	buf = append(buf, p...)

	n, err := pw.writer.Write(buf)

	// Translate the count of bytes written overall into bytes of p written.
	written := n - len(pw.prefix)
	if written < 0 {
		written = 0
	}
	if written > len(p) {
		written = len(p)
	}
	return written, err
}
469
View as plain text