...

Source file src/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cluster

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cluster
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"net/http"
    23  	"time"
    24  
    25  	"github.com/go-logr/logr"
    26  	"k8s.io/apimachinery/pkg/api/meta"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/client-go/kubernetes/scheme"
    29  	"k8s.io/client-go/rest"
    30  	"k8s.io/client-go/tools/record"
    31  
    32  	"sigs.k8s.io/controller-runtime/pkg/cache"
    33  	"sigs.k8s.io/controller-runtime/pkg/client"
    34  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    35  	logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
    36  	intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
    37  )
    38  
// Cluster provides various methods to interact with a cluster.
// Instances are constructed with New.
type Cluster interface {
	// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver.
	GetHTTPClient() *http.Client

	// GetConfig returns an initialized Config.
	GetConfig() *rest.Config

	// GetCache returns a cache.Cache.
	GetCache() cache.Cache

	// GetScheme returns an initialized Scheme.
	GetScheme() *runtime.Scheme

	// GetClient returns a client configured with the Config. This client may
	// not be a fully "direct" client -- it may read from a cache, for
	// instance. See Options.NewClient for more information on how the default
	// implementation works.
	GetClient() client.Client

	// GetFieldIndexer returns a client.FieldIndexer configured with the client.
	GetFieldIndexer() client.FieldIndexer

	// GetEventRecorderFor returns a new EventRecorder for the provided name.
	GetEventRecorderFor(name string) record.EventRecorder

	// GetRESTMapper returns a RESTMapper.
	GetRESTMapper() meta.RESTMapper

	// GetAPIReader returns a reader that will be configured to use the API server.
	// This should be used sparingly and only when the client does not fit your
	// use case.
	GetAPIReader() client.Reader

	// Start starts the cluster.
	Start(ctx context.Context) error
}
    76  
// Options are the possible options that can be configured for a Cluster.
// Unset fields are filled in by setOptionsDefaults when New is called.
type Options struct {
	// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources.
	// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always a better
	// idea to pass your own scheme in. See the documentation in pkg/scheme for more information.
	Scheme *runtime.Scheme

	// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs.
	// Defaults to apiutil.NewDynamicRESTMapper if unset.
	MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)

	// Logger is the logger that should be used by this Cluster.
	// If none is set (its sink is nil), it defaults to
	// logf.RuntimeLog.WithName("cluster").
	Logger logr.Logger

	// SyncPeriod determines the minimum frequency at which watched resources are
	// reconciled. A lower period will correct entropy more quickly, but reduce
	// responsiveness to change if there are many watched resources. Change this
	// value only if you know what you are doing. Defaults to 10 hours if unset.
	// There will be a 10 percent jitter between the SyncPeriod of all controllers
	// so that all controllers will not send list requests simultaneously.
	//
	// New only copies this value into the cache options when Cache.SyncPeriod
	// is nil.
	//
	// Deprecated: Use Cache.SyncPeriod instead.
	SyncPeriod *time.Duration

	// HTTPClient is the http client that will be used to create the default
	// Cache and Client. If not set the rest.HTTPClientFor function will be used
	// to create the http client.
	HTTPClient *http.Client

	// Cache is the cache.Options that will be used to create the default Cache.
	// By default, the cache will watch and list requested objects in all namespaces.
	//
	// New fills in the Scheme, Mapper, HTTPClient and SyncPeriod fields of these
	// options from the corresponding cluster-level values when they are nil.
	Cache cache.Options

	// NewCache is the function that will create the cache to be used
	// by the manager. If not set this will use the default new cache function
	// (cache.New).
	//
	// When using a custom NewCache, the Cache options will be passed to the
	// NewCache function.
	//
	// NOTE: LOW LEVEL PRIMITIVE!
	// Only use a custom NewCache if you know what you are doing.
	NewCache cache.NewCacheFunc

	// Client is the client.Options that will be used to create the default Client.
	// By default, the client will use the cache for reads and direct calls for writes.
	//
	// New fills in the Scheme, Mapper and HTTPClient fields of these options
	// when they are nil, creates Cache when it is nil, and sets Cache.Reader
	// to the cluster's cache when it is nil.
	Client client.Options

	// NewClient is the func that creates the client to be used by the manager.
	// If not set this will create a Client backed by a Cache for read operations
	// and a direct Client for write operations (client.New is used).
	//
	// When using a custom NewClient, the Client options will be passed to the
	// NewClient function.
	//
	// NOTE: LOW LEVEL PRIMITIVE!
	// Only use a custom NewClient if you know what you are doing.
	NewClient client.NewClientFunc

	// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API.
	// Use this to customize the event correlator and spam filter.
	//
	// Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers
	// is shorter than the lifetime of your process.
	EventBroadcaster record.EventBroadcaster

	// makeBroadcaster allows deferring the creation of the broadcaster to
	// avoid leaking goroutines if we never call Start on this manager.  It also
	// returns whether or not this is a "owned" broadcaster, and as such should be
	// stopped with the manager.
	//
	// It is always assigned by setOptionsDefaults: a broadcaster created via
	// record.NewBroadcaster is reported as owned, a user-supplied
	// EventBroadcaster is not.
	makeBroadcaster intrec.EventBroadcasterProducer

	// Dependency injection for testing. Defaults to intrec.NewProvider if unset.
	newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
}
   151  
