...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/syncs/syncs.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/syncs

     1  package syncs
     2  
     3  import (
     4  	"context"
     5  	"reflect"
     6  	"sync"
     7  )
     8  
     9  // MergeChans returns a channel that is closed when any of the input channels are signaled.
    10  // The caller must call the returned CancelFunc to ensure no resources are leaked.
    11  func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
    12  	var once sync.Once
    13  	out := make(chan T)
    14  	cancel := make(chan T)
    15  	cancelFunc := func() {
    16  		once.Do(func() {
    17  			close(cancel)
    18  		})
    19  		<-out
    20  	}
    21  	cases := make([]reflect.SelectCase, len(chans)+1)
    22  	for i := range chans {
    23  		cases[i] = reflect.SelectCase{
    24  			Dir:  reflect.SelectRecv,
    25  			Chan: reflect.ValueOf(chans[i]),
    26  		}
    27  	}
    28  	cases[len(cases)-1] = reflect.SelectCase{
    29  		Dir:  reflect.SelectRecv,
    30  		Chan: reflect.ValueOf(cancel),
    31  	}
    32  	go func() {
    33  		defer close(out)
    34  		_, _, _ = reflect.Select(cases)
    35  	}()
    36  
    37  	return out, cancelFunc
    38  }
    39  

View as plain text