...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/cmd/webhook/main.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/cmd/webhook

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package main
    16  
    17  import (
    18  	goflag "flag"
    19  	"fmt"
    20  	"log"
    21  	"net"
    22  	"net/http"
    23  	_ "net/http/pprof" // Needed to allow pprof server to accept requests
    24  	"time"
    25  
    26  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis"
    27  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gcp/profiler"
    28  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    29  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/logging"
    30  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/ready"
    31  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/webhook"
    32  
    33  	flag "github.com/spf13/pflag"
    34  	apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    35  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    36  	"k8s.io/apimachinery/pkg/labels"
    37  	"k8s.io/apimachinery/pkg/runtime/schema"
    38  	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    39  	"k8s.io/client-go/rest"
    40  	"sigs.k8s.io/controller-runtime/pkg/cache"
    41  	"sigs.k8s.io/controller-runtime/pkg/client"
    42  	"sigs.k8s.io/controller-runtime/pkg/client/config"
    43  	klog "sigs.k8s.io/controller-runtime/pkg/log"
    44  	"sigs.k8s.io/controller-runtime/pkg/manager"
    45  	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
    46  )
    47  
    48  var logger = klog.Log.WithName("setup")
    49  
    50  func main() {
    51  	stop := signals.SetupSignalHandler()
    52  
    53  	var enablePprof bool
    54  	var pprofPort int
    55  
    56  	profiler.AddFlag(flag.CommandLine)
    57  	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    58  	flag.BoolVar(&enablePprof, "enable-pprof", false, "Enable the pprof server.")
    59  	flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof server binds to if enabled.")
    60  	flag.Parse()
    61  
    62  	// this enables packages using the kubernetes controller-runtime logging package to log
    63  	logging.SetupLogger()
    64  
    65  	// Start pprof server if enabled
    66  	if enablePprof {
    67  		go func() {
    68  			if err := http.ListenAndServe(fmt.Sprintf(":%d", pprofPort), nil); err != nil {
    69  				logger.Error(err, "error while running pprof server")
    70  			}
    71  		}()
    72  	}
    73  
    74  	// Start Cloud Profiler agent if enabled
    75  	if err := profiler.StartIfEnabled(); err != nil {
    76  		logging.Fatal(err, "error starting Cloud Profiler agent")
    77  	}
    78  
    79  	// Get a config to talk to the apiserver
    80  	cfg, err := config.GetConfig()
    81  	if err != nil {
    82  		log.Fatal(err)
    83  	}
    84  
    85  	// Create a selector to restrict the cache's ListWatch to KCC CRDs.
    86  	labelSelector, err := labels.ValidatedSelectorFromSet(labels.Set{
    87  		k8s.KCCSystemLabel: "true",
    88  	})
    89  	if err != nil {
    90  		log.Fatal(err)
    91  	}
    92  	crdKind := &unstructured.Unstructured{}
    93  	crdKind.SetGroupVersionKind(schema.GroupVersionKind{
    94  		Group:   "apiextensions.k8s.io",
    95  		Version: "v1",
    96  		Kind:    "CustomResourceDefinition",
    97  	})
    98  
    99  	// Create a new Manager to provide shared dependencies and start components
   100  	mgr, err := manager.New(cfg, manager.Options{
   101  		// Although this Port value will specify the port of any webhooks
   102  		// spawned by the manager, those used by this manager are generated
   103  		// by the RegisterCommonWebhooks call below, and will not honor this value.
   104  		Port: webhook.ServicePort,
   105  		NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
   106  			opts.SelectorsByObject = cache.SelectorsByObject{
   107  				crdKind: cache.ObjectSelector{
   108  					Label: labelSelector,
   109  				},
   110  			}
   111  			return cache.New(config, opts)
   112  		},
   113  	})
   114  	if err != nil {
   115  		log.Fatal(err)
   116  	}
   117  
   118  	// Setup Scheme for all resources
   119  	apis.AddToSchemes = append(apis.AddToSchemes, apiextensions.SchemeBuilder.AddToScheme)
   120  	if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
   121  		log.Fatal(err)
   122  	}
   123  
   124  	log.Printf("Registering Webhooks.")
   125  	// Create a client that reads and writes directly from the server without object caches.
   126  	// We want to use a no-cache client for creating/updating the cert secret. With a cached client,
   127  	// it requires list privilege for the secret type.
   128  	nocacheClient, err := client.New(cfg, client.Options{})
   129  	if err != nil {
   130  		log.Fatal(err)
   131  	}
   132  	if err := webhook.RegisterCommonWebhooks(mgr, nocacheClient); err != nil {
   133  		log.Fatalf("error adding the validating webhooks: %v", err)
   134  	}
   135  
   136  	// The webhooks are not actually ready until N seconds after mgr.Start(...) is called, however,
   137  	// we have no easy way to get a signal from controller-runtime when they are ready. Since this
   138  	// is being rewritten soon, do some simple asynchronous polling of the HTTP server for readiness
   139  	// and once it succeeds create the ready file.
   140  	go func() {
   141  		timeout := 2 * time.Minute
   142  		log.Println(fmt.Sprintf("Waiting up to %v for the http server to be ready...", timeout))
   143  		if err := waitForHTTPServerToAcceptRequests("localhost", webhook.ServicePort, timeout); err != nil {
   144  			log.Fatalf("error waiting for http server to be ready: %v", err)
   145  		}
   146  		// Set up the HTTP server for the readiness probe
   147  		log.Println("Setting container as ready...")
   148  		ready.SetContainerAsReady()
   149  		log.Println("Container is ready.")
   150  	}()
   151  
   152  	log.Printf("Starting the Cmd.")
   153  
   154  	// Start the Cmd
   155  	log.Fatal(mgr.Start(stop))
   156  }
   157  
   158  func waitForHTTPServerToAcceptRequests(host string, port int, timeout time.Duration) error {
   159  	address := fmt.Sprintf("%v:%v", host, port)
   160  	var err error
   161  	for totalWait := time.Duration(0); totalWait < timeout; {
   162  		singleDialTimeout := 1 * time.Second
   163  		_, err = net.DialTimeout("tcp", address, singleDialTimeout)
   164  		if err == nil {
   165  			return nil
   166  		}
   167  		totalWait += singleDialTimeout
   168  		sleepTime := 2 * time.Second
   169  		time.Sleep(sleepTime)
   170  		totalWait += sleepTime
   171  	}
   172  	return fmt.Errorf("timeout of '%v' exceeded with a final error of '%w': still cannot contact http server at '%v'",
   173  		timeout, err, address)
   174  }
   175  

View as plain text