...

Source file src/github.com/linkerd/linkerd2/multicluster/cmd/service-mirror/main.go

Documentation: github.com/linkerd/linkerd2/multicluster/cmd/service-mirror

package servicemirror

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	controllerK8s "github.com/linkerd/linkerd2/controller/k8s"
	servicemirror "github.com/linkerd/linkerd2/multicluster/service-mirror"
	"github.com/linkerd/linkerd2/pkg/admin"
	"github.com/linkerd/linkerd2/pkg/flags"
	"github.com/linkerd/linkerd2/pkg/k8s"
	"github.com/linkerd/linkerd2/pkg/multicluster"
	sm "github.com/linkerd/linkerd2/pkg/servicemirror"
	log "github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

const (
	linkWatchRestartAfter = 10 * time.Second
	// Duration of the lease
	LEASE_DURATION = 30 * time.Second
	// Deadline for the leader to refresh its lease. Defaults to the same value
	// used by core controllers
	LEASE_RENEW_DEADLINE = 10 * time.Second
	// Duration leader elector clients should wait between action re-tries.
	// Defaults to the same value used by core controllers
	LEASE_RETRY_PERIOD = 2 * time.Second
)

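// clusterWatcher and probeWorker are the per-Link workers driven by the run
// loop in Main: cleanupWorkers stops them and restartClusterWatcher re-creates
// them whenever the Link resource changes.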
var (
	clusterWatcher *servicemirror.RemoteClusterServiceWatcher
	probeWorker    *servicemirror.ProbeWorker
)

// Main executes the service-mirror controller
func Main(args []string) {
	cmd := flag.NewFlagSet("service-mirror", flag.ExitOnError)

	kubeConfigPath := cmd.String("kubeconfig", "", "path to the local kube config")
	requeueLimit := cmd.Int("event-requeue-limit", 3, "requeue limit for events")
	metricsAddr := cmd.String("metrics-addr", ":9999", "address to serve scrapable metrics on")
	namespace := cmd.String("namespace", "", "namespace containing Link and credentials Secret")
	repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution")
	enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring")
	enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")

	flags.ConfigureAndParse(cmd, args)
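	// The name of the Link resource to mirror is taken from the first
	// positional argument.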
	linkName := cmd.Arg(0)

	ready := false
	adminServer := admin.NewServer(*metricsAddr, *enablePprof, &ready)

	go func() {
		log.Infof("starting admin server on %s", *metricsAddr)
		if err := adminServer.ListenAndServe(); err != nil {
			log.Errorf("failed to start service mirror admin server: %s", err)
		}
	}()

	rootCtx, cancel := context.WithCancel(context.Background())

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-stop
		log.Info("Received shutdown signal")
		// Cancel root context. Cancellation will be propagated to all other
		// contexts that are children of the root context.
		cancel()
	}()

	// We create two different kubernetes API clients for the local cluster:
	// k8sAPI is used as a dynamic client for unstructured access to Link custom
	// resources.
	//
	// controllerK8sAPI is used by the cluster watcher to manage
	// mirror resources such as services, namespaces, and endpoints.
	k8sAPI, err := k8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0)
	//TODO: Use can-i to check for required permissions
	if err != nil {
		log.Fatalf("Failed to initialize K8s API: %s", err)
	}
	controllerK8sAPI, err := controllerK8s.InitializeAPI(
		rootCtx,
		*kubeConfigPath,
		false,
		"local",
		controllerK8s.NS,
		controllerK8s.Svc,
		controllerK8s.Endpoint,
	)
	if err != nil {
		log.Fatalf("Failed to initialize K8s API: %s", err)
	}

	linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace)
	metrics := servicemirror.NewProbeMetricVecs()
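	// Wait for the local informer caches to sync before the controller
	// reports itself ready.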
	controllerK8sAPI.Sync(nil)

	ready = true
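	// run is the work guarded by the leader-election lease: it watches the
	// Link resource and (re)starts the cluster watcher and gateway probe
	// worker whenever the Link changes.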
	run := func(ctx context.Context) {
	main:
		for {
			// Start link watch
			linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{})
			if err != nil {
				log.Fatalf("Failed to watch Link %s: %s", linkName, err)
			}
			results := linkWatch.ResultChan()

			// Each time the link resource is updated, reload the config and restart the
			// cluster watcher.
			for {
				select {
				// ctx.Done() is a one-shot channel that will be closed once
				// the context has been cancelled. Receiving from a closed
				// channel yields the zero value immediately.
				case <-ctx.Done():
					// The channel will be closed by the leader elector when a
					// lease is lost, or by a background task handling SIGTERM.
					// Before terminating the loop, stop the workers and set
					// them to nil to release memory.
					cleanupWorkers()
					return
				case event, ok := <-results:
					if !ok {
						log.Info("Link watch terminated; restarting watch")
						continue main
					}
					switch obj := event.Object.(type) {
					case *dynamic.Unstructured:
						if obj.GetName() == linkName {
							switch event.Type {
							case watch.Added, watch.Modified:
								link, err := multicluster.NewLink(*obj)
								if err != nil {
									log.Errorf("Failed to parse link %s: %s", linkName, err)
									continue
								}
								log.Infof("Got updated link %s: %+v", linkName, link)
								creds, err := loadCredentials(ctx, link, *namespace, k8sAPI)
								if err != nil {
									log.Errorf("Failed to load remote cluster credentials: %s", err)
								}
								err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
								if err != nil {
									// Failed to restart the cluster watcher; give it a bit of
									// slack and restart the link watch to give it another try.
									log.Error(err)
									time.Sleep(linkWatchRestartAfter)
									linkWatch.Stop()
								}
							case watch.Deleted:
								log.Infof("Link %s deleted", linkName)
								cleanupWorkers()
							default:
								log.Infof("Ignoring event type %s", event.Type)
							}
						}
					default:
						log.Errorf("Unknown object type detected: %+v", obj)
					}
				}
			}
		}
	}

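	// The pod's hostname is used as this instance's identity for leader
	// election.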
	hostname, found := os.LookupEnv("HOSTNAME")
	if !found {
		log.Fatal("Failed to fetch 'HOSTNAME' environment variable")
	}

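	// The Lease is named after the Link so that controllers mirroring
	// different target clusters do not contend for the same lock.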
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("service-mirror-write-%s", linkName),
			Namespace: *namespace,
		},
		Client: k8sAPI.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: hostname,
		},
	}

election:
	for {
		// RunOrDie will block until the lease is lost.
		//
		// When a lease is acquired, the OnStartedLeading callback will be
		// triggered, and a main watcher loop will be established to watch Link
		// resources.
		//
		// When the lease is lost, all watchers will be cleaned up and we will
		// loop and attempt to re-acquire the lease.
		leaderelection.RunOrDie(rootCtx, leaderelection.LeaderElectionConfig{
			// When the runtime context is cancelled, the lock will be released.
			// This implies that any code guarded by the lease _must_ finish
			// before cancelling.
			ReleaseOnCancel: true,
			Lock:            lock,
			LeaseDuration:   LEASE_DURATION,
			RenewDeadline:   LEASE_RENEW_DEADLINE,
			RetryPeriod:     LEASE_RETRY_PERIOD,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					// When a lease is lost, RunOrDie will cancel the context
					// passed into the OnStartedLeading callback. This will in
					// turn cause us to cancel the work in the run() function,
					// effectively terminating and cleaning up the watches.
					log.Info("Starting controller loop")
					run(ctx)
				},
				OnStoppedLeading: func() {
					log.Infof("%s released lease", hostname)
				},
				OnNewLeader: func(identity string) {
					if identity == hostname {
						log.Infof("%s acquired lease", hostname)
					}
				},
			},
		})

		select {
		// If the lease has been lost and we have received a shutdown signal,
		// break the loop and gracefully exit. We can guarantee that at this
		// point resources have been released.
		case <-rootCtx.Done():
			break election
		// If the lease has been lost, loop and attempt to re-acquire it.
		default:

		}
	}
	log.Info("Shutting down")
}

// cleanupWorkers is a utility function that checks whether the worker pointers
// (clusterWatcher and probeWorker) are instantiated, and if they are, stops
// their execution and sets the pointers to a nil value so that memory may be
// garbage collected.
func cleanupWorkers() {
	if clusterWatcher != nil {
		// Release, but do not clean up the mirror services that were created;
		// the `unlink` command will take care of that.
		clusterWatcher.Stop(false)
		clusterWatcher = nil
	}

	if probeWorker != nil {
		probeWorker.Stop()
		probeWorker = nil
	}
}

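// loadCredentials fetches the Secret referenced by the Link and returns the
// kubeconfig it contains for connecting to the remote cluster.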
func loadCredentials(ctx context.Context, link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) {
	// Load the credentials secret
	secret, err := k8sAPI.Interface.CoreV1().Secrets(namespace).Get(ctx, link.ClusterCredentialsSecret, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to load credentials secret %s: %w", link.ClusterCredentialsSecret, err)
	}
	return sm.ParseRemoteClusterSecret(secret)
}

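// restartClusterWatcher stops any existing workers, starts a gateway probe
// worker when the Link defines a probe path, and then starts a new remote
// cluster watcher built from the given kubeconfig credentials.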
func restartClusterWatcher(
	ctx context.Context,
	link multicluster.Link,
	namespace string,
	creds []byte,
	controllerK8sAPI *controllerK8s.API,
	requeueLimit int,
	repairPeriod time.Duration,
	metrics servicemirror.ProbeMetricVecs,
	enableHeadlessSvc bool,
) error {

	cleanupWorkers()

	workerMetrics, err := metrics.NewWorkerMetrics(link.TargetClusterName)
	if err != nil {
		return fmt.Errorf("failed to create metrics for cluster watcher: %w", err)
	}

	// If linked against a cluster that has a gateway, start a probe and
	// initialise the liveness channel
	var ch chan bool
	if link.ProbeSpec.Path != "" {
		probeWorker = servicemirror.NewProbeWorker(fmt.Sprintf("probe-gateway-%s", link.TargetClusterName), &link.ProbeSpec, workerMetrics, link.TargetClusterName)
		probeWorker.Start()
		ch = probeWorker.Liveness
	}

	// Start cluster watcher
	cfg, err := clientcmd.RESTConfigFromKubeConfig(creds)
	if err != nil {
		return fmt.Errorf("unable to parse kube config: %w", err)
	}
	cw, err := servicemirror.NewRemoteClusterServiceWatcher(
		ctx,
		namespace,
		controllerK8sAPI,
		cfg,
		&link,
		requeueLimit,
		repairPeriod,
		ch,
		enableHeadlessSvc,
	)
	if err != nil {
		return fmt.Errorf("unable to create cluster watcher: %w", err)
	}
	clusterWatcher = cw
	err = clusterWatcher.Start(ctx)
	if err != nil {
		return fmt.Errorf("failed to start cluster watcher: %w", err)
	}

	return nil
}
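Below is a minimal, hypothetical wrapper sketching how Main could be invoked as a standalone binary. The wrapper itself, the Link name "target", and the namespace value are illustrative assumptions rather than part of this package; note that flags must precede the positional Link name, since flag parsing stops at the first non-flag argument.

package main

import (
	"os"

	servicemirror "github.com/linkerd/linkerd2/multicluster/cmd/service-mirror"
)

func main() {
	// Illustrative defaults; real deployments pass their own flags followed
	// by the name of the Link resource to mirror.
	args := []string{
		"-namespace", "linkerd-multicluster", // assumed namespace holding the Link and credentials Secret
		"target", // assumed Link name; Main reads it via cmd.Arg(0)
	}
	if len(os.Args) > 1 {
		args = os.Args[1:]
	}
	// Main blocks until the process receives SIGINT or SIGTERM.
	servicemirror.Main(args)
}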
