...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/event_funnel.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/watcher

     1  // Copyright 2022 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package watcher
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  
    10  	"k8s.io/klog/v2"
    11  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    12  )
    13  
    14  // eventFunnel wraps a list of event channels and multiplexes them down to a
    15  // single event channel. New input channels can be added at runtime, and the
    16  // output channel will remain open until all input channels are closed.
    17  type eventFunnel struct {
    18  	// ctx closure triggers shutdown
    19  	ctx context.Context
    20  	// outCh is the funnel that consumes all events from input channels
    21  	outCh chan event.Event
    22  	// doneCh is closed after outCh is closed.
    23  	// This allows blocking until done without consuming events.
    24  	doneCh chan struct{}
    25  	// counterCh is used to track the number of open input channels.
    26  	counterCh chan int
    27  }
    28  
    29  func newEventFunnel(ctx context.Context) *eventFunnel {
    30  	funnel := &eventFunnel{
    31  		ctx:       ctx,
    32  		outCh:     make(chan event.Event),
    33  		doneCh:    make(chan struct{}),
    34  		counterCh: make(chan int),
    35  	}
    36  	// Wait until the context is done and all input channels are closed.
    37  	// Then close out and done channels to signal completion.
    38  	go func() {
    39  		defer func() {
    40  			// Don't close counterCh, otherwise AddInputChannel may panic.
    41  			klog.V(5).Info("Closing funnel")
    42  			close(funnel.outCh)
    43  			close(funnel.doneCh)
    44  		}()
    45  		ctxDoneCh := ctx.Done()
    46  
    47  		// Count input channels that have been added and not closed.
    48  		inputs := 0
    49  		for {
    50  			select {
    51  			case delta := <-funnel.counterCh:
    52  				inputs += delta
    53  				klog.V(5).Infof("Funnel input channels (%+d): %d", delta, inputs)
    54  			case <-ctxDoneCh:
    55  				// Stop waiting for context closure.
    56  				// Nil channel avoids busy waiting.
    57  				ctxDoneCh = nil
    58  			}
    59  			if ctxDoneCh == nil && inputs <= 0 {
    60  				// Context is closed and all input channels are closed.
    61  				break
    62  			}
    63  		}
    64  	}()
    65  	return funnel
    66  }
    67  
    68  // Add a new input channel to the multiplexer.
    69  func (m *eventFunnel) AddInputChannel(inCh <-chan event.Event) error {
    70  	select {
    71  	case <-m.ctx.Done(): // skip, if context is closed
    72  		return &EventFunnelClosedError{ContextError: m.ctx.Err()}
    73  	case m.counterCh <- 1: // increment counter
    74  	}
    75  
    76  	// Create a multiplexer for each new event channel.
    77  	go m.drain(inCh, m.outCh)
    78  	return nil
    79  }
    80  
    81  // OutputChannel channel receives all events sent to input channels.
    82  // This channel is closed after all input channels are closed.
    83  func (m *eventFunnel) OutputChannel() <-chan event.Event {
    84  	return m.outCh
    85  }
    86  
    87  // Done channel is closed after the Output channel is closed.
    88  // This allows blocking until done without consuming events.
    89  // If no input channels have been added yet, the done channel will be nil.
    90  func (m *eventFunnel) Done() <-chan struct{} {
    91  	return m.doneCh
    92  }
    93  
    94  // drain a single input channel to a single output channel.
    95  func (m *eventFunnel) drain(inCh <-chan event.Event, outCh chan<- event.Event) {
    96  	defer func() {
    97  		m.counterCh <- -1 // decrement counter
    98  	}()
    99  	for event := range inCh {
   100  		outCh <- event
   101  	}
   102  }
   103  
   104  type EventFunnelClosedError struct {
   105  	ContextError error
   106  }
   107  
   108  func (e *EventFunnelClosedError) Error() string {
   109  	return fmt.Sprintf("event funnel closed: %v", e.ContextError)
   110  }
   111  
   112  func (e *EventFunnelClosedError) Is(err error) bool {
   113  	fcErr, ok := err.(*EventFunnelClosedError)
   114  	if !ok {
   115  		return false
   116  	}
   117  	return e.ContextError == fcErr.ContextError
   118  }
   119  
   120  func (e *EventFunnelClosedError) Unwrap() error {
   121  	return e.ContextError
   122  }
   123  

View as plain text