1
16
17 package logs
18
19 import (
20 "compress/gzip"
21 "context"
22 "fmt"
23 "io"
24 "os"
25 "path/filepath"
26 "sort"
27 "strings"
28 "sync"
29
30 "k8s.io/client-go/util/workqueue"
31 "k8s.io/klog/v2"
32
33 "k8s.io/apimachinery/pkg/api/resource"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/util/wait"
36 internalapi "k8s.io/cri-api/pkg/apis"
37 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
38 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
39 "k8s.io/utils/clock"
40 )
41
// File-name conventions shared by the rotation, compression and cleanup code.
const (
	// timestampFormat is the time layout appended (after a ".") to the active
	// log's name when it is rotated, e.g. "0.log.20060102-150405". The layout
	// is fixed-width, so rotated names sort lexically in chronological order.
	timestampFormat = "20060102-150405"

	// compressSuffix is appended to a rotated log once it has been gzipped.
	compressSuffix = ".gz"

	// tmpSuffix marks the scratch file that receives compressed output before
	// it is renamed to its final compressSuffix name.
	tmpSuffix = ".tmp"
)
51
52
53
54
// ContainerLogManager keeps container log files bounded in size and removes
// them when they are no longer needed.
type ContainerLogManager interface {
	// Start kicks off background log rotation.
	Start()
	// Clean deletes the log files (active and rotated) of the container
	// identified by containerID.
	Clean(ctx context.Context, containerID string) error
}
62
63
64
// LogRotatePolicy controls when a container's log is rotated and how many
// log files are retained for it.
type LogRotatePolicy struct {
	// MaxSize is the file size, in bytes, at or above which the active log
	// is rotated.
	MaxSize int64

	// MaxFiles is the maximum number of log files kept for one container,
	// counting the active log file itself.
	MaxFiles int
}
73
74
75
76
77 func GetAllLogs(log string) ([]string, error) {
78
79 pattern := fmt.Sprintf("%s.*", log)
80 logs, err := filepath.Glob(pattern)
81 if err != nil {
82 return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
83 }
84 inuse, _ := filterUnusedLogs(logs)
85 sort.Strings(inuse)
86 return append(inuse, log), nil
87 }
88
89
// compressReadCloser streams the decompressed content of a gzip log file and
// owns the underlying file, releasing it on Close.
type compressReadCloser struct {
	f *os.File
	*gzip.Reader
}

