1
16
17 package benchmark
18
19 import (
20 "errors"
21 "io"
22 "io/fs"
23 "os"
24 "path/filepath"
25 "regexp"
26 "strconv"
27 "strings"
28 "sync"
29 "testing"
30 "time"
31
32 "k8s.io/component-base/featuregate"
33 logsapi "k8s.io/component-base/logs/api/v1"
34 _ "k8s.io/component-base/logs/json/register"
35 "k8s.io/klog/v2"
36 )
37
38 func BenchmarkEncoding(b *testing.B) {
39 seen := map[string]bool{}
40
41
42
43
44 if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error {
45 if err != nil {
46 return err
47 }
48 if !strings.HasSuffix(path, ".log") {
49 return nil
50 }
51 messages, stats, err := loadLog(path)
52 if err != nil {
53 return err
54 }
55
56
57
58
59 statsStr := stats.String()
60 if !seen[statsStr] {
61 b.Log(path + "\n" + statsStr)
62 seen[statsStr] = true
63 }
64 b.Run(strings.TrimSuffix(strings.TrimPrefix(path, "data/"), ".log"), func(b *testing.B) {
65
66 vMatch := regexp.MustCompile(`/v(\d+)/`).FindStringSubmatch(path)
67 v := 0
68 if vMatch != nil {
69 v, _ = strconv.Atoi(vMatch[1])
70 }
71
72 fileSizes := map[string]int{}
73 test := func(b *testing.B, format string, print func(logger klog.Logger, item logMessage)) {
74 state := klog.CaptureState()
75 defer state.Restore()
76
77
78
79 output := newBytesWritten(b, "/dev/null")
80 c := logsapi.NewLoggingConfiguration()
81 c.Format = format
82 o := logsapi.LoggingOptions{
83 ErrorStream: output,
84 InfoStream: output,
85 }
86 klog.SetOutput(output)
87 defer func() {
88 if err := logsapi.ResetForTest(nil); err != nil {
89 b.Errorf("error resetting logsapi: %v", err)
90 }
91 }()
92 if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil {
93 b.Fatalf("Unexpected error configuring logging: %v", err)
94 }
95 logger := klog.Background()
96 b.ResetTimer()
97 start := time.Now()
98 total := int64(0)
99 for i := 0; i < b.N; i++ {
100 for _, item := range messages {
101 if item.verbosity <= v {
102 print(logger, item)
103 total++
104 }
105 }
106 }
107 end := time.Now()
108 duration := end.Sub(start)
109
110
111 b.ReportMetric(0, "ns/op")
112 b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s")
113 fileSizes[filepath.Base(b.Name())] = int(output.bytesWritten)
114 }
115
116 b.Run("printf", func(b *testing.B) {
117 test(b, "text", func(_ klog.Logger, item logMessage) {
118 printf(item)
119 })
120 })
121 b.Run("structured", func(b *testing.B) {
122 test(b, "text", prints)
123 })
124 b.Run("JSON", func(b *testing.B) {
125 test(b, "json", prints)
126 })
127
128 b.Logf("%s: file sizes: %v\n", path, fileSizes)
129 })
130 return nil
131 }); err != nil {
132 b.Fatalf("reading 'data' directory: %v", err)
133 }
134 }
135
// loadGeneratorConfig describes the synthetic load produced by
// generateOutput.
type loadGeneratorConfig struct {
	// messageLength is the number of "X" characters in each log message.
	messageLength int

	// errorPercentage is added to an accumulator for every message; each
	// time the accumulator exceeds 100, one message is logged as an error
	// instead of an info message.
	errorPercentage float64

	// workers is the number of goroutines that log concurrently.
	workers int
}
146
147
148
149
150
151
152
153
154 func BenchmarkWriting(b *testing.B) {
155
156
157 config := loadGeneratorConfig{
158 messageLength: 300,
159 errorPercentage: 1.0,
160 workers: 100,
161 }
162
163 benchmarkWriting(b, config)
164 }
165
166 func benchmarkWriting(b *testing.B, config loadGeneratorConfig) {
167 b.Run("discard", func(b *testing.B) {
168 benchmarkOutputFormats(b, config, true)
169 })
170 b.Run("tmp-files", func(b *testing.B) {
171 benchmarkOutputFormats(b, config, false)
172 })
173 }
174
175 func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) {
176 b.Run("structured", func(b *testing.B) {
177 benchmarkOutputFormat(b, config, discard, "text")
178 })
179 b.Run("JSON", func(b *testing.B) {
180 benchmarkOutputFormat(b, config, discard, "json")
181 })
182 }
183
184 func benchmarkOutputFormat(b *testing.B, config loadGeneratorConfig, discard bool, format string) {
185 b.Run("single-stream", func(b *testing.B) {
186 benchmarkOutputFormatStream(b, config, discard, format, false)
187 })
188 b.Run("split-stream", func(b *testing.B) {
189 benchmarkOutputFormatStream(b, config, discard, format, true)
190 })
191 }
192
193 func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, discard bool, format string, splitStreams bool) {
194 tmpDir := b.TempDir()
195 state := klog.CaptureState()
196 defer state.Restore()
197
198 featureGate := featuregate.NewFeatureGate()
199 logsapi.AddFeatureGates(featureGate)
200 if err := featureGate.SetFromMap(map[string]bool{
201 string(logsapi.LoggingAlphaOptions): true,
202 string(logsapi.LoggingBetaOptions): true,
203 }); err != nil {
204 b.Fatalf("Set feature gates: %v", err)
205 }
206
207
208
209
210 var o logsapi.LoggingOptions
211 c := logsapi.NewLoggingConfiguration()
212 c.Format = format
213 if splitStreams {
214 c.Options.JSON.SplitStream = true
215 if err := c.Options.JSON.InfoBufferSize.Set("64Ki"); err != nil {
216 b.Fatalf("Error setting buffer size: %v", err)
217 }
218 c.Options.Text.SplitStream = true
219 if err := c.Options.Text.InfoBufferSize.Set("64Ki"); err != nil {
220 b.Fatalf("Error setting buffer size: %v", err)
221 }
222 }
223 var files []*os.File
224 if discard {
225 o.ErrorStream = io.Discard
226 o.InfoStream = io.Discard
227 } else {
228 out1, err := os.Create(filepath.Join(tmpDir, "stream-1.log"))
229 if err != nil {
230 b.Fatal(err)
231 }
232 defer out1.Close()
233 out2, err := os.Create(filepath.Join(tmpDir, "stream-2.log"))
234 if err != nil {
235 b.Fatal(err)
236 }
237 defer out2.Close()
238
239 if splitStreams {
240 files = append(files, out1, out2)
241 o.ErrorStream = out1
242 o.InfoStream = out2
243 } else {
244 files = append(files, out1)
245 o.ErrorStream = out1
246 o.InfoStream = out1
247 }
248 }
249
250 klog.SetOutput(o.ErrorStream)
251 defer func() {
252 if err := logsapi.ResetForTest(nil); err != nil {
253 b.Errorf("error resetting logsapi: %v", err)
254 }
255 }()
256 if err := logsapi.ValidateAndApplyWithOptions(c, &o, featureGate); err != nil {
257 b.Fatalf("Unexpected error configuring logging: %v", err)
258 }
259
260 generateOutput(b, config, files...)
261 }
262
263 func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) {
264 msg := strings.Repeat("X", config.messageLength)
265 err := errors.New("fail")
266 start := time.Now()
267
268
269 n := b.N * 1000
270 total := config.workers * n
271
272 b.ResetTimer()
273 var wg sync.WaitGroup
274 for i := 0; i < config.workers; i++ {
275 wg.Add(1)
276 go func() {
277 defer wg.Done()
278
279 acc := 0.0
280 for i := 0; i < n; i++ {
281 if acc > 100 {
282 klog.ErrorS(err, msg, "key", "value")
283 acc -= 100
284 } else {
285 klog.InfoS(msg, "key", "value")
286 }
287 acc += config.errorPercentage
288 }
289 }()
290 }
291 wg.Wait()
292 klog.Flush()
293 b.StopTimer()
294 end := time.Now()
295 duration := end.Sub(start)
296
297
298 b.ReportMetric(0, "ns/op")
299 b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s")
300
301
302 b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds())
303 for i, file := range files {
304 if file != nil {
305 pos, err := file.Seek(0, io.SeekEnd)
306 if err != nil {
307 b.Fatal(err)
308 }
309 if _, err := file.Seek(0, io.SeekStart); err != nil {
310 b.Fatal(err)
311 }
312 max := 50
313 buffer := make([]byte, max)
314 actual, err := file.Read(buffer)
315 if err != nil {
316 if err != io.EOF {
317 b.Fatal(err)
318 }
319 buffer = nil
320 }
321 if actual == max {
322 buffer[max-3] = '.'
323 buffer[max-2] = '.'
324 buffer[max-1] = '.'
325 }
326 b.Logf(" %d bytes to file #%d -> %.1fMiB/s (starts with: %s)", pos, i, float64(pos)/duration.Seconds()/1024/1024, string(buffer))
327 }
328 }
329 }
330
View as plain text