...

Source file src/k8s.io/kubernetes/pkg/util/goroutinemap/goroutinemap_test.go

Documentation: k8s.io/kubernetes/pkg/util/goroutinemap

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package goroutinemap
    18  
    19  import (
    20  	"fmt"
    21  	"testing"
    22  	"time"
    23  
    24  	"k8s.io/apimachinery/pkg/util/wait"
    25  )
    26  
    27  const (
    28  	// testTimeout is a timeout of goroutines to finish. This _should_ be just a
    29  	// "context switch" and it should take several ms, however, Clayton says "We
    30  	// have had flakes due to tests that assumed that 15s is long enough to sleep")
    31  	testTimeout time.Duration = 1 * time.Minute
    32  
    33  	// initialOperationWaitTimeShort is the initial amount of time the test will
    34  	// wait for an operation to complete (each successive failure results in
    35  	// exponential backoff).
    36  	initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond
    37  
    38  	// initialOperationWaitTimeLong is the initial amount of time the test will
    39  	// wait for an operation to complete (each successive failure results in
    40  	// exponential backoff).
    41  	initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
    42  )
    43  
    44  func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
    45  	// Arrange
    46  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
    47  	operationName := "operation-name"
    48  	operation := func() error { return nil }
    49  
    50  	// Act
    51  	err := grm.Run(operationName, operation)
    52  
    53  	// Assert
    54  	if err != nil {
    55  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
    56  	}
    57  }
    58  
    59  func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
    60  	// Arrange
    61  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
    62  	operation1Name := "operation1-name"
    63  	operation2Name := "operation2-name"
    64  	operation := func() error { return nil }
    65  
    66  	// Act
    67  	err1 := grm.Run(operation1Name, operation)
    68  	err2 := grm.Run(operation2Name, operation)
    69  
    70  	// Assert
    71  	if err1 != nil {
    72  		t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1Name, err1)
    73  	}
    74  
    75  	if err2 != nil {
    76  		t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2Name, err2)
    77  	}
    78  }
    79  
    80  func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
    81  	// Arrange
    82  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
    83  	operationName := "operation-name"
    84  	operation := func() error { return nil }
    85  
    86  	// Act
    87  	err := grm.Run(operationName, operation)
    88  
    89  	// Assert
    90  	if err != nil {
    91  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
    92  	}
    93  }
    94  
    95  func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
    96  	// Arrange
    97  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
    98  	operationName := "operation-name"
    99  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   100  	operation1 := generateCallbackFunc(operation1DoneCh)
   101  	err1 := grm.Run(operationName, operation1)
   102  	if err1 != nil {
   103  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   104  	}
   105  	operation2 := generateNoopFunc()
   106  	<-operation1DoneCh // Force operation1 to complete
   107  
   108  	// Act
   109  	err2 := retryWithExponentialBackOff(
   110  		time.Duration(initialOperationWaitTimeShort),
   111  		func() (bool, error) {
   112  			err := grm.Run(operationName, operation2)
   113  			if err != nil {
   114  				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
   115  				return false, nil
   116  			}
   117  			return true, nil
   118  		},
   119  	)
   120  
   121  	// Assert
   122  	if err2 != nil {
   123  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
   124  	}
   125  }
   126  
   127  func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
   128  	// Arrange
   129  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   130  	operationName := "operation-name"
   131  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   132  	operation1 := generateCallbackFunc(operation1DoneCh)
   133  	err1 := grm.Run(operationName, operation1)
   134  	if err1 != nil {
   135  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   136  	}
   137  	operation2 := generateNoopFunc()
   138  	<-operation1DoneCh // Force operation1 to complete
   139  
   140  	// Act
   141  	err2 := retryWithExponentialBackOff(
   142  		time.Duration(initialOperationWaitTimeShort),
   143  		func() (bool, error) {
   144  			err := grm.Run(operationName, operation2)
   145  			if err != nil {
   146  				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
   147  				return false, nil
   148  			}
   149  			return true, nil
   150  		},
   151  	)
   152  
   153  	// Assert
   154  	if err2 != nil {
   155  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
   156  	}
   157  }
   158  
   159  func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
   160  	// Arrange
   161  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
   162  	operationName := "operation-name"
   163  	operation1 := generatePanicFunc()
   164  	err1 := grm.Run(operationName, operation1)
   165  	if err1 != nil {
   166  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   167  	}
   168  	operation2 := generateNoopFunc()
   169  
   170  	// Act
   171  	err2 := retryWithExponentialBackOff(
   172  		time.Duration(initialOperationWaitTimeShort),
   173  		func() (bool, error) {
   174  			err := grm.Run(operationName, operation2)
   175  			if err != nil {
   176  				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
   177  				return false, nil
   178  			}
   179  			return true, nil
   180  		},
   181  	)
   182  
   183  	// Assert
   184  	if err2 != nil {
   185  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
   186  	}
   187  }
   188  
   189  func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
   190  	// Arrange
   191  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   192  	operationName := "operation-name"
   193  	operation1 := generatePanicFunc()
   194  	err1 := grm.Run(operationName, operation1)
   195  	if err1 != nil {
   196  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   197  	}
   198  	operation2 := generateNoopFunc()
   199  
   200  	// Act
   201  	err2 := retryWithExponentialBackOff(
   202  		time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
   203  		func() (bool, error) {
   204  			err := grm.Run(operationName, operation2)
   205  			if err != nil {
   206  				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
   207  				return false, nil
   208  			}
   209  			return true, nil
   210  		},
   211  	)
   212  
   213  	// Assert
   214  	if err2 != nil {
   215  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
   216  	}
   217  }
   218  
   219  func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
   220  	// Arrange
   221  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
   222  	operationName := "operation-name"
   223  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   224  	operation1 := generateWaitFunc(operation1DoneCh)
   225  	err1 := grm.Run(operationName, operation1)
   226  	if err1 != nil {
   227  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   228  	}
   229  	operation2 := generateNoopFunc()
   230  
   231  	// Act
   232  	err2 := grm.Run(operationName, operation2)
   233  
   234  	// Assert
   235  	if err2 == nil {
   236  		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
   237  	}
   238  	if !IsAlreadyExists(err2) {
   239  		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
   240  	}
   241  }
   242  
   243  func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
   244  	// Arrange
   245  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   246  	operationName := "operation-name"
   247  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   248  	operation1 := generateWaitFunc(operation1DoneCh)
   249  	err1 := grm.Run(operationName, operation1)
   250  	if err1 != nil {
   251  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   252  	}
   253  	operation2 := generateNoopFunc()
   254  
   255  	// Act
   256  	err2 := grm.Run(operationName, operation2)
   257  
   258  	// Assert
   259  	if err2 == nil {
   260  		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
   261  	}
   262  	if !IsAlreadyExists(err2) {
   263  		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
   264  	}
   265  }
   266  
   267  func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
   268  	// Arrange
   269  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
   270  	operationName := "operation-name"
   271  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   272  	operation1 := generateWaitFunc(operation1DoneCh)
   273  	err1 := grm.Run(operationName, operation1)
   274  	if err1 != nil {
   275  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   276  	}
   277  	operation2 := generateNoopFunc()
   278  	operation3 := generateNoopFunc()
   279  
   280  	// Act
   281  	err2 := grm.Run(operationName, operation2)
   282  
   283  	// Assert
   284  	if err2 == nil {
   285  		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
   286  	}
   287  	if !IsAlreadyExists(err2) {
   288  		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
   289  	}
   290  
   291  	// Act
   292  	operation1DoneCh <- true // Force operation1 to complete
   293  	err3 := retryWithExponentialBackOff(
   294  		time.Duration(initialOperationWaitTimeShort),
   295  		func() (bool, error) {
   296  			err := grm.Run(operationName, operation3)
   297  			if err != nil {
   298  				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
   299  				return false, nil
   300  			}
   301  			return true, nil
   302  		},
   303  	)
   304  
   305  	// Assert
   306  	if err3 != nil {
   307  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
   308  	}
   309  }
   310  
   311  func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
   312  	// Arrange
   313  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   314  	operationName := "operation-name"
   315  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   316  	operation1 := generateWaitFunc(operation1DoneCh)
   317  	err1 := grm.Run(operationName, operation1)
   318  	if err1 != nil {
   319  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
   320  	}
   321  	operation2 := generateNoopFunc()
   322  	operation3 := generateNoopFunc()
   323  
   324  	// Act
   325  	err2 := grm.Run(operationName, operation2)
   326  
   327  	// Assert
   328  	if err2 == nil {
   329  		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
   330  	}
   331  	if !IsAlreadyExists(err2) {
   332  		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
   333  	}
   334  
   335  	// Act
   336  	operation1DoneCh <- true // Force operation1 to complete
   337  	err3 := retryWithExponentialBackOff(
   338  		time.Duration(initialOperationWaitTimeShort),
   339  		func() (bool, error) {
   340  			err := grm.Run(operationName, operation3)
   341  			if err != nil {
   342  				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
   343  				return false, nil
   344  			}
   345  			return true, nil
   346  		},
   347  	)
   348  
   349  	// Assert
   350  	if err3 != nil {
   351  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
   352  	}
   353  }
   354  
   355  func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
   356  	// Test than Wait() on empty GoRoutineMap always succeeds without blocking
   357  	// Arrange
   358  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
   359  
   360  	// Act
   361  	waitDoneCh := make(chan interface{}, 1)
   362  	go func() {
   363  		grm.Wait()
   364  		waitDoneCh <- true
   365  	}()
   366  
   367  	// Assert
   368  	err := waitChannelWithTimeout(waitDoneCh, testTimeout)
   369  	if err != nil {
   370  		t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
   371  	}
   372  }
   373  
   374  func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
   375  	// Test than Wait() on empty GoRoutineMap always succeeds without blocking
   376  	// Arrange
   377  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   378  
   379  	// Act
   380  	waitDoneCh := make(chan interface{}, 1)
   381  	go func() {
   382  		grm.Wait()
   383  		waitDoneCh <- true
   384  	}()
   385  
   386  	// Assert
   387  	err := waitChannelWithTimeout(waitDoneCh, testTimeout)
   388  	if err != nil {
   389  		t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
   390  	}
   391  }
   392  
   393  func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
   394  	// Test that Wait() really blocks until the last operation succeeds
   395  	// Arrange
   396  	grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
   397  	operationName := "operation-name"
   398  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   399  	operation1 := generateWaitFunc(operation1DoneCh)
   400  	err := grm.Run(operationName, operation1)
   401  	if err != nil {
   402  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
   403  	}
   404  
   405  	// Act
   406  	waitDoneCh := make(chan interface{}, 1)
   407  	go func() {
   408  		grm.Wait()
   409  		waitDoneCh <- true
   410  	}()
   411  
   412  	// Finish the operation
   413  	operation1DoneCh <- true
   414  
   415  	// Assert
   416  	err = waitChannelWithTimeout(waitDoneCh, testTimeout)
   417  	if err != nil {
   418  		t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
   419  	}
   420  }
   421  
   422  func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
   423  	// Test that Wait() really blocks until the last operation succeeds
   424  	// Arrange
   425  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   426  	operationName := "operation-name"
   427  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   428  	operation1 := generateWaitFunc(operation1DoneCh)
   429  	err := grm.Run(operationName, operation1)
   430  	if err != nil {
   431  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
   432  	}
   433  
   434  	// Act
   435  	waitDoneCh := make(chan interface{}, 1)
   436  	go func() {
   437  		grm.Wait()
   438  		waitDoneCh <- true
   439  	}()
   440  
   441  	// Finish the operation
   442  	operation1DoneCh <- true
   443  
   444  	// Assert
   445  	err = waitChannelWithTimeout(waitDoneCh, testTimeout)
   446  	if err != nil {
   447  		t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
   448  	}
   449  }
   450  
   451  func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) {
   452  	grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
   453  	operationName := "operation-err"
   454  
   455  	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
   456  	operation1 := generateErrorFunc(operation1DoneCh)
   457  	err := grm.Run(operationName, operation1)
   458  	if err != nil {
   459  		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
   460  	}
   461  
   462  	// Act
   463  	waitDoneCh := make(chan interface{}, 1)
   464  	go func() {
   465  		grm.WaitForCompletion()
   466  		waitDoneCh <- true
   467  	}()
   468  
   469  	// Finish the operation
   470  	operation1DoneCh <- true
   471  
   472  	// Assert that WaitForCompletion returns even if scheduled op had error
   473  	err = waitChannelWithTimeout(waitDoneCh, testTimeout)
   474  	if err != nil {
   475  		t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
   476  	}
   477  }
   478  
   479  func generateCallbackFunc(done chan<- interface{}) func() error {
   480  	return func() error {
   481  		done <- true
   482  		return nil
   483  	}
   484  }
   485  
   486  func generateErrorFunc(done <-chan interface{}) func() error {
   487  	return func() error {
   488  		<-done
   489  		return fmt.Errorf("Generic error")
   490  	}
   491  }
   492  
   493  func generateWaitFunc(done <-chan interface{}) func() error {
   494  	return func() error {
   495  		<-done
   496  		return nil
   497  	}
   498  }
   499  
   500  func generatePanicFunc() func() error {
   501  	return func() error {
   502  		panic("testing panic")
   503  	}
   504  }
   505  
   506  func generateNoopFunc() func() error {
   507  	return func() error { return nil }
   508  }
   509  
   510  func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
   511  	backoff := wait.Backoff{
   512  		Duration: initialDuration,
   513  		Factor:   3,
   514  		Jitter:   0,
   515  		Steps:    4,
   516  	}
   517  	return wait.ExponentialBackoff(backoff, fn)
   518  }
   519  
   520  func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
   521  	timer := time.NewTimer(timeout)
   522  	defer timer.Stop()
   523  
   524  	select {
   525  	case <-ch:
   526  		// Success!
   527  		return nil
   528  	case <-timer.C:
   529  		return fmt.Errorf("timeout after %v", timeout)
   530  	}
   531  }
   532  

View as plain text