// Close releases both the file and the gzip reader. Both are always closed;
// when both fail, the file's error wins. (gzip.Reader.Close does not close
// the reader it wraps, hence the explicit file close.)
func (rc *compressReadCloser) Close() error {
	fileErr := rc.f.Close()
	gzipErr := rc.Reader.Close()
	if fileErr != nil {
		return fileErr
	}
	return gzipErr
}
106
107
108
109
110 func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
111 if !strings.HasSuffix(log, compressSuffix) {
112 return nil, fmt.Errorf("log is not compressed")
113 }
114 f, err := os.Open(log)
115 if err != nil {
116 return nil, fmt.Errorf("failed to open log: %v", err)
117 }
118 defer func() {
119 if retErr != nil {
120 f.Close()
121 }
122 }()
123 r, err := gzip.NewReader(f)
124 if err != nil {
125 return nil, fmt.Errorf("failed to create gzip reader: %v", err)
126 }
127 return &compressReadCloser{f: f, Reader: r}, nil
128 }
129
130
131 func parseMaxSize(size string) (int64, error) {
132 quantity, err := resource.ParseQuantity(size)
133 if err != nil {
134 return 0, err
135 }
136 maxSize, ok := quantity.AsInt64()
137 if !ok {
138 return 0, fmt.Errorf("invalid max log size")
139 }
140 return maxSize, nil
141 }
142
143 type containerLogManager struct {
144 runtimeService internalapi.RuntimeService
145 osInterface kubecontainer.OSInterface
146 policy LogRotatePolicy
147 clock clock.Clock
148 mutex sync.Mutex
149 queue workqueue.RateLimitingInterface
150 maxWorkers int
151 monitoringPeriod metav1.Duration
152 }
153
154
155 func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int, maxWorkers int, monitorInterval metav1.Duration) (ContainerLogManager, error) {
156 if maxFiles <= 1 {
157 return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
158 }
159 parsedMaxSize, err := parseMaxSize(maxSize)
160 if err != nil {
161 return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
162 }
163
164 if parsedMaxSize < 0 {
165 return NewStubContainerLogManager(), nil
166 }
167
168 return &containerLogManager{
169 osInterface: osInterface,
170 runtimeService: runtimeService,
171 policy: LogRotatePolicy{
172 MaxSize: parsedMaxSize,
173 MaxFiles: maxFiles,
174 },
175 clock: clock.RealClock{},
176 mutex: sync.Mutex{},
177 maxWorkers: maxWorkers,
178 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
179 monitoringPeriod: monitorInterval,
180 }, nil
181 }
182
183
184 func (c *containerLogManager) Start() {
185 ctx := context.Background()
186 klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod)
187 for i := 0; i < c.maxWorkers; i++ {
188 worker := i + 1
189 go c.processQueueItems(ctx, worker)
190 }
191
192 go wait.Forever(func() {
193 if err := c.rotateLogs(ctx); err != nil {
194 klog.ErrorS(err, "Failed to rotate container logs")
195 }
196 }, c.monitoringPeriod.Duration)
197 }
198
199
200 func (c *containerLogManager) Clean(ctx context.Context, containerID string) error {
201 c.mutex.Lock()
202 defer c.mutex.Unlock()
203 resp, err := c.runtimeService.ContainerStatus(ctx, containerID, false)
204 if err != nil {
205 return fmt.Errorf("failed to get container status %q: %v", containerID, err)
206 }
207 if resp.GetStatus() == nil {
208 return fmt.Errorf("container status is nil for %q", containerID)
209 }
210 pattern := fmt.Sprintf("%s*", resp.GetStatus().GetLogPath())
211 logs, err := c.osInterface.Glob(pattern)
212 if err != nil {
213 return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
214 }
215
216 for _, l := range logs {
217 if err := c.osInterface.Remove(l); err != nil && !os.IsNotExist(err) {
218 return fmt.Errorf("failed to remove container %q log %q: %v", containerID, l, err)
219 }
220 }
221
222 return nil
223 }
224
225 func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) {
226 klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker)
227 for c.processContainer(ctx, worker) {
228 }
229 klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker)
230 }
231
232 func (c *containerLogManager) rotateLogs(ctx context.Context) error {
233 c.mutex.Lock()
234 defer c.mutex.Unlock()
235 klog.V(4).InfoS("Starting container log rotation sequence")
236
237 containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
238 if err != nil {
239 return fmt.Errorf("failed to list containers: %v", err)
240 }
241 for _, container := range containers {
242
243
244 if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
245 continue
246 }
247
248 if v := klog.V(4); v.Enabled() {
249 klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels())
250 }
251 c.queue.Add(container.GetId())
252 }
253 return nil
254 }
255
256 func (c *containerLogManager) processContainer(ctx context.Context, worker int) (ok bool) {
257 key, quit := c.queue.Get()
258 if quit {
259 return false
260 }
261 defer func() {
262 c.queue.Done(key)
263 c.queue.Forget(key)
264 }()
265
266 ok = true
267 id := key.(string)
268
269 resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
270 if err != nil {
271 klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id)
272 return
273 }
274 if resp.GetStatus() == nil {
275 klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id)
276 return
277 }
278 path := resp.GetStatus().GetLogPath()
279 info, err := c.osInterface.Stat(path)
280
281 if err != nil {
282 if !os.IsNotExist(err) {
283 klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path)
284 return
285 }
286
287 if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
288 klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path)
289 return
290 }
291
292 info, err = c.osInterface.Stat(path)
293 if err != nil {
294 klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path)
295 return
296 }
297 }
298 if info.Size() < c.policy.MaxSize {
299 klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
300 return
301 }
302
303 if err := c.rotateLog(ctx, id, path); err != nil {
304 klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
305 return
306 }
307 return
308 }
309
310 func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) error {
311
312 pattern := fmt.Sprintf("%s.*", log)
313 logs, err := filepath.Glob(pattern)
314 if err != nil {
315 return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
316 }
317
318 logs, err = c.cleanupUnusedLogs(logs)
319 if err != nil {
320 return fmt.Errorf("failed to cleanup logs: %v", err)
321 }
322
323 logs, err = c.removeExcessLogs(logs)
324 if err != nil {
325 return fmt.Errorf("failed to remove excess logs: %v", err)
326 }
327
328
329 for _, l := range logs {
330 if strings.HasSuffix(l, compressSuffix) {
331 continue
332 }
333 if err := c.compressLog(l); err != nil {
334 return fmt.Errorf("failed to compress log %q: %v", l, err)
335 }
336 }
337
338 if err := c.rotateLatestLog(ctx, id, log); err != nil {
339 return fmt.Errorf("failed to rotate log %q: %v", log, err)
340 }
341
342 return nil
343 }
344
345
346
347 func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
348 inuse, unused := filterUnusedLogs(logs)
349 for _, l := range unused {
350 if err := c.osInterface.Remove(l); err != nil {
351 return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
352 }
353 }
354 return inuse, nil
355 }
356
357
358
359 func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
360 for _, l := range logs {
361 if isInUse(l, logs) {
362 inuse = append(inuse, l)
363 } else {
364 unused = append(unused, l)
365 }
366 }
367 return inuse, unused
368 }
369
370
371 func isInUse(l string, logs []string) bool {
372
373 if strings.HasSuffix(l, tmpSuffix) {
374 return false
375 }
376
377 if strings.HasSuffix(l, compressSuffix) {
378 return true
379 }
380
381 for _, another := range logs {
382 if l+compressSuffix == another {
383 return false
384 }
385 }
386 return true
387 }
388
389
390 func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
391
392 sort.Strings(logs)
393
394
395
396
397 maxRotatedFiles := c.policy.MaxFiles - 2
398 if maxRotatedFiles < 0 {
399 maxRotatedFiles = 0
400 }
401 i := 0
402 for ; i < len(logs)-maxRotatedFiles; i++ {
403 if err := c.osInterface.Remove(logs[i]); err != nil {
404 return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
405 }
406 }
407 logs = logs[i:]
408 return logs, nil
409 }
410
411
412 func (c *containerLogManager) compressLog(log string) error {
413 r, err := c.osInterface.Open(log)
414 if err != nil {
415 return fmt.Errorf("failed to open log %q: %v", log, err)
416 }
417 defer r.Close()
418 tmpLog := log + tmpSuffix
419 f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
420 if err != nil {
421 return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
422 }
423 defer func() {
424
425 c.osInterface.Remove(tmpLog)
426 }()
427 defer f.Close()
428 w := gzip.NewWriter(f)
429 defer w.Close()
430 if _, err := io.Copy(w, r); err != nil {
431 return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
432 }
433
434 w.Close()
435 f.Close()
436 compressedLog := log + compressSuffix
437 if err := c.osInterface.Rename(tmpLog, compressedLog); err != nil {
438 return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
439 }
440
441 r.Close()
442 if err := c.osInterface.Remove(log); err != nil {
443 return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
444 }
445 return nil
446 }
447
448
449
450 func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log string) error {
451 timestamp := c.clock.Now().Format(timestampFormat)
452 rotated := fmt.Sprintf("%s.%s", log, timestamp)
453 if err := c.osInterface.Rename(log, rotated); err != nil {
454 return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
455 }
456 if err := c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
457
458
459
460 if renameErr := c.osInterface.Rename(rotated, log); renameErr != nil {
461
462
463
464 klog.ErrorS(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id)
465 }
466 return fmt.Errorf("failed to reopen container log %q: %v", id, err)
467 }
468 return nil
469 }
470
View as plain text