...

Source file src/k8s.io/component-base/logs/api/v1/text.go

Documentation: k8s.io/component-base/logs/api/v1

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package v1
    18  
    19  import (
    20  	"bufio"
    21  	"fmt"
    22  	"io"
    23  	"sync"
    24  
    25  	"github.com/go-logr/logr"
    26  
    27  	"k8s.io/component-base/featuregate"
    28  	"k8s.io/klog/v2/textlogger"
    29  )
    30  
    31  // textFactory produces klog text logger instances.
    32  type textFactory struct{}
    33  
    34  var _ LogFormatFactory = textFactory{}
    35  
    36  func (f textFactory) Feature() featuregate.Feature {
    37  	return LoggingStableOptions
    38  }
    39  
    40  func (f textFactory) Create(c LoggingConfiguration, o LoggingOptions) (logr.Logger, RuntimeControl) {
    41  	output := o.ErrorStream
    42  	var flush func()
    43  	if c.Options.Text.SplitStream {
    44  		r := &klogMsgRouter{
    45  			info:  o.InfoStream,
    46  			error: o.ErrorStream,
    47  		}
    48  		size := c.Options.Text.InfoBufferSize.Value()
    49  		if size > 0 {
    50  			// Prevent integer overflow.
    51  			if size > 2*1024*1024*1024 {
    52  				size = 2 * 1024 * 1024 * 1024
    53  			}
    54  			info := newBufferedWriter(r.info, int(size))
    55  			flush = info.Flush
    56  			r.info = info
    57  		}
    58  		output = r
    59  	}
    60  
    61  	options := []textlogger.ConfigOption{
    62  		textlogger.Verbosity(int(c.Verbosity)),
    63  		textlogger.Output(output),
    64  	}
    65  	loggerConfig := textlogger.NewConfig(options...)
    66  
    67  	// This should never fail, we produce a valid string here.
    68  	_ = loggerConfig.VModule().Set(VModuleConfigurationPflag(&c.VModule).String())
    69  
    70  	return textlogger.NewLogger(loggerConfig),
    71  		RuntimeControl{
    72  			SetVerbosityLevel: func(v uint32) error {
    73  				return loggerConfig.Verbosity().Set(fmt.Sprintf("%d", v))
    74  			},
    75  			Flush: flush,
    76  		}
    77  }
    78  
    79  type klogMsgRouter struct {
    80  	info, error io.Writer
    81  }
    82  
    83  var _ io.Writer = &klogMsgRouter{}
    84  
    85  // Write redirects the message into either the info or error
    86  // stream, depending on its type as indicated in text format
    87  // by the first byte.
    88  func (r *klogMsgRouter) Write(p []byte) (int, error) {
    89  	if len(p) == 0 {
    90  		return 0, nil
    91  	}
    92  
    93  	if p[0] == 'I' {
    94  		return r.info.Write(p)
    95  	}
    96  	return r.error.Write(p)
    97  }
    98  
    99  // bufferedWriter is an io.Writer that buffers writes in-memory before
   100  // flushing them to a wrapped io.Writer after reaching some limit
   101  // or getting flushed.
   102  type bufferedWriter struct {
   103  	mu     sync.Mutex
   104  	writer *bufio.Writer
   105  	out    io.Writer
   106  }
   107  
   108  func newBufferedWriter(out io.Writer, size int) *bufferedWriter {
   109  	return &bufferedWriter{
   110  		writer: bufio.NewWriterSize(out, size),
   111  		out:    out,
   112  	}
   113  }
   114  
   115  func (b *bufferedWriter) Write(p []byte) (int, error) {
   116  	b.mu.Lock()
   117  	defer b.mu.Unlock()
   118  
   119  	// To avoid partial writes into the underlying writer, we ensure that
   120  	// the entire new data fits into the buffer or flush first.
   121  	if len(p) > b.writer.Available() && b.writer.Buffered() > 0 {
   122  		if err := b.writer.Flush(); err != nil {
   123  			return 0, err
   124  		}
   125  	}
   126  
   127  	// If it still doesn't fit, then we bypass the now empty buffer
   128  	// and write directly.
   129  	if len(p) > b.writer.Available() {
   130  		return b.out.Write(p)
   131  	}
   132  
   133  	// This goes into the buffer.
   134  	return b.writer.Write(p)
   135  }
   136  
   137  func (b *bufferedWriter) Flush() {
   138  	b.mu.Lock()
   139  	defer b.mu.Unlock()
   140  
   141  	_ = b.writer.Flush()
   142  }
   143  

View as plain text