...

Source file src/k8s.io/kubernetes/pkg/kubelet/logs/container_log_manager_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/logs

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package logs
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"os"
    25  	"path/filepath"
    26  	"sync"
    27  	"testing"
    28  	"time"
    29  
    30  	"github.com/stretchr/testify/assert"
    31  	"github.com/stretchr/testify/require"
    32  	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	"k8s.io/client-go/util/workqueue"
    35  	"k8s.io/kubernetes/pkg/kubelet/container"
    36  
    37  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    38  	critest "k8s.io/cri-api/pkg/apis/testing"
    39  	testingclock "k8s.io/utils/clock/testing"
    40  )
    41  
    42  func TestGetAllLogs(t *testing.T) {
    43  	dir, err := os.MkdirTemp("", "test-get-all-logs")
    44  	require.NoError(t, err)
    45  	defer os.RemoveAll(dir)
    46  	testLogs := []string{
    47  		"test-log.11111111-111111.gz",
    48  		"test-log",
    49  		"test-log.00000000-000000.gz",
    50  		"test-log.19900322-000000.gz",
    51  		"test-log.19900322-111111.gz",
    52  		"test-log.19880620-000000", // unused log
    53  		"test-log.19880620-000000.gz",
    54  		"test-log.19880620-111111.gz",
    55  		"test-log.20180101-000000",
    56  		"test-log.20180101-000000.tmp", // temporary log
    57  	}
    58  	expectLogs := []string{
    59  		"test-log.00000000-000000.gz",
    60  		"test-log.11111111-111111.gz",
    61  		"test-log.19880620-000000.gz",
    62  		"test-log.19880620-111111.gz",
    63  		"test-log.19900322-000000.gz",
    64  		"test-log.19900322-111111.gz",
    65  		"test-log.20180101-000000",
    66  		"test-log",
    67  	}
    68  	for i := range testLogs {
    69  		f, err := os.Create(filepath.Join(dir, testLogs[i]))
    70  		require.NoError(t, err)
    71  		f.Close()
    72  	}
    73  	got, err := GetAllLogs(filepath.Join(dir, "test-log"))
    74  	assert.NoError(t, err)
    75  	for i := range expectLogs {
    76  		expectLogs[i] = filepath.Join(dir, expectLogs[i])
    77  	}
    78  	assert.Equal(t, expectLogs, got)
    79  }
    80  
    81  func TestRotateLogs(t *testing.T) {
    82  	ctx := context.Background()
    83  	dir, err := os.MkdirTemp("", "test-rotate-logs")
    84  	require.NoError(t, err)
    85  	defer os.RemoveAll(dir)
    86  
    87  	const (
    88  		testMaxFiles = 3
    89  		testMaxSize  = 10
    90  	)
    91  	now := time.Now()
    92  	f := critest.NewFakeRuntimeService()
    93  	c := &containerLogManager{
    94  		runtimeService: f,
    95  		policy: LogRotatePolicy{
    96  			MaxSize:  testMaxSize,
    97  			MaxFiles: testMaxFiles,
    98  		},
    99  		osInterface:      container.RealOS{},
   100  		clock:            testingclock.NewFakeClock(now),
   101  		mutex:            sync.Mutex{},
   102  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
   103  		maxWorkers:       10,
   104  		monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
   105  	}
   106  	testLogs := []string{
   107  		"test-log-1",
   108  		"test-log-2",
   109  		"test-log-3",
   110  		"test-log-4",
   111  		"test-log-3.00000000-000001",
   112  		"test-log-3.00000000-000000.gz",
   113  	}
   114  	testContent := []string{
   115  		"short",
   116  		"longer than 10 bytes",
   117  		"longer than 10 bytes",
   118  		"longer than 10 bytes",
   119  		"the length doesn't matter",
   120  		"the length doesn't matter",
   121  	}
   122  	for i := range testLogs {
   123  		f, err := os.Create(filepath.Join(dir, testLogs[i]))
   124  		require.NoError(t, err)
   125  		_, err = f.Write([]byte(testContent[i]))
   126  		require.NoError(t, err)
   127  		f.Close()
   128  	}
   129  	testContainers := []*critest.FakeContainer{
   130  		{
   131  			ContainerStatus: runtimeapi.ContainerStatus{
   132  				Id:      "container-not-need-rotate",
   133  				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
   134  				LogPath: filepath.Join(dir, testLogs[0]),
   135  			},
   136  		},
   137  		{
   138  			ContainerStatus: runtimeapi.ContainerStatus{
   139  				Id:      "container-need-rotate",
   140  				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
   141  				LogPath: filepath.Join(dir, testLogs[1]),
   142  			},
   143  		},
   144  		{
   145  			ContainerStatus: runtimeapi.ContainerStatus{
   146  				Id:      "container-has-excess-log",
   147  				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
   148  				LogPath: filepath.Join(dir, testLogs[2]),
   149  			},
   150  		},
   151  		{
   152  			ContainerStatus: runtimeapi.ContainerStatus{
   153  				Id:      "container-is-not-running",
   154  				State:   runtimeapi.ContainerState_CONTAINER_EXITED,
   155  				LogPath: filepath.Join(dir, testLogs[3]),
   156  			},
   157  		},
   158  	}
   159  	f.SetFakeContainers(testContainers)
   160  
   161  	// Push the items into the queue for before starting the worker to avoid issue with the queue being empty.
   162  	require.NoError(t, c.rotateLogs(ctx))
   163  
   164  	// Start a routine that can monitor the queue and shutdown the queue to trigger the retrun from the processQueueItems
   165  	// Keeping the monitor duration smaller in order to keep the unwanted delay in the test to a minimal.
   166  	go func() {
   167  		pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
   168  		defer cancel()
   169  		err = wait.PollUntilContextCancel(pollTimeoutCtx, 5*time.Millisecond, false, func(ctx context.Context) (done bool, err error) {
   170  			return c.queue.Len() == 0, nil
   171  		})
   172  		require.NoError(t, err)
   173  		c.queue.ShutDown()
   174  	}()
   175  	// This is a blocking call. But the above routine takes care of ensuring that this is terminated once the queue is shutdown
   176  	c.processQueueItems(ctx, 1)
   177  
   178  	timestamp := now.Format(timestampFormat)
   179  	logs, err := os.ReadDir(dir)
   180  	require.NoError(t, err)
   181  	assert.Len(t, logs, 5)
   182  	assert.Equal(t, testLogs[0], logs[0].Name())
   183  	assert.Equal(t, testLogs[1]+"."+timestamp, logs[1].Name())
   184  	assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name())
   185  	assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name())
   186  	assert.Equal(t, testLogs[3], logs[4].Name())
   187  }
   188  
   189  func TestClean(t *testing.T) {
   190  	ctx := context.Background()
   191  	dir, err := os.MkdirTemp("", "test-clean")
   192  	require.NoError(t, err)
   193  	defer os.RemoveAll(dir)
   194  
   195  	const (
   196  		testMaxFiles = 3
   197  		testMaxSize  = 10
   198  	)
   199  	now := time.Now()
   200  	f := critest.NewFakeRuntimeService()
   201  	c := &containerLogManager{
   202  		runtimeService: f,
   203  		policy: LogRotatePolicy{
   204  			MaxSize:  testMaxSize,
   205  			MaxFiles: testMaxFiles,
   206  		},
   207  		osInterface:      container.RealOS{},
   208  		clock:            testingclock.NewFakeClock(now),
   209  		mutex:            sync.Mutex{},
   210  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
   211  		maxWorkers:       10,
   212  		monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
   213  	}
   214  	testLogs := []string{
   215  		"test-log-1",
   216  		"test-log-2",
   217  		"test-log-3",
   218  		"test-log-2.00000000-000000.gz",
   219  		"test-log-2.00000000-000001",
   220  		"test-log-3.00000000-000000.gz",
   221  		"test-log-3.00000000-000001",
   222  	}
   223  	for i := range testLogs {
   224  		f, err := os.Create(filepath.Join(dir, testLogs[i]))
   225  		require.NoError(t, err)
   226  		f.Close()
   227  	}
   228  	testContainers := []*critest.FakeContainer{
   229  		{
   230  			ContainerStatus: runtimeapi.ContainerStatus{
   231  				Id:      "container-1",
   232  				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
   233  				LogPath: filepath.Join(dir, testLogs[0]),
   234  			},
   235  		},
   236  		{
   237  			ContainerStatus: runtimeapi.ContainerStatus{
   238  				Id:      "container-2",
   239  				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
   240  				LogPath: filepath.Join(dir, testLogs[1]),
   241  			},
   242  		},
   243  		{
   244  			ContainerStatus: runtimeapi.ContainerStatus{
   245  				Id:      "container-3",
   246  				State:   runtimeapi.ContainerState_CONTAINER_EXITED,
   247  				LogPath: filepath.Join(dir, testLogs[2]),
   248  			},
   249  		},
   250  	}
   251  	f.SetFakeContainers(testContainers)
   252  
   253  	err = c.Clean(ctx, "container-3")
   254  	require.NoError(t, err)
   255  
   256  	logs, err := os.ReadDir(dir)
   257  	require.NoError(t, err)
   258  	assert.Len(t, logs, 4)
   259  	assert.Equal(t, testLogs[0], logs[0].Name())
   260  	assert.Equal(t, testLogs[1], logs[1].Name())
   261  	assert.Equal(t, testLogs[3], logs[2].Name())
   262  	assert.Equal(t, testLogs[4], logs[3].Name())
   263  }
   264  
   265  func TestCleanupUnusedLog(t *testing.T) {
   266  	dir, err := os.MkdirTemp("", "test-cleanup-unused-log")
   267  	require.NoError(t, err)
   268  	defer os.RemoveAll(dir)
   269  
   270  	testLogs := []string{
   271  		"test-log-1",     // regular log
   272  		"test-log-1.tmp", // temporary log
   273  		"test-log-2",     // unused log
   274  		"test-log-2.gz",  // compressed log
   275  	}
   276  
   277  	for i := range testLogs {
   278  		testLogs[i] = filepath.Join(dir, testLogs[i])
   279  		f, err := os.Create(testLogs[i])
   280  		require.NoError(t, err)
   281  		f.Close()
   282  	}
   283  
   284  	c := &containerLogManager{
   285  		osInterface: container.RealOS{},
   286  	}
   287  	got, err := c.cleanupUnusedLogs(testLogs)
   288  	require.NoError(t, err)
   289  	assert.Len(t, got, 2)
   290  	assert.Equal(t, []string{testLogs[0], testLogs[3]}, got)
   291  
   292  	logs, err := os.ReadDir(dir)
   293  	require.NoError(t, err)
   294  	assert.Len(t, logs, 2)
   295  	assert.Equal(t, testLogs[0], filepath.Join(dir, logs[0].Name()))
   296  	assert.Equal(t, testLogs[3], filepath.Join(dir, logs[1].Name()))
   297  }
   298  
   299  func TestRemoveExcessLog(t *testing.T) {
   300  	for desc, test := range map[string]struct {
   301  		max    int
   302  		expect []string
   303  	}{
   304  		"MaxFiles equal to 2": {
   305  			max:    2,
   306  			expect: []string{},
   307  		},
   308  		"MaxFiles more than 2": {
   309  			max:    3,
   310  			expect: []string{"test-log-4"},
   311  		},
   312  		"MaxFiles more than log file number": {
   313  			max:    6,
   314  			expect: []string{"test-log-1", "test-log-2", "test-log-3", "test-log-4"},
   315  		},
   316  	} {
   317  		t.Logf("TestCase %q", desc)
   318  		dir, err := os.MkdirTemp("", "test-remove-excess-log")
   319  		require.NoError(t, err)
   320  		defer os.RemoveAll(dir)
   321  
   322  		testLogs := []string{"test-log-3", "test-log-1", "test-log-2", "test-log-4"}
   323  
   324  		for i := range testLogs {
   325  			testLogs[i] = filepath.Join(dir, testLogs[i])
   326  			f, err := os.Create(testLogs[i])
   327  			require.NoError(t, err)
   328  			f.Close()
   329  		}
   330  
   331  		c := &containerLogManager{
   332  			policy:      LogRotatePolicy{MaxFiles: test.max},
   333  			osInterface: container.RealOS{},
   334  		}
   335  		got, err := c.removeExcessLogs(testLogs)
   336  		require.NoError(t, err)
   337  		require.Len(t, got, len(test.expect))
   338  		for i, name := range test.expect {
   339  			assert.Equal(t, name, filepath.Base(got[i]))
   340  		}
   341  
   342  		logs, err := os.ReadDir(dir)
   343  		require.NoError(t, err)
   344  		require.Len(t, logs, len(test.expect))
   345  		for i, name := range test.expect {
   346  			assert.Equal(t, name, logs[i].Name())
   347  		}
   348  	}
   349  }
   350  
   351  func TestCompressLog(t *testing.T) {
   352  	dir, err := os.MkdirTemp("", "test-compress-log")
   353  	require.NoError(t, err)
   354  	defer os.RemoveAll(dir)
   355  
   356  	testFile, err := os.CreateTemp(dir, "test-rotate-latest-log")
   357  	require.NoError(t, err)
   358  	defer testFile.Close()
   359  	testContent := "test log content"
   360  	_, err = testFile.Write([]byte(testContent))
   361  	require.NoError(t, err)
   362  	testFile.Close()
   363  
   364  	testLog := testFile.Name()
   365  	c := &containerLogManager{osInterface: container.RealOS{}}
   366  	require.NoError(t, c.compressLog(testLog))
   367  	_, err = os.Stat(testLog + compressSuffix)
   368  	assert.NoError(t, err, "log should be compressed")
   369  	_, err = os.Stat(testLog + tmpSuffix)
   370  	assert.Error(t, err, "temporary log should be renamed")
   371  	_, err = os.Stat(testLog)
   372  	assert.Error(t, err, "original log should be removed")
   373  
   374  	rc, err := UncompressLog(testLog + compressSuffix)
   375  	require.NoError(t, err)
   376  	defer rc.Close()
   377  	var buf bytes.Buffer
   378  	_, err = io.Copy(&buf, rc)
   379  	require.NoError(t, err)
   380  	assert.Equal(t, testContent, buf.String())
   381  }
   382  
   383  func TestRotateLatestLog(t *testing.T) {
   384  	ctx := context.Background()
   385  	dir, err := os.MkdirTemp("", "test-rotate-latest-log")
   386  	require.NoError(t, err)
   387  	defer os.RemoveAll(dir)
   388  
   389  	for desc, test := range map[string]struct {
   390  		runtimeError   error
   391  		maxFiles       int
   392  		expectError    bool
   393  		expectOriginal bool
   394  		expectRotated  bool
   395  	}{
   396  		"should successfully rotate log when MaxFiles is 2": {
   397  			maxFiles:       2,
   398  			expectError:    false,
   399  			expectOriginal: false,
   400  			expectRotated:  true,
   401  		},
   402  		"should restore original log when ReopenContainerLog fails": {
   403  			runtimeError:   fmt.Errorf("random error"),
   404  			maxFiles:       2,
   405  			expectError:    true,
   406  			expectOriginal: true,
   407  			expectRotated:  false,
   408  		},
   409  	} {
   410  		t.Logf("TestCase %q", desc)
   411  		now := time.Now()
   412  		f := critest.NewFakeRuntimeService()
   413  		c := &containerLogManager{
   414  			runtimeService:   f,
   415  			policy:           LogRotatePolicy{MaxFiles: test.maxFiles},
   416  			osInterface:      container.RealOS{},
   417  			clock:            testingclock.NewFakeClock(now),
   418  			mutex:            sync.Mutex{},
   419  			queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
   420  			maxWorkers:       10,
   421  			monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
   422  		}
   423  		if test.runtimeError != nil {
   424  			f.InjectError("ReopenContainerLog", test.runtimeError)
   425  		}
   426  		testFile, err := os.CreateTemp(dir, "test-rotate-latest-log")
   427  		require.NoError(t, err)
   428  		testFile.Close()
   429  		defer testFile.Close()
   430  		testLog := testFile.Name()
   431  		rotatedLog := fmt.Sprintf("%s.%s", testLog, now.Format(timestampFormat))
   432  		err = c.rotateLatestLog(ctx, "test-id", testLog)
   433  		assert.Equal(t, test.expectError, err != nil)
   434  		_, err = os.Stat(testLog)
   435  		assert.Equal(t, test.expectOriginal, err == nil)
   436  		_, err = os.Stat(rotatedLog)
   437  		assert.Equal(t, test.expectRotated, err == nil)
   438  		assert.NoError(t, f.AssertCalls([]string{"ReopenContainerLog"}))
   439  	}
   440  }
   441  

View as plain text