...

Source file src/sigs.k8s.io/controller-runtime/pkg/source/source.go

Documentation: sigs.k8s.io/controller-runtime/pkg/source

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package source
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"sync"
    24  
    25  	"k8s.io/client-go/util/workqueue"
    26  	"k8s.io/utils/ptr"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  	"sigs.k8s.io/controller-runtime/pkg/event"
    29  	"sigs.k8s.io/controller-runtime/pkg/handler"
    30  	internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
    31  
    32  	"sigs.k8s.io/controller-runtime/pkg/cache"
    33  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    34  )
    35  
    36  // Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
    37  // which should be processed by event.EventHandlers to enqueue reconcile.Requests.
    38  //
    39  // * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
    40  //
    41  // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls).
    42  //
    43  // Users may build their own Source implementations.
    44  type Source interface {
    45  	// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
    46  	// to enqueue reconcile.Requests.
    47  	Start(context.Context, workqueue.RateLimitingInterface) error
    48  }
    49  
// SyncingSource is a Source that has to be synced before it can be used. The
// controller calls WaitForSync before it starts its workers.
type SyncingSource interface {
	Source
	// WaitForSync reports, via its error result, whether the source finished
	// syncing; ctx is supplied by the caller.
	WaitForSync(ctx context.Context) error
}
    56  
    57  // Kind creates a KindSource with the given cache provider.
    58  func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource {
    59  	return &internal.Kind[T]{
    60  		Type:       object,
    61  		Cache:      cache,
    62  		Handler:    handler,
    63  		Predicates: predicates,
    64  	}
    65  }
    66  
    67  var _ Source = &channel[string]{}
    68  
    69  // ChannelOpt allows to configure a source.Channel.
    70  type ChannelOpt[T any] func(*channel[T])
    71  
    72  // WithPredicates adds the configured predicates to a source.Channel.
    73  func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] {
    74  	return func(c *channel[T]) {
    75  		c.predicates = append(c.predicates, p...)
    76  	}
    77  }
    78  
    79  // WithBufferSize configures the buffer size for a source.Channel. By
    80  // default, the buffer size is 1024.
    81  func WithBufferSize[T any](bufferSize int) ChannelOpt[T] {
    82  	return func(c *channel[T]) {
    83  		c.bufferSize = &bufferSize
    84  	}
    85  }
    86  
    87  // Channel is used to provide a source of events originating outside the cluster
    88  // (e.g. GitHub Webhook callback).  Channel requires the user to wire the external
    89  // source (e.g. http handler) to write GenericEvents to the underlying channel.
    90  func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], opts ...ChannelOpt[T]) Source {
    91  	c := &channel[T]{
    92  		source:  source,
    93  		handler: handler,
    94  	}
    95  	for _, opt := range opts {
    96  		opt(c)
    97  	}
    98  
    99  	return c
   100  }
   101  
   102  type channel[T any] struct {
   103  	// once ensures the event distribution goroutine will be performed only once
   104  	once sync.Once
   105  
   106  	// source is the source channel to fetch GenericEvents
   107  	source <-chan event.TypedGenericEvent[T]
   108  
   109  	handler handler.TypedEventHandler[T]
   110  
   111  	predicates []predicate.TypedPredicate[T]
   112  
   113  	bufferSize *int
   114  
   115  	// dest is the destination channels of the added event handlers
   116  	dest []chan event.TypedGenericEvent[T]
   117  
   118  	// destLock is to ensure the destination channels are safely added/removed
   119  	destLock sync.Mutex
   120  }
   121  
   122  func (cs *channel[T]) String() string {
   123  	return fmt.Sprintf("channel source: %p", cs)
   124  }
   125  
   126  // Start implements Source and should only be called by the Controller.
   127  func (cs *channel[T]) Start(
   128  	ctx context.Context,
   129  	queue workqueue.RateLimitingInterface,
   130  ) error {
   131  	// Source should have been specified by the user.
   132  	if cs.source == nil {
   133  		return fmt.Errorf("must specify Channel.Source")
   134  	}
   135  	if cs.handler == nil {
   136  		return errors.New("must specify Channel.Handler")
   137  	}
   138  
   139  	if cs.bufferSize == nil {
   140  		cs.bufferSize = ptr.To(1024)
   141  	}
   142  
   143  	dst := make(chan event.TypedGenericEvent[T], *cs.bufferSize)
   144  
   145  	cs.destLock.Lock()
   146  	cs.dest = append(cs.dest, dst)
   147  	cs.destLock.Unlock()
   148  
   149  	cs.once.Do(func() {
   150  		// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
   151  		go cs.syncLoop(ctx)
   152  	})
   153  
   154  	go func() {
   155  		for evt := range dst {
   156  			shouldHandle := true
   157  			for _, p := range cs.predicates {
   158  				if !p.Generic(evt) {
   159  					shouldHandle = false
   160  					break
   161  				}
   162  			}
   163  
   164  			if shouldHandle {
   165  				func() {
   166  					ctx, cancel := context.WithCancel(ctx)
   167  					defer cancel()
   168  					cs.handler.Generic(ctx, evt, queue)
   169  				}()
   170  			}
   171  		}
   172  	}()
   173  
   174  	return nil
   175  }
   176  
   177  func (cs *channel[T]) doStop() {
   178  	cs.destLock.Lock()
   179  	defer cs.destLock.Unlock()
   180  
   181  	for _, dst := range cs.dest {
   182  		close(dst)
   183  	}
   184  }
   185  
   186  func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) {
   187  	cs.destLock.Lock()
   188  	defer cs.destLock.Unlock()
   189  
   190  	for _, dst := range cs.dest {
   191  		// We cannot make it under goroutine here, or we'll meet the
   192  		// race condition of writing message to closed channels.
   193  		// To avoid blocking, the dest channels are expected to be of
   194  		// proper buffer size. If we still see it blocked, then
   195  		// the controller is thought to be in an abnormal state.
   196  		dst <- evt
   197  	}
   198  }
   199  
   200  func (cs *channel[T]) syncLoop(ctx context.Context) {
   201  	for {
   202  		select {
   203  		case <-ctx.Done():
   204  			// Close destination channels
   205  			cs.doStop()
   206  			return
   207  		case evt, stillOpen := <-cs.source:
   208  			if !stillOpen {
   209  				// if the source channel is closed, we're never gonna get
   210  				// anything more on it, so stop & bail
   211  				cs.doStop()
   212  				return
   213  			}
   214  			cs.distribute(evt)
   215  		}
   216  	}
   217  }
   218  
   219  // Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
   220  type Informer struct {
   221  	// Informer is the controller-runtime Informer
   222  	Informer   cache.Informer
   223  	Handler    handler.EventHandler
   224  	Predicates []predicate.Predicate
   225  }
   226  
   227  var _ Source = &Informer{}
   228  
   229  // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
   230  // to enqueue reconcile.Requests.
   231  func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
   232  	// Informer should have been specified by the user.
   233  	if is.Informer == nil {
   234  		return fmt.Errorf("must specify Informer.Informer")
   235  	}
   236  	if is.Handler == nil {
   237  		return errors.New("must specify Informer.Handler")
   238  	}
   239  
   240  	_, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs())
   241  	if err != nil {
   242  		return err
   243  	}
   244  	return nil
   245  }
   246  
   247  func (is *Informer) String() string {
   248  	return fmt.Sprintf("informer source: %p", is.Informer)
   249  }
   250  
// Compile-time check that Func implements Source.
var _ Source = Func(nil)

// Func is a function type that implements Source: its Start method calls the
// function itself.
type Func func(context.Context, workqueue.RateLimitingInterface) error
   255  
// Start implements Source by invoking f with ctx and queue and returning its
// error unchanged.
func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
	return f(ctx, queue)
}
   260  
   261  func (f Func) String() string {
   262  	return fmt.Sprintf("func source: %p", f)
   263  }
   264  

View as plain text