...

Source file src/k8s.io/kubernetes/test/e2e/chaosmonkey/chaosmonkey.go

Documentation: k8s.io/kubernetes/test/e2e/chaosmonkey

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package chaosmonkey
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	"github.com/onsi/ginkgo/v2"
    24  )
    25  
    26  // Disruption is the type to construct a Chaosmonkey with; see Do for more information.
    27  type Disruption func(ctx context.Context)
    28  
    29  // Test is the type to register with a Chaosmonkey.  A test will run asynchronously across the
    30  // Chaosmonkey's Disruption.  A Test takes a Semaphore as an argument.  It should call sem.Ready()
    31  // once it's ready for the disruption to start and should then wait until sem.StopCh (which is a
    32  // <-chan struct{}) is closed, which signals that the disruption is over.  It should then clean up
    33  // and return.  See Do and Semaphore for more information.
    34  type Test func(ctx context.Context, sem *Semaphore)
    35  
    36  // Interface can be implemented if you prefer to define tests without dealing with a Semaphore.  You
    37  // may define a struct that implements Interface's three methods (Setup, Test, and Teardown) and
    38  // RegisterInterface.  See RegisterInterface for more information.
    39  type Interface interface {
    40  	Setup()
    41  	Test(stopCh <-chan struct{})
    42  	Teardown()
    43  }
    44  
    45  // Chaosmonkey is the type that holds the necessary content for chaosmonkey test
    46  type Chaosmonkey struct {
    47  	disruption Disruption
    48  	tests      []Test
    49  }
    50  
    51  // New creates and returns a Chaosmonkey, with which the caller should register Tests and call Do.
    52  // See Do for more information.
    53  func New(disruption Disruption) *Chaosmonkey {
    54  	return &Chaosmonkey{
    55  		disruption,
    56  		[]Test{},
    57  	}
    58  }
    59  
    60  // Register registers the given Test with the Chaosmonkey, so that the test will run over the
    61  // Disruption.
    62  func (cm *Chaosmonkey) Register(test Test) {
    63  	cm.tests = append(cm.tests, test)
    64  }
    65  
    66  // RegisterInterface registers the given Interface with the Chaosmonkey, so the Chaosmonkey will
    67  // call Setup, Test, and Teardown properly.  Test can tell that the Disruption is finished when
    68  // stopCh is closed.
    69  func (cm *Chaosmonkey) RegisterInterface(in Interface) {
    70  	cm.Register(func(ctx context.Context, sem *Semaphore) {
    71  		in.Setup()
    72  		sem.Ready()
    73  		in.Test(sem.StopCh)
    74  		in.Teardown()
    75  	})
    76  }
    77  
    78  // Do performs the Disruption while testing the registered Tests.  Once the caller has registered
    79  // all Tests with the Chaosmonkey, they call Do.  Do starts each registered test asynchronously and
    80  // waits for each test to signal that it is ready by calling sem.Ready().  Do will then do the
    81  // Disruption, and when it's complete, close sem.StopCh to signal to the registered Tests that the
    82  // Disruption is over, and wait for all Tests to return.
    83  func (cm *Chaosmonkey) Do(ctx context.Context) {
    84  	sems := []*Semaphore{}
    85  	// All semaphores have the same StopCh.
    86  	stopCh := make(chan struct{})
    87  
    88  	for _, test := range cm.tests {
    89  		test := test
    90  		sem := newSemaphore(stopCh)
    91  		sems = append(sems, sem)
    92  		go func() {
    93  			defer ginkgo.GinkgoRecover()
    94  			defer sem.done()
    95  			test(ctx, sem)
    96  		}()
    97  	}
    98  
    99  	fmt.Println("Waiting for all async tests to be ready")
   100  	for _, sem := range sems {
   101  		// Wait for test to be ready.  We have to wait for ready *or done* because a test
   102  		// may panic before signaling that its ready, and we shouldn't block.  Since we
   103  		// deferred sem.done() above, if a test panics, it's marked as done.
   104  		sem.waitForReadyOrDone()
   105  	}
   106  
   107  	defer func() {
   108  		close(stopCh)
   109  		fmt.Println("Waiting for async validations to complete")
   110  		for _, sem := range sems {
   111  			sem.waitForDone()
   112  		}
   113  	}()
   114  
   115  	fmt.Println("Starting disruption")
   116  	cm.disruption(ctx)
   117  	fmt.Println("Disruption complete; stopping async validations")
   118  }
   119  
   120  // Semaphore is taken by a Test and provides: Ready(), for the Test to call when it's ready for the
   121  // disruption to start; and StopCh, the closure of which signals to the Test that the disruption is
   122  // finished.
   123  type Semaphore struct {
   124  	readyCh chan struct{}
   125  	StopCh  <-chan struct{}
   126  	doneCh  chan struct{}
   127  }
   128  
   129  func newSemaphore(stopCh <-chan struct{}) *Semaphore {
   130  	// We don't want to block on Ready() or done()
   131  	return &Semaphore{
   132  		make(chan struct{}, 1),
   133  		stopCh,
   134  		make(chan struct{}, 1),
   135  	}
   136  }
   137  
   138  // Ready is called by the Test to signal that the Test is ready for the disruption to start.
   139  func (sem *Semaphore) Ready() {
   140  	close(sem.readyCh)
   141  }
   142  
   143  // done is an internal method for Go to defer, both to wait for all tests to return, but also to
   144  // sense if a test panicked before calling Ready.  See waitForReadyOrDone.
   145  func (sem *Semaphore) done() {
   146  	close(sem.doneCh)
   147  }
   148  
   149  // We would like to just check if all tests are ready, but if they fail (which Ginkgo implements as
   150  // a panic), they may not have called Ready().  We check done as well to see if the function has
   151  // already returned; if it has, we don't care if it's ready, and just continue.
   152  func (sem *Semaphore) waitForReadyOrDone() {
   153  	select {
   154  	case <-sem.readyCh:
   155  	case <-sem.doneCh:
   156  	}
   157  }
   158  
   159  // waitForDone is an internal method for Go to wait on all Tests returning.
   160  func (sem *Semaphore) waitForDone() {
   161  	<-sem.doneCh
   162  }
   163  

View as plain text