...

Source file src/k8s.io/client-go/util/workqueue/delaying_queue_test.go

Documentation: k8s.io/client-go/util/workqueue

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package workqueue
    18  
    19  import (
    20  	"fmt"
    21  	"math/rand"
    22  	"reflect"
    23  	"testing"
    24  	"time"
    25  
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	testingclock "k8s.io/utils/clock/testing"
    28  )
    29  
    30  func TestSimpleQueue(t *testing.T) {
    31  	fakeClock := testingclock.NewFakeClock(time.Now())
    32  	q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
    33  
    34  	first := "foo"
    35  
    36  	q.AddAfter(first, 50*time.Millisecond)
    37  	if err := waitForWaitingQueueToFill(q); err != nil {
    38  		t.Fatalf("unexpected err: %v", err)
    39  	}
    40  
    41  	if q.Len() != 0 {
    42  		t.Errorf("should not have added")
    43  	}
    44  
    45  	fakeClock.Step(60 * time.Millisecond)
    46  
    47  	if err := waitForAdded(q, 1); err != nil {
    48  		t.Errorf("should have added")
    49  	}
    50  	item, _ := q.Get()
    51  	q.Done(item)
    52  
    53  	// step past the next heartbeat
    54  	fakeClock.Step(10 * time.Second)
    55  
    56  	err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
    57  		if q.Len() > 0 {
    58  			return false, fmt.Errorf("added to queue")
    59  		}
    60  
    61  		return false, nil
    62  	})
    63  	if err != wait.ErrWaitTimeout {
    64  		t.Errorf("expected timeout, got: %v", err)
    65  	}
    66  
    67  	if q.Len() != 0 {
    68  		t.Errorf("should not have added")
    69  	}
    70  }
    71  
    72  func TestDeduping(t *testing.T) {
    73  	fakeClock := testingclock.NewFakeClock(time.Now())
    74  	q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
    75  
    76  	first := "foo"
    77  
    78  	q.AddAfter(first, 50*time.Millisecond)
    79  	if err := waitForWaitingQueueToFill(q); err != nil {
    80  		t.Fatalf("unexpected err: %v", err)
    81  	}
    82  	q.AddAfter(first, 70*time.Millisecond)
    83  	if err := waitForWaitingQueueToFill(q); err != nil {
    84  		t.Fatalf("unexpected err: %v", err)
    85  	}
    86  	if q.Len() != 0 {
    87  		t.Errorf("should not have added")
    88  	}
    89  
    90  	// step past the first block, we should receive now
    91  	fakeClock.Step(60 * time.Millisecond)
    92  	if err := waitForAdded(q, 1); err != nil {
    93  		t.Errorf("should have added")
    94  	}
    95  	item, _ := q.Get()
    96  	q.Done(item)
    97  
    98  	// step past the second add
    99  	fakeClock.Step(20 * time.Millisecond)
   100  	if q.Len() != 0 {
   101  		t.Errorf("should not have added")
   102  	}
   103  
   104  	// test again, but this time the earlier should override
   105  	q.AddAfter(first, 50*time.Millisecond)
   106  	q.AddAfter(first, 30*time.Millisecond)
   107  	if err := waitForWaitingQueueToFill(q); err != nil {
   108  		t.Fatalf("unexpected err: %v", err)
   109  	}
   110  	if q.Len() != 0 {
   111  		t.Errorf("should not have added")
   112  	}
   113  
   114  	fakeClock.Step(40 * time.Millisecond)
   115  	if err := waitForAdded(q, 1); err != nil {
   116  		t.Errorf("should have added")
   117  	}
   118  	item, _ = q.Get()
   119  	q.Done(item)
   120  
   121  	// step past the second add
   122  	fakeClock.Step(20 * time.Millisecond)
   123  	if q.Len() != 0 {
   124  		t.Errorf("should not have added")
   125  	}
   126  }
   127  
   128  func TestAddTwoFireEarly(t *testing.T) {
   129  	fakeClock := testingclock.NewFakeClock(time.Now())
   130  	q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
   131  
   132  	first := "foo"
   133  	second := "bar"
   134  	third := "baz"
   135  
   136  	q.AddAfter(first, 1*time.Second)
   137  	q.AddAfter(second, 50*time.Millisecond)
   138  	if err := waitForWaitingQueueToFill(q); err != nil {
   139  		t.Fatalf("unexpected err: %v", err)
   140  	}
   141  
   142  	if q.Len() != 0 {
   143  		t.Errorf("should not have added")
   144  	}
   145  
   146  	fakeClock.Step(60 * time.Millisecond)
   147  
   148  	if err := waitForAdded(q, 1); err != nil {
   149  		t.Fatalf("unexpected err: %v", err)
   150  	}
   151  	item, _ := q.Get()
   152  	if !reflect.DeepEqual(item, second) {
   153  		t.Errorf("expected %v, got %v", second, item)
   154  	}
   155  
   156  	q.AddAfter(third, 2*time.Second)
   157  
   158  	fakeClock.Step(1 * time.Second)
   159  	if err := waitForAdded(q, 1); err != nil {
   160  		t.Fatalf("unexpected err: %v", err)
   161  	}
   162  	item, _ = q.Get()
   163  	if !reflect.DeepEqual(item, first) {
   164  		t.Errorf("expected %v, got %v", first, item)
   165  	}
   166  
   167  	fakeClock.Step(2 * time.Second)
   168  	if err := waitForAdded(q, 1); err != nil {
   169  		t.Fatalf("unexpected err: %v", err)
   170  	}
   171  	item, _ = q.Get()
   172  	if !reflect.DeepEqual(item, third) {
   173  		t.Errorf("expected %v, got %v", third, item)
   174  	}
   175  }
   176  
   177  func TestCopyShifting(t *testing.T) {
   178  	fakeClock := testingclock.NewFakeClock(time.Now())
   179  	q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
   180  
   181  	first := "foo"
   182  	second := "bar"
   183  	third := "baz"
   184  
   185  	q.AddAfter(first, 1*time.Second)
   186  	q.AddAfter(second, 500*time.Millisecond)
   187  	q.AddAfter(third, 250*time.Millisecond)
   188  	if err := waitForWaitingQueueToFill(q); err != nil {
   189  		t.Fatalf("unexpected err: %v", err)
   190  	}
   191  
   192  	if q.Len() != 0 {
   193  		t.Errorf("should not have added")
   194  	}
   195  
   196  	fakeClock.Step(2 * time.Second)
   197  
   198  	if err := waitForAdded(q, 3); err != nil {
   199  		t.Fatalf("unexpected err: %v", err)
   200  	}
   201  	actualFirst, _ := q.Get()
   202  	if !reflect.DeepEqual(actualFirst, third) {
   203  		t.Errorf("expected %v, got %v", third, actualFirst)
   204  	}
   205  	actualSecond, _ := q.Get()
   206  	if !reflect.DeepEqual(actualSecond, second) {
   207  		t.Errorf("expected %v, got %v", second, actualSecond)
   208  	}
   209  	actualThird, _ := q.Get()
   210  	if !reflect.DeepEqual(actualThird, first) {
   211  		t.Errorf("expected %v, got %v", first, actualThird)
   212  	}
   213  }
   214  
   215  func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
   216  	fakeClock := testingclock.NewFakeClock(time.Now())
   217  	q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
   218  
   219  	// Add items
   220  	for n := 0; n < b.N; n++ {
   221  		data := fmt.Sprintf("%d", n)
   222  		q.AddAfter(data, time.Duration(rand.Int63n(int64(10*time.Minute))))
   223  	}
   224  
   225  	// Exercise item removal as well
   226  	fakeClock.Step(11 * time.Minute)
   227  	for n := 0; n < b.N; n++ {
   228  		_, _ = q.Get()
   229  	}
   230  }
   231  
   232  func waitForAdded(q DelayingInterface, depth int) error {
   233  	return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
   234  		if q.Len() == depth {
   235  			return true, nil
   236  		}
   237  
   238  		return false, nil
   239  	})
   240  }
   241  
   242  func waitForWaitingQueueToFill(q DelayingInterface) error {
   243  	return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
   244  		if len(q.(*delayingType).waitingForAddCh) == 0 {
   245  			return true, nil
   246  		}
   247  
   248  		return false, nil
   249  	})
   250  }
   251  

View as plain text