...

Source file src/k8s.io/kubernetes/test/integration/logs/benchmark/benchmark_test.go

Documentation: k8s.io/kubernetes/test/integration/logs/benchmark

/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmark

import (
	"errors"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"k8s.io/component-base/featuregate"
	logsapi "k8s.io/component-base/logs/api/v1"
	_ "k8s.io/component-base/logs/json/register"
	"k8s.io/klog/v2"
)

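// BenchmarkEncoding replays the JSON log messages from the data files and
// measures how fast they can be emitted again through klog, once as plain
// text via printf, once as structured text, and once as JSON. The loadLog,
// printf and prints helpers are defined elsewhere in this package.
//
// A typical invocation from the repository root might look like this
// (a sketch, the exact flags are up to the caller):
//
//	go test -bench=BenchmarkEncoding -benchmem ./test/integration/logs/benchmark/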
func BenchmarkEncoding(b *testing.B) {
	seen := map[string]bool{}

	// Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log
	// messages. We generate one sub-benchmark for each file where logging
	// is tested with the log level from the directory.
	if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !strings.HasSuffix(path, ".log") {
			return nil
		}
		messages, stats, err := loadLog(path)
		if err != nil {
			return err
		}
		// Only print unique file statistics. They get shown for the
		// first file where new statistics are encountered. The
		// assumption here is that there are no files with
		// different content and exactly the same statistics.
		statsStr := stats.String()
		if !seen[statsStr] {
			b.Log(path + "\n" + statsStr)
			seen[statsStr] = true
		}
		b.Run(strings.TrimSuffix(strings.TrimPrefix(path, "data/"), ".log"), func(b *testing.B) {
			// Take verbosity threshold from directory, if present.
			vMatch := regexp.MustCompile(`/v(\d+)/`).FindStringSubmatch(path)
			v := 0
			if vMatch != nil {
				v, _ = strconv.Atoi(vMatch[1])
			}

			fileSizes := map[string]int{}
			test := func(b *testing.B, format string, print func(logger klog.Logger, item logMessage)) {
				state := klog.CaptureState()
				defer state.Restore()

				// To make the tests a bit more realistic, at least
				// perform system calls during each write.
				output := newBytesWritten(b, "/dev/null")
				c := logsapi.NewLoggingConfiguration()
				c.Format = format
				o := logsapi.LoggingOptions{
					ErrorStream: output,
					InfoStream:  output,
				}
				klog.SetOutput(output)
				defer func() {
					if err := logsapi.ResetForTest(nil); err != nil {
						b.Errorf("error resetting logsapi: %v", err)
					}
				}()
				if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil {
					b.Fatalf("Unexpected error configuring logging: %v", err)
				}
				logger := klog.Background()
				b.ResetTimer()
				start := time.Now()
				total := int64(0)
				for i := 0; i < b.N; i++ {
					for _, item := range messages {
						if item.verbosity <= v {
							print(logger, item)
							total++
						}
					}
				}
				end := time.Now()
				duration := end.Sub(start)

				// Report messages/s instead of ns/op because "op" varies.
				b.ReportMetric(0, "ns/op")
				b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s")
				fileSizes[filepath.Base(b.Name())] = int(output.bytesWritten)
			}

			b.Run("printf", func(b *testing.B) {
				test(b, "text", func(_ klog.Logger, item logMessage) {
					printf(item)
				})
			})
			b.Run("structured", func(b *testing.B) {
				test(b, "text", prints)
			})
			b.Run("JSON", func(b *testing.B) {
				test(b, "json", prints)
			})

			b.Logf("%s: file sizes: %v\n", path, fileSizes)
		})
		return nil
	}); err != nil {
		b.Fatalf("reading 'data' directory: %v", err)
	}
}

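// loadGeneratorConfig describes the synthetic log load produced by
// BenchmarkWriting.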
type loadGeneratorConfig struct {
	// Length of the message written in each log entry.
	messageLength int

	// Percentage of error log entries written.
	errorPercentage float64

	// Number of concurrent goroutines that generate log entries.
	workers int
}

// BenchmarkWriting simulates writing of a stream which mixes info and error log
// messages at a certain ratio. In contrast to BenchmarkEncoding, this stresses
// the output handling and includes the usual additional information (caller,
// time stamp).
//
// See https://github.com/kubernetes/kubernetes/issues/107029 for the
// motivation.
func BenchmarkWriting(b *testing.B) {
	// This could be made configurable and/or we could benchmark different
	// configurations automatically.
	config := loadGeneratorConfig{
		messageLength:   300,
		errorPercentage: 1.0,
		workers:         100,
	}

	benchmarkWriting(b, config)
}

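// benchmarkWriting runs the load twice: once with the output discarded and
// once with the output written to files in a temporary directory.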
func benchmarkWriting(b *testing.B, config loadGeneratorConfig) {
	b.Run("discard", func(b *testing.B) {
		benchmarkOutputFormats(b, config, true)
	})
	b.Run("tmp-files", func(b *testing.B) {
		benchmarkOutputFormats(b, config, false)
	})
}

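// benchmarkOutputFormats covers the two supported output formats, structured
// text and JSON.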
func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) {
	b.Run("structured", func(b *testing.B) {
		benchmarkOutputFormat(b, config, discard, "text")
	})
	b.Run("JSON", func(b *testing.B) {
		benchmarkOutputFormat(b, config, discard, "json")
	})
}

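// benchmarkOutputFormat covers writing all messages to a single stream as
// well as splitting info and error messages into separate streams.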
func benchmarkOutputFormat(b *testing.B, config loadGeneratorConfig, discard bool, format string) {
	b.Run("single-stream", func(b *testing.B) {
		benchmarkOutputFormatStream(b, config, discard, format, false)
	})
	b.Run("split-stream", func(b *testing.B) {
		benchmarkOutputFormatStream(b, config, discard, format, true)
	})
}

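// benchmarkOutputFormatStream configures logging the same way a normal
// component would, with the alpha and beta logging feature gates enabled so
// that stream splitting can be used, redirects the output either to
// io.Discard or to temporary files, and then lets generateOutput produce the
// load.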
func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, discard bool, format string, splitStreams bool) {
	tmpDir := b.TempDir()
	state := klog.CaptureState()
	defer state.Restore()

	featureGate := featuregate.NewFeatureGate()
	logsapi.AddFeatureGates(featureGate)
	if err := featureGate.SetFromMap(map[string]bool{
		string(logsapi.LoggingAlphaOptions): true,
		string(logsapi.LoggingBetaOptions):  true,
	}); err != nil {
		b.Fatalf("Set feature gates: %v", err)
	}

	// Create a logging configuration using the exact same code as a normal
	// component. In order to redirect output, we provide a LoggingOptions
	// instance.
	var o logsapi.LoggingOptions
	c := logsapi.NewLoggingConfiguration()
	c.Format = format
	if splitStreams {
		c.Options.JSON.SplitStream = true
		if err := c.Options.JSON.InfoBufferSize.Set("64Ki"); err != nil {
			b.Fatalf("Error setting buffer size: %v", err)
		}
		c.Options.Text.SplitStream = true
		if err := c.Options.Text.InfoBufferSize.Set("64Ki"); err != nil {
			b.Fatalf("Error setting buffer size: %v", err)
		}
	}
	var files []*os.File
	if discard {
		o.ErrorStream = io.Discard
		o.InfoStream = io.Discard
	} else {
		out1, err := os.Create(filepath.Join(tmpDir, "stream-1.log"))
		if err != nil {
			b.Fatal(err)
		}
		defer out1.Close()
		out2, err := os.Create(filepath.Join(tmpDir, "stream-2.log"))
		if err != nil {
			b.Fatal(err)
		}
		defer out2.Close()

		if splitStreams {
			files = append(files, out1, out2)
			o.ErrorStream = out1
			o.InfoStream = out2
		} else {
			files = append(files, out1)
			o.ErrorStream = out1
			o.InfoStream = out1
		}
	}

	klog.SetOutput(o.ErrorStream)
	defer func() {
		if err := logsapi.ResetForTest(nil); err != nil {
			b.Errorf("error resetting logsapi: %v", err)
		}
	}()
	if err := logsapi.ValidateAndApplyWithOptions(c, &o, featureGate); err != nil {
		b.Fatalf("Unexpected error configuring logging: %v", err)
	}

	generateOutput(b, config, files...)
}

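// generateOutput writes log entries from config.workers goroutines, mixing
// error and info messages according to config.errorPercentage. It reports
// the overall message rate and, when writing to files, the write throughput
// and the beginning of each file.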
func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) {
	msg := strings.Repeat("X", config.messageLength)
	err := errors.New("fail")
	start := time.Now()

	// Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low.
	n := b.N * 1000
	total := config.workers * n

	b.ResetTimer()
	var wg sync.WaitGroup
	for i := 0; i < config.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			acc := 0.0
			for i := 0; i < n; i++ {
				if acc > 100 {
					klog.ErrorS(err, msg, "key", "value")
					acc -= 100
				} else {
					klog.InfoS(msg, "key", "value")
				}
				acc += config.errorPercentage
			}
		}()
	}
	wg.Wait()
	klog.Flush()
	b.StopTimer()
	end := time.Now()
	duration := end.Sub(start)

	// Report messages/s instead of ns/op because "op" varies.
	b.ReportMetric(0, "ns/op")
	b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s")

	// Print some information about the result.
	b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds())
	for i, file := range files {
		if file != nil {
			pos, err := file.Seek(0, io.SeekEnd)
			if err != nil {
				b.Fatal(err)
			}
			if _, err := file.Seek(0, io.SeekStart); err != nil {
				b.Fatal(err)
			}
			max := 50
			buffer := make([]byte, max)
			actual, err := file.Read(buffer)
			if err != nil {
				if err != io.EOF {
					b.Fatal(err)
				}
				buffer = nil
			}
			if actual == max {
				buffer[max-3] = '.'
				buffer[max-2] = '.'
				buffer[max-1] = '.'
			}
			b.Logf("      %d bytes to file #%d -> %.1fMiB/s (starts with: %s)", pos, i, float64(pos)/duration.Seconds()/1024/1024, string(buffer))
		}
	}
}
