...

Source file src/k8s.io/kubernetes/pkg/kubelet/logs/container_log_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/logs

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package logs
    18  
    19  import (
    20  	"compress/gzip"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"os"
    25  	"path/filepath"
    26  	"sort"
    27  	"strings"
    28  	"sync"
    29  
    30  	"k8s.io/client-go/util/workqueue"
    31  	"k8s.io/klog/v2"
    32  
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	internalapi "k8s.io/cri-api/pkg/apis"
    37  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    38  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    39  	"k8s.io/utils/clock"
    40  )
    41  
    42  const (
    43  	// timestampFormat is the format of the timestamp suffix for rotated logs.
    44  	// See https://golang.org/pkg/time/#Time.Format.
    45  	timestampFormat = "20060102-150405"
    46  	// compressSuffix is the suffix for compressed logs.
    47  	compressSuffix = ".gz"
    48  	// tmpSuffix is the suffix for temporary files.
    49  	tmpSuffix = ".tmp"
    50  )
    51  
    52  // ContainerLogManager manages the lifecycle of all container logs.
    53  //
    54  // Implementation is thread-safe.
    55  type ContainerLogManager interface {
    56  	// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
    57  	// Start starts the container log manager.
    58  	Start()
    59  	// Clean removes all logs of the specified container.
    60  	Clean(ctx context.Context, containerID string) error
    61  }
    62  
    63  // LogRotatePolicy is the policy for container log rotation. The policy applies to all
    64  // containers managed by kubelet.
    65  type LogRotatePolicy struct {
    66  	// MaxSize is the size in bytes of the container log file before it is rotated.
    67  	// A negative number disables container log rotation.
    68  	MaxSize int64
    69  	// MaxFiles is the maximum number of log files that can be present.
    70  	// If rotating the logs creates excess files, the oldest files are removed.
    71  	MaxFiles int
    72  }
    73  
    74  // GetAllLogs gets all inuse (rotated/compressed) logs for a specific container log.
    75  // Returned logs are sorted in oldest to newest order.
    76  // TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
    77  func GetAllLogs(log string) ([]string, error) {
    78  	// pattern is used to match all rotated files.
    79  	pattern := fmt.Sprintf("%s.*", log)
    80  	logs, err := filepath.Glob(pattern)
    81  	if err != nil {
    82  		return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
    83  	}
    84  	inuse, _ := filterUnusedLogs(logs)
    85  	sort.Strings(inuse)
    86  	return append(inuse, log), nil
    87  }
    88  
    89  // compressReadCloser wraps gzip.Reader with a function to close file handler.
    90  type compressReadCloser struct {
    91  	f *os.File
    92  	*gzip.Reader
    93  }
    94  
    95  func (rc *compressReadCloser) Close() error {
    96  	ferr := rc.f.Close()
    97  	rerr := rc.Reader.Close()
    98  	if ferr != nil {
    99  		return ferr
   100  	}
   101  	if rerr != nil {
   102  		return rerr
   103  	}
   104  	return nil
   105  }
   106  
   107  // UncompressLog compresses a compressed log and return a readcloser for the
   108  // stream of the uncompressed content.
   109  // TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
   110  func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
   111  	if !strings.HasSuffix(log, compressSuffix) {
   112  		return nil, fmt.Errorf("log is not compressed")
   113  	}
   114  	f, err := os.Open(log)
   115  	if err != nil {
   116  		return nil, fmt.Errorf("failed to open log: %v", err)
   117  	}
   118  	defer func() {
   119  		if retErr != nil {
   120  			f.Close()
   121  		}
   122  	}()
   123  	r, err := gzip.NewReader(f)
   124  	if err != nil {
   125  		return nil, fmt.Errorf("failed to create gzip reader: %v", err)
   126  	}
   127  	return &compressReadCloser{f: f, Reader: r}, nil
   128  }
   129  
   130  // parseMaxSize parses quantity string to int64 max size in bytes.
   131  func parseMaxSize(size string) (int64, error) {
   132  	quantity, err := resource.ParseQuantity(size)
   133  	if err != nil {
   134  		return 0, err
   135  	}
   136  	maxSize, ok := quantity.AsInt64()
   137  	if !ok {
   138  		return 0, fmt.Errorf("invalid max log size")
   139  	}
   140  	return maxSize, nil
   141  }
   142  
   143  type containerLogManager struct {
   144  	runtimeService   internalapi.RuntimeService      // used to list containers, fetch container status and reopen container logs
   145  	osInterface      kubecontainer.OSInterface       // file operations: stat, glob, open, rename, remove
   146  	policy           LogRotatePolicy                 // rotation thresholds (MaxSize, MaxFiles)
   147  	clock            clock.Clock                     // source of the timestamp used in rotated log names
   148  	mutex            sync.Mutex                      // held by Clean and rotateLogs for their whole duration
   149  	queue            workqueue.RateLimitingInterface // container IDs waiting for a rotation check
   150  	maxWorkers       int                             // number of worker goroutines launched by Start
   151  	monitoringPeriod metav1.Duration                 // interval between rotateLogs runs
   152  }
   153  
   154  // NewContainerLogManager creates a new container log manager.
   155  func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int, maxWorkers int, monitorInterval metav1.Duration) (ContainerLogManager, error) {
   156  	if maxFiles <= 1 {
   157  		return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
   158  	}
   159  	parsedMaxSize, err := parseMaxSize(maxSize)
   160  	if err != nil {
   161  		return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
   162  	}
   163  	// Negative number means to disable container log rotation
   164  	if parsedMaxSize < 0 {
   165  		return NewStubContainerLogManager(), nil
   166  	}
   167  	// policy LogRotatePolicy
   168  	return &containerLogManager{
   169  		osInterface:    osInterface,
   170  		runtimeService: runtimeService,
   171  		policy: LogRotatePolicy{
   172  			MaxSize:  parsedMaxSize,
   173  			MaxFiles: maxFiles,
   174  		},
   175  		clock:            clock.RealClock{},
   176  		mutex:            sync.Mutex{},
   177  		maxWorkers:       maxWorkers,
   178  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
   179  		monitoringPeriod: monitorInterval,
   180  	}, nil
   181  }
   182  
   183  // Start the container log manager.
   184  func (c *containerLogManager) Start() {
   185  	ctx := context.Background()
   186  	klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod)
   187  	for i := 0; i < c.maxWorkers; i++ {
   188  		worker := i + 1
   189  		go c.processQueueItems(ctx, worker)
   190  	}
   191  	// Start a goroutine periodically does container log rotation.
   192  	go wait.Forever(func() {
   193  		if err := c.rotateLogs(ctx); err != nil {
   194  			klog.ErrorS(err, "Failed to rotate container logs")
   195  		}
   196  	}, c.monitoringPeriod.Duration)
   197  }
   198  
   199  // Clean removes all logs of specified container (including rotated one).
   200  func (c *containerLogManager) Clean(ctx context.Context, containerID string) error {
   201  	c.mutex.Lock()
   202  	defer c.mutex.Unlock()
   203  	resp, err := c.runtimeService.ContainerStatus(ctx, containerID, false)
   204  	if err != nil {
   205  		return fmt.Errorf("failed to get container status %q: %v", containerID, err)
   206  	}
   207  	if resp.GetStatus() == nil {
   208  		return fmt.Errorf("container status is nil for %q", containerID)
   209  	}
   210  	pattern := fmt.Sprintf("%s*", resp.GetStatus().GetLogPath())
   211  	logs, err := c.osInterface.Glob(pattern)
   212  	if err != nil {
   213  		return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
   214  	}
   215  
   216  	for _, l := range logs {
   217  		if err := c.osInterface.Remove(l); err != nil && !os.IsNotExist(err) {
   218  			return fmt.Errorf("failed to remove container %q log %q: %v", containerID, l, err)
   219  		}
   220  	}
   221  
   222  	return nil
   223  }
   224  
   225  func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) {
   226  	klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker)
   227  	for c.processContainer(ctx, worker) {
   228  	}
   229  	klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker)
   230  }
   231  
   232  func (c *containerLogManager) rotateLogs(ctx context.Context) error {
   233  	c.mutex.Lock()
   234  	defer c.mutex.Unlock()
   235  	klog.V(4).InfoS("Starting container log rotation sequence")
   236  	// TODO(#59998): Use kubelet pod cache.
   237  	containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
   238  	if err != nil {
   239  		return fmt.Errorf("failed to list containers: %v", err)
   240  	}
   241  	for _, container := range containers {
   242  		// Only rotate logs for running containers. Non-running containers won't
   243  		// generate new output, it doesn't make sense to keep an empty latest log.
   244  		if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
   245  			continue
   246  		}
   247  		// Doing this to avoid additional overhead with logging of label like arguments that can prove costly
   248  		if v := klog.V(4); v.Enabled() {
   249  			klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels())
   250  		}
   251  		c.queue.Add(container.GetId())
   252  	}
   253  	return nil
   254  }
   255  
   256  func (c *containerLogManager) processContainer(ctx context.Context, worker int) (ok bool) {
   257  	key, quit := c.queue.Get()
   258  	if quit {
   259  		return false
   260  	}
   261  	defer func() {
   262  		c.queue.Done(key)
   263  		c.queue.Forget(key)
   264  	}()
   265  	// Always default the return to true to keep the processing of Queue ongoing
   266  	ok = true
   267  	id := key.(string)
   268  
   269  	resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
   270  	if err != nil {
   271  		klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id)
   272  		return
   273  	}
   274  	if resp.GetStatus() == nil {
   275  		klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id)
   276  		return
   277  	}
   278  	path := resp.GetStatus().GetLogPath()
   279  	info, err := c.osInterface.Stat(path)
   280  
   281  	if err != nil {
   282  		if !os.IsNotExist(err) {
   283  			klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path)
   284  			return
   285  		}
   286  
   287  		if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
   288  			klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path)
   289  			return
   290  		}
   291  
   292  		info, err = c.osInterface.Stat(path)
   293  		if err != nil {
   294  			klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path)
   295  			return
   296  		}
   297  	}
   298  	if info.Size() < c.policy.MaxSize {
   299  		klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
   300  		return
   301  	}
   302  
   303  	if err := c.rotateLog(ctx, id, path); err != nil {
   304  		klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
   305  		return
   306  	}
   307  	return
   308  }
   309  
   310  func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) error {
   311  	// pattern is used to match all rotated files.
   312  	pattern := fmt.Sprintf("%s.*", log)
   313  	logs, err := filepath.Glob(pattern)
   314  	if err != nil {
   315  		return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
   316  	}
   317  
   318  	logs, err = c.cleanupUnusedLogs(logs)
   319  	if err != nil {
   320  		return fmt.Errorf("failed to cleanup logs: %v", err)
   321  	}
   322  
   323  	logs, err = c.removeExcessLogs(logs)
   324  	if err != nil {
   325  		return fmt.Errorf("failed to remove excess logs: %v", err)
   326  	}
   327  
   328  	// Compress uncompressed log files.
   329  	for _, l := range logs {
   330  		if strings.HasSuffix(l, compressSuffix) {
   331  			continue
   332  		}
   333  		if err := c.compressLog(l); err != nil {
   334  			return fmt.Errorf("failed to compress log %q: %v", l, err)
   335  		}
   336  	}
   337  
   338  	if err := c.rotateLatestLog(ctx, id, log); err != nil {
   339  		return fmt.Errorf("failed to rotate log %q: %v", log, err)
   340  	}
   341  
   342  	return nil
   343  }
   344  
   345  // cleanupUnusedLogs cleans up temporary or unused log files generated by previous log rotation
   346  // failure.
   347  func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
   348  	inuse, unused := filterUnusedLogs(logs)
   349  	for _, l := range unused {
   350  		if err := c.osInterface.Remove(l); err != nil {
   351  			return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
   352  		}
   353  	}
   354  	return inuse, nil
   355  }
   356  
   357  // filterUnusedLogs splits logs into 2 groups, the 1st group is in used logs,
   358  // the second group is unused logs.
   359  func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
   360  	for _, l := range logs {
   361  		if isInUse(l, logs) {
   362  			inuse = append(inuse, l)
   363  		} else {
   364  			unused = append(unused, l)
   365  		}
   366  	}
   367  	return inuse, unused
   368  }
   369  
   370  // isInUse checks whether a container log file is still inuse.
   371  func isInUse(l string, logs []string) bool {
   372  	// All temporary files are not in use.
   373  	if strings.HasSuffix(l, tmpSuffix) {
   374  		return false
   375  	}
   376  	// All compressed logs are in use.
   377  	if strings.HasSuffix(l, compressSuffix) {
   378  		return true
   379  	}
   380  	// Files has already been compressed are not in use.
   381  	for _, another := range logs {
   382  		if l+compressSuffix == another {
   383  			return false
   384  		}
   385  	}
   386  	return true
   387  }
   388  
   389  // removeExcessLogs removes old logs to make sure there are only at most MaxFiles log files.
   390  func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
   391  	// Sort log files in oldest to newest order.
   392  	sort.Strings(logs)
   393  	// Container will create a new log file, and we'll rotate the latest log file.
   394  	// Other than those 2 files, we can have at most MaxFiles-2 rotated log files.
   395  	// Keep MaxFiles-2 files by removing old files.
   396  	// We should remove from oldest to newest, so as not to break ongoing `kubectl logs`.
   397  	maxRotatedFiles := c.policy.MaxFiles - 2
   398  	if maxRotatedFiles < 0 {
   399  		maxRotatedFiles = 0
   400  	}
   401  	i := 0
   402  	for ; i < len(logs)-maxRotatedFiles; i++ {
   403  		if err := c.osInterface.Remove(logs[i]); err != nil {
   404  			return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
   405  		}
   406  	}
   407  	logs = logs[i:]
   408  	return logs, nil
   409  }
   410  
   411  // compressLog compresses a log to log.gz with gzip.
   412  func (c *containerLogManager) compressLog(log string) error {
   413  	r, err := c.osInterface.Open(log)
   414  	if err != nil {
   415  		return fmt.Errorf("failed to open log %q: %v", log, err)
   416  	}
   417  	defer r.Close()
   418  	tmpLog := log + tmpSuffix
   419  	f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
   420  	if err != nil {
   421  		return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
   422  	}
   423  	defer func() {
   424  		// Best effort cleanup of tmpLog.
   425  		c.osInterface.Remove(tmpLog)
   426  	}()
   427  	defer f.Close()
   428  	w := gzip.NewWriter(f)
   429  	defer w.Close()
   430  	if _, err := io.Copy(w, r); err != nil {
   431  		return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
   432  	}
   433  	// The archive needs to be closed before renaming, otherwise an error will occur on Windows.
   434  	w.Close()
   435  	f.Close()
   436  	compressedLog := log + compressSuffix
   437  	if err := c.osInterface.Rename(tmpLog, compressedLog); err != nil {
   438  		return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
   439  	}
   440  	// Remove old log file.
   441  	r.Close()
   442  	if err := c.osInterface.Remove(log); err != nil {
   443  		return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
   444  	}
   445  	return nil
   446  }
   447  
   448  // rotateLatestLog rotates latest log without compression, so that container can still write
   449  // and fluentd can finish reading.
   450  func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log string) error {
   451  	timestamp := c.clock.Now().Format(timestampFormat)
   452  	rotated := fmt.Sprintf("%s.%s", log, timestamp)
   453  	if err := c.osInterface.Rename(log, rotated); err != nil {
   454  		return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
   455  	}
   456  	if err := c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
   457  		// Rename the rotated log back, so that we can try rotating it again
   458  		// next round.
   459  		// If kubelet gets restarted at this point, we'll lose original log.
   460  		if renameErr := c.osInterface.Rename(rotated, log); renameErr != nil {
   461  			// This shouldn't happen.
   462  			// Report an error if this happens, because we will lose original
   463  			// log.
   464  			klog.ErrorS(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id)
   465  		}
   466  		return fmt.Errorf("failed to reopen container log %q: %v", id, err)
   467  	}
   468  	return nil
   469  }
   470  

View as plain text