...

Source file src/k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go

Documentation: k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"reflect"
    21  	"testing"
    22  	"time"
    23  
    24  	"k8s.io/apimachinery/pkg/util/sets"
    25  	"k8s.io/client-go/util/flowcontrol"
    26  	"k8s.io/klog/v2/ktesting"
    27  )
    28  
    29  func CheckQueueEq(lhs []string, rhs TimedQueue) bool {
    30  	for i := 0; i < len(lhs); i++ {
    31  		if rhs[i].Value != lhs[i] {
    32  			return false
    33  		}
    34  	}
    35  	return true
    36  }
    37  
    38  func CheckSetEq(lhs, rhs sets.String) bool {
    39  	return lhs.IsSuperset(rhs) && rhs.IsSuperset(lhs)
    40  }
    41  
    42  func TestAddNode(t *testing.T) {
    43  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
    44  	evictor.Add("first", "11111")
    45  	evictor.Add("second", "22222")
    46  	evictor.Add("third", "33333")
    47  
    48  	queuePattern := []string{"first", "second", "third"}
    49  	if len(evictor.queue.queue) != len(queuePattern) {
    50  		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
    51  	}
    52  	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
    53  		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
    54  	}
    55  
    56  	setPattern := sets.NewString("first", "second", "third")
    57  	if len(evictor.queue.set) != len(setPattern) {
    58  		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
    59  	}
    60  	if !CheckSetEq(setPattern, evictor.queue.set) {
    61  		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
    62  	}
    63  }
    64  
    65  func TestDelNode(t *testing.T) {
    66  	defer func() { now = time.Now }()
    67  	var tick int64
    68  	now = func() time.Time {
    69  		t := time.Unix(tick, 0)
    70  		tick++
    71  		return t
    72  	}
    73  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
    74  	evictor.Add("first", "11111")
    75  	evictor.Add("second", "22222")
    76  	evictor.Add("third", "33333")
    77  	evictor.Remove("first")
    78  
    79  	queuePattern := []string{"second", "third"}
    80  	if len(evictor.queue.queue) != len(queuePattern) {
    81  		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
    82  	}
    83  	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
    84  		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
    85  	}
    86  
    87  	setPattern := sets.NewString("second", "third")
    88  	if len(evictor.queue.set) != len(setPattern) {
    89  		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
    90  	}
    91  	if !CheckSetEq(setPattern, evictor.queue.set) {
    92  		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
    93  	}
    94  
    95  	evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
    96  	evictor.Add("first", "11111")
    97  	evictor.Add("second", "22222")
    98  	evictor.Add("third", "33333")
    99  	evictor.Remove("second")
   100  
   101  	queuePattern = []string{"first", "third"}
   102  	if len(evictor.queue.queue) != len(queuePattern) {
   103  		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
   104  	}
   105  	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
   106  		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
   107  	}
   108  
   109  	setPattern = sets.NewString("first", "third")
   110  	if len(evictor.queue.set) != len(setPattern) {
   111  		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
   112  	}
   113  	if !CheckSetEq(setPattern, evictor.queue.set) {
   114  		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
   115  	}
   116  
   117  	evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   118  	evictor.Add("first", "11111")
   119  	evictor.Add("second", "22222")
   120  	evictor.Add("third", "33333")
   121  	evictor.Remove("third")
   122  
   123  	queuePattern = []string{"first", "second"}
   124  	if len(evictor.queue.queue) != len(queuePattern) {
   125  		t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
   126  	}
   127  	if !CheckQueueEq(queuePattern, evictor.queue.queue) {
   128  		t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
   129  	}
   130  
   131  	setPattern = sets.NewString("first", "second")
   132  	if len(evictor.queue.set) != len(setPattern) {
   133  		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
   134  	}
   135  	if !CheckSetEq(setPattern, evictor.queue.set) {
   136  		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
   137  	}
   138  }
   139  
   140  func TestTry(t *testing.T) {
   141  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   142  	evictor.Add("first", "11111")
   143  	evictor.Add("second", "22222")
   144  	evictor.Add("third", "33333")
   145  	evictor.Remove("second")
   146  
   147  	deletedMap := sets.NewString()
   148  	logger, _ := ktesting.NewTestContext(t)
   149  	evictor.Try(logger, func(value TimedValue) (bool, time.Duration) {
   150  		deletedMap.Insert(value.Value)
   151  		return true, 0
   152  	})
   153  
   154  	setPattern := sets.NewString("first", "third")
   155  	if len(deletedMap) != len(setPattern) {
   156  		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
   157  	}
   158  	if !CheckSetEq(setPattern, deletedMap) {
   159  		t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
   160  	}
   161  }
   162  
   163  func TestTryOrdering(t *testing.T) {
   164  	defer func() { now = time.Now }()
   165  	current := time.Unix(0, 0)
   166  	delay := 0
   167  	// the current time is incremented by 1ms every time now is invoked
   168  	now = func() time.Time {
   169  		if delay > 0 {
   170  			delay--
   171  		} else {
   172  			current = current.Add(time.Millisecond)
   173  		}
   174  		t.Logf("time %d", current.UnixNano())
   175  		return current
   176  	}
   177  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   178  	evictor.Add("first", "11111")
   179  	evictor.Add("second", "22222")
   180  	evictor.Add("third", "33333")
   181  
   182  	order := []string{}
   183  	count := 0
   184  	hasQueued := false
   185  	logger, _ := ktesting.NewTestContext(t)
   186  	evictor.Try(logger, func(value TimedValue) (bool, time.Duration) {
   187  		count++
   188  		t.Logf("eviction %d", count)
   189  		if value.ProcessAt.IsZero() {
   190  			t.Fatalf("processAt should not be zero")
   191  		}
   192  		switch value.Value {
   193  		case "first":
   194  			if !value.AddedAt.Equal(time.Unix(0, time.Millisecond.Nanoseconds())) {
   195  				t.Fatalf("added time for %s is %v", value.Value, value.AddedAt)
   196  			}
   197  
   198  		case "second":
   199  			if !value.AddedAt.Equal(time.Unix(0, 2*time.Millisecond.Nanoseconds())) {
   200  				t.Fatalf("added time for %s is %v", value.Value, value.AddedAt)
   201  			}
   202  			if hasQueued {
   203  				if !value.ProcessAt.Equal(time.Unix(0, 6*time.Millisecond.Nanoseconds())) {
   204  					t.Fatalf("process time for %s is %v", value.Value, value.ProcessAt)
   205  				}
   206  				break
   207  			}
   208  			hasQueued = true
   209  			delay = 1
   210  			t.Logf("going to delay")
   211  			return false, 2 * time.Millisecond
   212  
   213  		case "third":
   214  			if !value.AddedAt.Equal(time.Unix(0, 3*time.Millisecond.Nanoseconds())) {
   215  				t.Fatalf("added time for %s is %v", value.Value, value.AddedAt)
   216  			}
   217  		}
   218  		order = append(order, value.Value)
   219  		return true, 0
   220  	})
   221  	if !reflect.DeepEqual(order, []string{"first", "third"}) {
   222  		t.Fatalf("order was wrong: %v", order)
   223  	}
   224  	if count != 3 {
   225  		t.Fatalf("unexpected iterations: %d", count)
   226  	}
   227  }
   228  
   229  func TestTryRemovingWhileTry(t *testing.T) {
   230  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   231  	evictor.Add("first", "11111")
   232  	evictor.Add("second", "22222")
   233  	evictor.Add("third", "33333")
   234  
   235  	processing := make(chan struct{})
   236  	wait := make(chan struct{})
   237  	order := []string{}
   238  	count := 0
   239  	queued := false
   240  
   241  	// while the Try function is processing "second", remove it from the queue
   242  	// we should not see "second" retried.
   243  	go func() {
   244  		<-processing
   245  		evictor.Remove("second")
   246  		close(wait)
   247  	}()
   248  	logger, _ := ktesting.NewTestContext(t)
   249  	evictor.Try(logger, func(value TimedValue) (bool, time.Duration) {
   250  		count++
   251  		if value.AddedAt.IsZero() {
   252  			t.Fatalf("added should not be zero")
   253  		}
   254  		if value.ProcessAt.IsZero() {
   255  			t.Fatalf("next should not be zero")
   256  		}
   257  		if !queued && value.Value == "second" {
   258  			queued = true
   259  			close(processing)
   260  			<-wait
   261  			return false, time.Millisecond
   262  		}
   263  		order = append(order, value.Value)
   264  		return true, 0
   265  	})
   266  
   267  	if !reflect.DeepEqual(order, []string{"first", "third"}) {
   268  		t.Fatalf("order was wrong: %v", order)
   269  	}
   270  	if count != 3 {
   271  		t.Fatalf("unexpected iterations: %d", count)
   272  	}
   273  }
   274  
   275  func TestClear(t *testing.T) {
   276  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   277  	evictor.Add("first", "11111")
   278  	evictor.Add("second", "22222")
   279  	evictor.Add("third", "33333")
   280  
   281  	evictor.Clear()
   282  
   283  	if len(evictor.queue.queue) != 0 {
   284  		t.Fatalf("Clear should remove all elements from the queue.")
   285  	}
   286  }
   287  
   288  func TestSwapLimiter(t *testing.T) {
   289  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   290  	fakeAlways := flowcontrol.NewFakeAlwaysRateLimiter()
   291  	qps := evictor.limiter.QPS()
   292  	if qps != fakeAlways.QPS() {
   293  		t.Fatalf("QPS does not match create one: %v instead of %v", qps, fakeAlways.QPS())
   294  	}
   295  
   296  	evictor.SwapLimiter(0)
   297  	qps = evictor.limiter.QPS()
   298  	fakeNever := flowcontrol.NewFakeNeverRateLimiter()
   299  	if qps != fakeNever.QPS() {
   300  		t.Fatalf("QPS does not match create one: %v instead of %v", qps, fakeNever.QPS())
   301  	}
   302  
   303  	createdQPS := float32(5.5)
   304  	evictor.SwapLimiter(createdQPS)
   305  	qps = evictor.limiter.QPS()
   306  	if qps != createdQPS {
   307  		t.Fatalf("QPS does not match create one: %v instead of %v", qps, createdQPS)
   308  	}
   309  }
   310  
   311  func TestAddAfterTry(t *testing.T) {
   312  	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
   313  	evictor.Add("first", "11111")
   314  	evictor.Add("second", "22222")
   315  	evictor.Add("third", "33333")
   316  	evictor.Remove("second")
   317  
   318  	deletedMap := sets.NewString()
   319  	logger, _ := ktesting.NewTestContext(t)
   320  	evictor.Try(logger, func(value TimedValue) (bool, time.Duration) {
   321  		deletedMap.Insert(value.Value)
   322  		return true, 0
   323  	})
   324  
   325  	setPattern := sets.NewString("first", "third")
   326  	if len(deletedMap) != len(setPattern) {
   327  		t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
   328  	}
   329  	if !CheckSetEq(setPattern, deletedMap) {
   330  		t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
   331  	}
   332  
   333  	evictor.Add("first", "11111")
   334  	evictor.Try(logger, func(value TimedValue) (bool, time.Duration) {
   335  		t.Errorf("We shouldn't process the same value if the explicit remove wasn't called.")
   336  		return true, 0
   337  	})
   338  }
   339  

View as plain text