...

Source file src/k8s.io/client-go/util/workqueue/queue_test.go

Documentation: k8s.io/client-go/util/workqueue

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package workqueue_test
    18  
    19  import (
    20  	"runtime"
    21  	"sync"
    22  	"sync/atomic"
    23  	"testing"
    24  	"time"
    25  
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	"k8s.io/client-go/util/workqueue"
    28  )
    29  
    30  func TestBasic(t *testing.T) {
    31  	tests := []struct {
    32  		queue         *workqueue.Type
    33  		queueShutDown func(workqueue.Interface)
    34  	}{
    35  		{
    36  			queue:         workqueue.New(),
    37  			queueShutDown: workqueue.Interface.ShutDown,
    38  		},
    39  		{
    40  			queue:         workqueue.New(),
    41  			queueShutDown: workqueue.Interface.ShutDownWithDrain,
    42  		},
    43  	}
    44  	for _, test := range tests {
    45  		// If something is seriously wrong this test will never complete.
    46  
    47  		// Start producers
    48  		const producers = 50
    49  		producerWG := sync.WaitGroup{}
    50  		producerWG.Add(producers)
    51  		for i := 0; i < producers; i++ {
    52  			go func(i int) {
    53  				defer producerWG.Done()
    54  				for j := 0; j < 50; j++ {
    55  					test.queue.Add(i)
    56  					time.Sleep(time.Millisecond)
    57  				}
    58  			}(i)
    59  		}
    60  
    61  		// Start consumers
    62  		const consumers = 10
    63  		consumerWG := sync.WaitGroup{}
    64  		consumerWG.Add(consumers)
    65  		for i := 0; i < consumers; i++ {
    66  			go func(i int) {
    67  				defer consumerWG.Done()
    68  				for {
    69  					item, quit := test.queue.Get()
    70  					if item == "added after shutdown!" {
    71  						t.Errorf("Got an item added after shutdown.")
    72  					}
    73  					if quit {
    74  						return
    75  					}
    76  					t.Logf("Worker %v: begin processing %v", i, item)
    77  					time.Sleep(3 * time.Millisecond)
    78  					t.Logf("Worker %v: done processing %v", i, item)
    79  					test.queue.Done(item)
    80  				}
    81  			}(i)
    82  		}
    83  
    84  		producerWG.Wait()
    85  		test.queueShutDown(test.queue)
    86  		test.queue.Add("added after shutdown!")
    87  		consumerWG.Wait()
    88  		if test.queue.Len() != 0 {
    89  			t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
    90  		}
    91  	}
    92  }
    93  
    94  func TestAddWhileProcessing(t *testing.T) {
    95  	tests := []struct {
    96  		queue         *workqueue.Type
    97  		queueShutDown func(workqueue.Interface)
    98  	}{
    99  		{
   100  			queue:         workqueue.New(),
   101  			queueShutDown: workqueue.Interface.ShutDown,
   102  		},
   103  		{
   104  			queue:         workqueue.New(),
   105  			queueShutDown: workqueue.Interface.ShutDownWithDrain,
   106  		},
   107  	}
   108  	for _, test := range tests {
   109  
   110  		// Start producers
   111  		const producers = 50
   112  		producerWG := sync.WaitGroup{}
   113  		producerWG.Add(producers)
   114  		for i := 0; i < producers; i++ {
   115  			go func(i int) {
   116  				defer producerWG.Done()
   117  				test.queue.Add(i)
   118  			}(i)
   119  		}
   120  
   121  		// Start consumers
   122  		const consumers = 10
   123  		consumerWG := sync.WaitGroup{}
   124  		consumerWG.Add(consumers)
   125  		for i := 0; i < consumers; i++ {
   126  			go func(i int) {
   127  				defer consumerWG.Done()
   128  				// Every worker will re-add every item up to two times.
   129  				// This tests the dirty-while-processing case.
   130  				counters := map[interface{}]int{}
   131  				for {
   132  					item, quit := test.queue.Get()
   133  					if quit {
   134  						return
   135  					}
   136  					counters[item]++
   137  					if counters[item] < 2 {
   138  						test.queue.Add(item)
   139  					}
   140  					test.queue.Done(item)
   141  				}
   142  			}(i)
   143  		}
   144  
   145  		producerWG.Wait()
   146  		test.queueShutDown(test.queue)
   147  		consumerWG.Wait()
   148  		if test.queue.Len() != 0 {
   149  			t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
   150  		}
   151  	}
   152  }
   153  
   154  func TestLen(t *testing.T) {
   155  	q := workqueue.New()
   156  	q.Add("foo")
   157  	if e, a := 1, q.Len(); e != a {
   158  		t.Errorf("Expected %v, got %v", e, a)
   159  	}
   160  	q.Add("bar")
   161  	if e, a := 2, q.Len(); e != a {
   162  		t.Errorf("Expected %v, got %v", e, a)
   163  	}
   164  	q.Add("foo") // should not increase the queue length.
   165  	if e, a := 2, q.Len(); e != a {
   166  		t.Errorf("Expected %v, got %v", e, a)
   167  	}
   168  }
   169  
   170  func TestReinsert(t *testing.T) {
   171  	q := workqueue.New()
   172  	q.Add("foo")
   173  
   174  	// Start processing
   175  	i, _ := q.Get()
   176  	if i != "foo" {
   177  		t.Errorf("Expected %v, got %v", "foo", i)
   178  	}
   179  
   180  	// Add it back while processing
   181  	q.Add(i)
   182  
   183  	// Finish it up
   184  	q.Done(i)
   185  
   186  	// It should be back on the queue
   187  	i, _ = q.Get()
   188  	if i != "foo" {
   189  		t.Errorf("Expected %v, got %v", "foo", i)
   190  	}
   191  
   192  	// Finish that one up
   193  	q.Done(i)
   194  
   195  	if a := q.Len(); a != 0 {
   196  		t.Errorf("Expected queue to be empty. Has %v items", a)
   197  	}
   198  }
   199  
   200  func TestCollapse(t *testing.T) {
   201  	q := workqueue.New()
   202  	// Add a new one twice
   203  	q.Add("bar")
   204  	q.Add("bar")
   205  
   206  	// It should get the new one
   207  	i, _ := q.Get()
   208  	if i != "bar" {
   209  		t.Errorf("Expected %v, got %v", "bar", i)
   210  	}
   211  
   212  	// Finish that one up
   213  	q.Done(i)
   214  
   215  	// There should be no more objects in the queue
   216  	if a := q.Len(); a != 0 {
   217  		t.Errorf("Expected queue to be empty. Has %v items", a)
   218  	}
   219  }
   220  
   221  func TestCollapseWhileProcessing(t *testing.T) {
   222  	q := workqueue.New()
   223  	q.Add("foo")
   224  
   225  	// Start processing
   226  	i, _ := q.Get()
   227  	if i != "foo" {
   228  		t.Errorf("Expected %v, got %v", "foo", i)
   229  	}
   230  
   231  	// Add the same one twice
   232  	q.Add("foo")
   233  	q.Add("foo")
   234  
   235  	waitCh := make(chan struct{})
   236  	// simulate another worker consuming the queue
   237  	go func() {
   238  		defer close(waitCh)
   239  		i, _ := q.Get()
   240  		if i != "foo" {
   241  			t.Errorf("Expected %v, got %v", "foo", i)
   242  		}
   243  		// Finish that one up
   244  		q.Done(i)
   245  	}()
   246  
   247  	// give the worker some head start to avoid races
   248  	// on the select statement that cause flakiness
   249  	time.Sleep(100 * time.Millisecond)
   250  	// Finish the first one to unblock the other worker
   251  	select {
   252  	case <-waitCh:
   253  		t.Errorf("worker should be blocked until we are done")
   254  	default:
   255  		q.Done("foo")
   256  	}
   257  
   258  	// wait for the worker to consume the new object
   259  	// There should be no more objects in the queue
   260  	<-waitCh
   261  	if a := q.Len(); a != 0 {
   262  		t.Errorf("Expected queue to be empty. Has %v items", a)
   263  	}
   264  }
   265  
   266  func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
   267  
   268  	q := workqueue.New()
   269  
   270  	q.Add("foo")
   271  	q.Add("bar")
   272  
   273  	firstItem, _ := q.Get()
   274  	secondItem, _ := q.Get()
   275  
   276  	finishedWG := sync.WaitGroup{}
   277  	finishedWG.Add(1)
   278  	go func() {
   279  		defer finishedWG.Done()
   280  		q.ShutDownWithDrain()
   281  	}()
   282  
   283  	// This is done as to simulate a sequence of events where ShutDownWithDrain
   284  	// is called before we start marking all items as done - thus simulating a
   285  	// drain where we wait for all items to finish processing.
   286  	shuttingDown := false
   287  	for !shuttingDown {
   288  		_, shuttingDown = q.Get()
   289  	}
   290  
   291  	// Mark the first two items as done, as to finish up
   292  	q.Done(firstItem)
   293  	q.Done(secondItem)
   294  
   295  	finishedWG.Wait()
   296  }
   297  
   298  func TestNoQueueDrainageUsingShutDown(t *testing.T) {
   299  
   300  	q := workqueue.New()
   301  
   302  	q.Add("foo")
   303  	q.Add("bar")
   304  
   305  	q.Get()
   306  	q.Get()
   307  
   308  	finishedWG := sync.WaitGroup{}
   309  	finishedWG.Add(1)
   310  	go func() {
   311  		defer finishedWG.Done()
   312  		// Invoke ShutDown: suspending the execution immediately.
   313  		q.ShutDown()
   314  	}()
   315  
   316  	// We can now do this and not have the test timeout because we didn't call
   317  	// Done on the first two items before arriving here.
   318  	finishedWG.Wait()
   319  }
   320  
   321  func TestForceQueueShutdownUsingShutDown(t *testing.T) {
   322  
   323  	q := workqueue.New()
   324  
   325  	q.Add("foo")
   326  	q.Add("bar")
   327  
   328  	q.Get()
   329  	q.Get()
   330  
   331  	finishedWG := sync.WaitGroup{}
   332  	finishedWG.Add(1)
   333  	go func() {
   334  		defer finishedWG.Done()
   335  		q.ShutDownWithDrain()
   336  	}()
   337  
   338  	// This is done as to simulate a sequence of events where ShutDownWithDrain
   339  	// is called before ShutDown
   340  	shuttingDown := false
   341  	for !shuttingDown {
   342  		_, shuttingDown = q.Get()
   343  	}
   344  
   345  	// Use ShutDown to force the queue to shut down (simulating a caller
   346  	// which can invoke this function on a second SIGTERM/SIGINT)
   347  	q.ShutDown()
   348  
   349  	// We can now do this and not have the test timeout because we didn't call
   350  	// done on any of the items before arriving here.
   351  	finishedWG.Wait()
   352  }
   353  
   354  func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) {
   355  	q := workqueue.New()
   356  
   357  	q.Add("foo")
   358  	gotten, _ := q.Get()
   359  	q.Add("foo")
   360  
   361  	finishedWG := sync.WaitGroup{}
   362  	finishedWG.Add(1)
   363  	go func() {
   364  		defer finishedWG.Done()
   365  		q.ShutDownWithDrain()
   366  	}()
   367  
   368  	// Ensure that ShutDownWithDrain has started and is blocked.
   369  	shuttingDown := false
   370  	for !shuttingDown {
   371  		_, shuttingDown = q.Get()
   372  	}
   373  
   374  	// Finish "working".
   375  	q.Done(gotten)
   376  
   377  	// `shuttingDown` becomes false because Done caused an item to go back into
   378  	// the queue.
   379  	again, shuttingDown := q.Get()
   380  	if shuttingDown {
   381  		t.Fatalf("should not have been done")
   382  	}
   383  	q.Done(again)
   384  
   385  	// Now we are really done.
   386  	_, shuttingDown = q.Get()
   387  	if !shuttingDown {
   388  		t.Fatalf("should have been done")
   389  	}
   390  
   391  	finishedWG.Wait()
   392  }
   393  
   394  // TestGarbageCollection ensures that objects that are added then removed from the queue are
   395  // able to be garbage collected.
   396  func TestGarbageCollection(t *testing.T) {
   397  	type bigObject struct {
   398  		data []byte
   399  	}
   400  	leakQueue := workqueue.New()
   401  	t.Cleanup(func() {
   402  		// Make sure leakQueue doesn't go out of scope too early
   403  		runtime.KeepAlive(leakQueue)
   404  	})
   405  	c := &bigObject{data: []byte("hello")}
   406  	mustGarbageCollect(t, c)
   407  	leakQueue.Add(c)
   408  	o, _ := leakQueue.Get()
   409  	leakQueue.Done(o)
   410  }
   411  
   412  // mustGarbageCollect asserts than an object was garbage collected by the end of the test.
   413  // The input must be a pointer to an object.
   414  func mustGarbageCollect(t *testing.T, i interface{}) {
   415  	t.Helper()
   416  	var collected int32 = 0
   417  	runtime.SetFinalizer(i, func(x interface{}) {
   418  		atomic.StoreInt32(&collected, 1)
   419  	})
   420  	t.Cleanup(func() {
   421  		if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
   422  			// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
   423  			runtime.GC()
   424  			return atomic.LoadInt32(&collected) == 1, nil
   425  		}); err != nil {
   426  			t.Errorf("object was not garbage collected")
   427  		}
   428  	})
   429  }
   430  

View as plain text