...

Source file src/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group_test.go

Documentation: sigs.k8s.io/controller-runtime/pkg/manager

     1  package manager
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"sync/atomic"
     8  	"time"
     9  
    10  	. "github.com/onsi/ginkgo/v2"
    11  	. "github.com/onsi/gomega"
    12  	"k8s.io/utils/ptr"
    13  
    14  	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
    15  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    16  )
    17  
    18  var _ = Describe("runnables", func() {
    19  	errCh := make(chan error)
    20  
    21  	It("should be able to create a new runnables object", func() {
    22  		Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil())
    23  	})
    24  
    25  	It("should add HTTP servers to the appropriate group", func() {
    26  		server := &Server{}
    27  		r := newRunnables(defaultBaseContext, errCh)
    28  		Expect(r.Add(server)).To(Succeed())
    29  		Expect(r.HTTPServers.startQueue).To(HaveLen(1))
    30  	})
    31  
    32  	It("should add caches to the appropriate group", func() {
    33  		cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}
    34  		r := newRunnables(defaultBaseContext, errCh)
    35  		Expect(r.Add(cache)).To(Succeed())
    36  		Expect(r.Caches.startQueue).To(HaveLen(1))
    37  	})
    38  
    39  	It("should add webhooks to the appropriate group", func() {
    40  		webhook := webhook.NewServer(webhook.Options{})
    41  		r := newRunnables(defaultBaseContext, errCh)
    42  		Expect(r.Add(webhook)).To(Succeed())
    43  		Expect(r.Webhooks.startQueue).To(HaveLen(1))
    44  	})
    45  
    46  	It("should add any runnable to the leader election group", func() {
    47  		err := errors.New("runnable func")
    48  		runnable := RunnableFunc(func(c context.Context) error {
    49  			return err
    50  		})
    51  
    52  		r := newRunnables(defaultBaseContext, errCh)
    53  		Expect(r.Add(runnable)).To(Succeed())
    54  		Expect(r.LeaderElection.startQueue).To(HaveLen(1))
    55  	})
    56  })
    57  
    58  var _ = Describe("runnableGroup", func() {
    59  	errCh := make(chan error)
    60  
    61  	It("should be able to add new runnables before it starts", func() {
    62  		ctx, cancel := context.WithCancel(context.Background())
    63  		defer cancel()
    64  		rg := newRunnableGroup(defaultBaseContext, errCh)
    65  		Expect(rg.Add(RunnableFunc(func(c context.Context) error {
    66  			<-ctx.Done()
    67  			return nil
    68  		}), nil)).To(Succeed())
    69  
    70  		Expect(rg.Started()).To(BeFalse())
    71  	})
    72  
    73  	It("should be able to add new runnables before and after start", func() {
    74  		ctx, cancel := context.WithCancel(context.Background())
    75  		defer cancel()
    76  		rg := newRunnableGroup(defaultBaseContext, errCh)
    77  		Expect(rg.Add(RunnableFunc(func(c context.Context) error {
    78  			<-ctx.Done()
    79  			return nil
    80  		}), nil)).To(Succeed())
    81  		Expect(rg.Start(ctx)).To(Succeed())
    82  		Expect(rg.Started()).To(BeTrue())
    83  		Expect(rg.Add(RunnableFunc(func(c context.Context) error {
    84  			<-ctx.Done()
    85  			return nil
    86  		}), nil)).To(Succeed())
    87  	})
    88  
    89  	It("should be able to add new runnables before and after start concurrently", func() {
    90  		ctx, cancel := context.WithCancel(context.Background())
    91  		defer cancel()
    92  		rg := newRunnableGroup(defaultBaseContext, errCh)
    93  
    94  		go func() {
    95  			defer GinkgoRecover()
    96  			<-time.After(50 * time.Millisecond)
    97  			Expect(rg.Start(ctx)).To(Succeed())
    98  		}()
    99  
   100  		for i := 0; i < 20; i++ {
   101  			go func(i int) {
   102  				defer GinkgoRecover()
   103  
   104  				<-time.After(time.Duration(i) * 10 * time.Millisecond)
   105  				Expect(rg.Add(RunnableFunc(func(c context.Context) error {
   106  					<-ctx.Done()
   107  					return nil
   108  				}), nil)).To(Succeed())
   109  			}(i)
   110  		}
   111  	})
   112  
   113  	It("should be able to close the group and wait for all runnables to finish", func() {
   114  		ctx, cancel := context.WithCancel(context.Background())
   115  
   116  		exited := ptr.To(int64(0))
   117  		rg := newRunnableGroup(defaultBaseContext, errCh)
   118  		for i := 0; i < 10; i++ {
   119  			Expect(rg.Add(RunnableFunc(func(c context.Context) error {
   120  				defer atomic.AddInt64(exited, 1)
   121  				<-ctx.Done()
   122  				<-time.After(time.Duration(i) * 10 * time.Millisecond)
   123  				return nil
   124  			}), nil)).To(Succeed())
   125  		}
   126  		Expect(rg.Start(ctx)).To(Succeed())
   127  
   128  		// Cancel the context, asking the runnables to exit.
   129  		cancel()
   130  		rg.StopAndWait(context.Background())
   131  
   132  		Expect(rg.Add(RunnableFunc(func(c context.Context) error {
   133  			return nil
   134  		}), nil)).ToNot(Succeed())
   135  
   136  		Expect(atomic.LoadInt64(exited)).To(BeNumerically("==", 10))
   137  	})
   138  
   139  	It("should be able to wait for all runnables to be ready at different intervals", func() {
   140  		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
   141  		defer cancel()
   142  		rg := newRunnableGroup(defaultBaseContext, errCh)
   143  
   144  		go func() {
   145  			defer GinkgoRecover()
   146  			<-time.After(50 * time.Millisecond)
   147  			Expect(rg.Start(ctx)).To(Succeed())
   148  		}()
   149  
   150  		for i := 0; i < 20; i++ {
   151  			go func(i int) {
   152  				defer GinkgoRecover()
   153  
   154  				Expect(rg.Add(RunnableFunc(func(c context.Context) error {
   155  					<-ctx.Done()
   156  					return nil
   157  				}), func(_ context.Context) bool {
   158  					<-time.After(time.Duration(i) * 10 * time.Millisecond)
   159  					return true
   160  				})).To(Succeed())
   161  			}(i)
   162  		}
   163  	})
   164  
   165  	It("should be able to handle adding runnables while stopping", func() {
   166  		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   167  		defer cancel()
   168  		rg := newRunnableGroup(defaultBaseContext, errCh)
   169  
   170  		go func() {
   171  			defer GinkgoRecover()
   172  			<-time.After(1 * time.Millisecond)
   173  			Expect(rg.Start(ctx)).To(Succeed())
   174  		}()
   175  		go func() {
   176  			defer GinkgoRecover()
   177  			<-time.After(1 * time.Millisecond)
   178  			ctx, cancel := context.WithCancel(context.Background())
   179  			cancel()
   180  			rg.StopAndWait(ctx)
   181  		}()
   182  
   183  		for i := 0; i < 200; i++ {
   184  			go func(i int) {
   185  				defer GinkgoRecover()
   186  
   187  				<-time.After(time.Duration(i) * time.Microsecond)
   188  				Expect(rg.Add(RunnableFunc(func(c context.Context) error {
   189  					<-ctx.Done()
   190  					return nil
   191  				}), func(_ context.Context) bool {
   192  					return true
   193  				})).To(SatisfyAny(
   194  					Succeed(),
   195  					Equal(errRunnableGroupStopped),
   196  				))
   197  			}(i)
   198  		}
   199  	})
   200  
   201  	It("should not turn ready if some readiness check fail", func() {
   202  		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
   203  		defer cancel()
   204  		rg := newRunnableGroup(defaultBaseContext, errCh)
   205  
   206  		go func() {
   207  			defer GinkgoRecover()
   208  			<-time.After(50 * time.Millisecond)
   209  			Expect(rg.Start(ctx)).To(Succeed())
   210  		}()
   211  
   212  		for i := 0; i < 20; i++ {
   213  			go func(i int) {
   214  				defer GinkgoRecover()
   215  
   216  				Expect(rg.Add(RunnableFunc(func(c context.Context) error {
   217  					<-ctx.Done()
   218  					return nil
   219  				}), func(_ context.Context) bool {
   220  					<-time.After(time.Duration(i) * 10 * time.Millisecond)
   221  					return i%2 == 0 // Return false readiness all uneven indexes.
   222  				})).To(Succeed())
   223  			}(i)
   224  		}
   225  	})
   226  })
   227  

View as plain text