// Option can be used to manipulate Options.
// New applies every Option, in the order given, to a zero-valued Options
// before the remaining unset fields are defaulted.
type Option func(*Options)
   154  
   155  // New constructs a brand new cluster.
   156  func New(config *rest.Config, opts ...Option) (Cluster, error) {
   157  	if config == nil {
   158  		return nil, errors.New("must specify Config")
   159  	}
   160  
   161  	originalConfig := config
   162  
   163  	config = rest.CopyConfig(config)
   164  	if config.UserAgent == "" {
   165  		config.UserAgent = rest.DefaultKubernetesUserAgent()
   166  	}
   167  
   168  	options := Options{}
   169  	for _, opt := range opts {
   170  		opt(&options)
   171  	}
   172  	options, err := setOptionsDefaults(options, config)
   173  	if err != nil {
   174  		options.Logger.Error(err, "Failed to set defaults")
   175  		return nil, err
   176  	}
   177  
   178  	// Create the mapper provider
   179  	mapper, err := options.MapperProvider(config, options.HTTPClient)
   180  	if err != nil {
   181  		options.Logger.Error(err, "Failed to get API Group-Resources")
   182  		return nil, err
   183  	}
   184  
   185  	// Create the cache for the cached read client and registering informers
   186  	cacheOpts := options.Cache
   187  	{
   188  		if cacheOpts.Scheme == nil {
   189  			cacheOpts.Scheme = options.Scheme
   190  		}
   191  		if cacheOpts.Mapper == nil {
   192  			cacheOpts.Mapper = mapper
   193  		}
   194  		if cacheOpts.HTTPClient == nil {
   195  			cacheOpts.HTTPClient = options.HTTPClient
   196  		}
   197  		if cacheOpts.SyncPeriod == nil {
   198  			cacheOpts.SyncPeriod = options.SyncPeriod
   199  		}
   200  	}
   201  	cache, err := options.NewCache(config, cacheOpts)
   202  	if err != nil {
   203  		return nil, err
   204  	}
   205  
   206  	// Create the client, and default its options.
   207  	clientOpts := options.Client
   208  	{
   209  		if clientOpts.Scheme == nil {
   210  			clientOpts.Scheme = options.Scheme
   211  		}
   212  		if clientOpts.Mapper == nil {
   213  			clientOpts.Mapper = mapper
   214  		}
   215  		if clientOpts.HTTPClient == nil {
   216  			clientOpts.HTTPClient = options.HTTPClient
   217  		}
   218  		if clientOpts.Cache == nil {
   219  			clientOpts.Cache = &client.CacheOptions{
   220  				Unstructured: false,
   221  			}
   222  		}
   223  		if clientOpts.Cache.Reader == nil {
   224  			clientOpts.Cache.Reader = cache
   225  		}
   226  	}
   227  	clientWriter, err := options.NewClient(config, clientOpts)
   228  	if err != nil {
   229  		return nil, err
   230  	}
   231  
   232  	// Create the API Reader, a client with no cache.
   233  	clientReader, err := client.New(config, client.Options{
   234  		HTTPClient: options.HTTPClient,
   235  		Scheme:     options.Scheme,
   236  		Mapper:     mapper,
   237  	})
   238  	if err != nil {
   239  		return nil, err
   240  	}
   241  
   242  	// Create the recorder provider to inject event recorders for the components.
   243  	// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
   244  	// to the particular controller that it's being injected into, rather than a generic one like is here.
   245  	recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
   246  	if err != nil {
   247  		return nil, err
   248  	}
   249  
   250  	return &cluster{
   251  		config:           originalConfig,
   252  		httpClient:       options.HTTPClient,
   253  		scheme:           options.Scheme,
   254  		cache:            cache,
   255  		fieldIndexes:     cache,
   256  		client:           clientWriter,
   257  		apiReader:        clientReader,
   258  		recorderProvider: recorderProvider,
   259  		mapper:           mapper,
   260  		logger:           options.Logger,
   261  	}, nil
   262  }
   263  
   264  // setOptionsDefaults set default values for Options fields.
   265  func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
   266  	if options.HTTPClient == nil {
   267  		var err error
   268  		options.HTTPClient, err = rest.HTTPClientFor(config)
   269  		if err != nil {
   270  			return options, err
   271  		}
   272  	}
   273  
   274  	// Use the Kubernetes client-go scheme if none is specified
   275  	if options.Scheme == nil {
   276  		options.Scheme = scheme.Scheme
   277  	}
   278  
   279  	if options.MapperProvider == nil {
   280  		options.MapperProvider = apiutil.NewDynamicRESTMapper
   281  	}
   282  
   283  	// Allow users to define how to create a new client
   284  	if options.NewClient == nil {
   285  		options.NewClient = client.New
   286  	}
   287  
   288  	// Allow newCache to be mocked
   289  	if options.NewCache == nil {
   290  		options.NewCache = cache.New
   291  	}
   292  
   293  	// Allow newRecorderProvider to be mocked
   294  	if options.newRecorderProvider == nil {
   295  		options.newRecorderProvider = intrec.NewProvider
   296  	}
   297  
   298  	// This is duplicated with pkg/manager, we need it here to provide
   299  	// the user with an EventBroadcaster and there for the Leader election
   300  	if options.EventBroadcaster == nil {
   301  		// defer initialization to avoid leaking by default
   302  		options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
   303  			return record.NewBroadcaster(), true
   304  		}
   305  	} else {
   306  		options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
   307  			return options.EventBroadcaster, false
   308  		}
   309  	}
   310  
   311  	if options.Logger.GetSink() == nil {
   312  		options.Logger = logf.RuntimeLog.WithName("cluster")
   313  	}
   314  
   315  	return options, nil
   316  }
   317  

View as plain text