...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/entrypoint.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"io/ioutil"
     7  	"os"
     8  	"os/signal"
     9  	"path"
    10  	"strings"
    11  	"sync/atomic"
    12  	"syscall"
    13  	"time"
    14  
    15  	"github.com/google/uuid"
    16  
    17  	"github.com/datawire/dlib/dgroup"
    18  	"github.com/datawire/dlib/dlog"
    19  	"github.com/emissary-ingress/emissary/v3/pkg/acp"
    20  	"github.com/emissary-ingress/emissary/v3/pkg/ambex"
    21  	"github.com/emissary-ingress/emissary/v3/pkg/busy"
    22  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    23  	"github.com/emissary-ingress/emissary/v3/pkg/logutil"
    24  	"github.com/emissary-ingress/emissary/v3/pkg/memory"
    25  )
    26  
    27  // This is the main ambassador entrypoint. It launches and manages two other
    28  // processes:
    29  //
    30  //  1. The diagd process.
    31  //  2. Envoy
    32  //
    33  // The entrypoint process manages two other goroutines:
    34  //
    35  //  1. The watcher goroutine that watches for changes in ambassador inputs and
    36  //     notifies diagd.
    37  //  2. The ambex goroutine that feeds envoy configuration updates via ADS.
    38  //
    39  // Dataflow Diagram
    40  //
    41  //   Kubernetes Watches
    42  //          |
    43  //          | (k8s resources, subscription)
    44  //          |
    45  //         \|/               consul endpoints, subscription)
    46  //     entrypoint[watcher]<----------------------------------- Consul Watches
    47  //          |
    48  //          | (Snapshot, POST)
    49  //          |
    50  //         \|/
    51  //        diagd
    52  //          |
    53  //          | (envoy config resources, pushed via writing files + SIGHUP)
    54  //          |
    55  //         \|/
    56  //     entrypoint[ambex]
    57  //          |
    58  //          | (envoy config resources, ADS subscription)
    59  //          |
    60  //         \|/
    61  //        envoy
    62  //
    63  // Notation:
    64  //
    65  //   The arrows point in the direction that data flows. Each arrow is labeled
    66  //   with a tuple of the data type, and a short description of the nature of
    67  //   communication.
    68  //
    69  // The golang entrypoint process assembles all the ambassador inputs from
    70  // kubernetes and consul. When it has a complete/consistent set of inputs, it
    71  // passes the complete snapshot of inputs along to diagd along with a list of
    72  // deltas and invalid objects. This snapshot is fully detailed in snapshot.go
    73  //
    74  // The entrypoint goes to some trouble to ensure shared fate between all three
    75  // processes as well as all the goroutines it manages, i.e. if any one of them
    76  // dies for any reason, the whole process will shutdown and some larger process
    77  // manager (e.g. kubernetes) is expected to take note and restart if
    78  // appropriate.
    79  
    80  func Main(ctx context.Context, Version string, args ...string) error {
    81  	// Setup logging according to AES_LOG_LEVEL
    82  	busy.SetLogLevel(logutil.DefaultLogLevel)
    83  	if lvl := os.Getenv("AES_LOG_LEVEL"); lvl != "" {
    84  		parsed, err := logutil.ParseLogLevel(lvl)
    85  		if err != nil {
    86  			dlog.Errorf(ctx, "Error parsing log level: %v", err)
    87  		} else {
    88  			busy.SetLogLevel(parsed)
    89  		}
    90  	}
    91  
    92  	// The agent service is no longer supported, so clear it out.
    93  	// For good measure, we also unconditionally return the empty
    94  	// string in GetAgentService().
    95  	os.Unsetenv("AGENT_SERVICE")
    96  	dlog.Infof(ctx, "Started Ambassador (Version %s)", Version)
    97  
    98  	demoMode := false
    99  
   100  	// XXX Yes, this is a disgusting hack. We can switch to a legit argument
   101  	// parser later, when we have a second argument.
   102  	if (len(args) == 1) && (args[0] == "--demo") {
   103  		// Demo mode!
   104  		dlog.Infof(ctx, "DEMO MODE")
   105  		demoMode = true
   106  	}
   107  
   108  	clusterID := GetClusterID(ctx)
   109  	os.Setenv("AMBASSADOR_CLUSTER_ID", clusterID)
   110  	dlog.Infof(ctx, "AMBASSADOR_CLUSTER_ID=%s", clusterID)
   111  
   112  	pec := "PYTHON_EGG_CACHE"
   113  	if os.Getenv(pec) == "" {
   114  		os.Setenv(pec, path.Join(GetAmbassadorConfigBaseDir(), ".cache"))
   115  	}
   116  	os.Setenv("PYTHONUNBUFFERED", "true")
   117  
   118  	// Make sure that all of the directories that we need actually exist.
   119  	if err := ensureDir(GetHomeDir()); err != nil {
   120  		return err
   121  	}
   122  	if err := ensureDir(GetAmbassadorConfigBaseDir()); err != nil {
   123  		return err
   124  	}
   125  	if err := ensureDir(GetSnapshotDir()); err != nil {
   126  		return err
   127  	}
   128  	if err := ensureDir(GetEnvoyDir()); err != nil {
   129  		return err
   130  	}
   131  
   132  	// We use this to wait until the bootstrap config has been written before starting envoy.
   133  	envoyHUP := make(chan os.Signal, 1)
   134  	signal.Notify(envoyHUP, syscall.SIGHUP)
   135  
   136  	// Go ahead and create an AmbassadorWatcher now, since we'll need it later.
   137  	ambwatch := acp.NewAmbassadorWatcher(acp.NewEnvoyWatcher(), acp.NewDiagdWatcher())
   138  
   139  	group := dgroup.NewGroup(ctx, dgroup.GroupConfig{
   140  		EnableSignalHandling: true,
   141  		SoftShutdownTimeout:  10 * time.Second,
   142  		HardShutdownTimeout:  10 * time.Second,
   143  	})
   144  
   145  	// Demo mode: start the demo services. Starting the demo stuff first is
   146  	// kind of important: it's nice to give them a chance to start running before
   147  	// Ambassador really gets running.
   148  	if demoMode {
   149  		bootDemoMode(ctx, group, ambwatch)
   150  	}
   151  
   152  	group.Go("diagd", func(ctx context.Context) error {
   153  		cmd := subcommand(ctx, "diagd", GetDiagdArgs(ctx, demoMode)...)
   154  		if envbool("DEV_SHUTUP_DIAGD") {
   155  			cmd.Stdout = nil
   156  			cmd.Stderr = nil
   157  		}
   158  		return cmd.Run()
   159  	})
   160  
   161  	usage := memory.GetMemoryUsage(ctx)
   162  	if !envbool("DEV_SHUTUP_MEMORY") {
   163  		group.Go("memory", func(ctx context.Context) error {
   164  			usage.Watch(ctx)
   165  			return nil
   166  		})
   167  	}
   168  
   169  	fastpathCh := make(chan *ambex.FastpathSnapshot)
   170  	group.Go("ambex", func(ctx context.Context) error {
   171  		return ambex.Main(ctx, Version, usage.PercentUsed, fastpathCh, "--ads-listen-address",
   172  			"127.0.0.1:8003", GetEnvoyDir())
   173  	})
   174  
   175  	group.Go("envoy", func(ctx context.Context) error {
   176  		return runEnvoy(ctx, envoyHUP)
   177  	})
   178  
   179  	snapshot := &atomic.Value{}
   180  	group.Go("snapshot_server", func(ctx context.Context) error {
   181  		return snapshotServer(ctx, snapshot)
   182  	})
   183  	if !envbool("AMBASSADOR_DISABLE_SNAPSHOT_SERVER") {
   184  		group.Go("external_snapshot_server", func(ctx context.Context) error {
   185  			return externalSnapshotServer(ctx, snapshot)
   186  		})
   187  	}
   188  
   189  	if !demoMode {
   190  		group.Go("watcher", func(ctx context.Context) error {
   191  			// We need to pass the AmbassadorWatcher to this (Kubernetes/Consul) watcher, so
   192  			// that it can tell the AmbassadorWatcher when snapshots are posted.
   193  			return WatchAllTheThings(ctx, ambwatch, snapshot, fastpathCh, clusterID, Version)
   194  		})
   195  	}
   196  
   197  	// Finally, fire up the health check handler.
   198  	group.Go("healthchecks", func(ctx context.Context) error {
   199  		return healthCheckHandler(ctx, ambwatch)
   200  	})
   201  
   202  	// Launch every file in the sidecar directory. Note that this is "bug compatible" with
   203  	// entrypoint.sh for now, e.g. we don't check execute bits or anything like that.
   204  	sidecarDir := "/ambassador/sidecars"
   205  	sidecars, err := ioutil.ReadDir(sidecarDir)
   206  	if err != nil && !os.IsNotExist(err) {
   207  		return err
   208  	}
   209  	for _, sidecar := range sidecars {
   210  		group.Go(sidecar.Name(), func(ctx context.Context) error {
   211  			cmd := subcommand(ctx, path.Join(sidecarDir, sidecar.Name()))
   212  			return cmd.Run()
   213  		})
   214  	}
   215  
   216  	return group.Wait()
   217  }
   218  
   219  func clusterIDFromRootID(rootID string) string {
   220  	clusterUrl := fmt.Sprintf("d6e_id://%s/%s", rootID, GetAmbassadorID())
   221  	uid := uuid.NewSHA1(uuid.NameSpaceURL, []byte(clusterUrl))
   222  
   223  	return strings.ToLower(uid.String())
   224  }
   225  
   226  func GetClusterID(ctx context.Context) (clusterID string) {
   227  	clusterID = env("AMBASSADOR_CLUSTER_ID", env("AMBASSADOR_SCOUT_ID", ""))
   228  	if clusterID != "" {
   229  		return clusterID
   230  	}
   231  
   232  	rootID := "00000000-0000-0000-0000-000000000000"
   233  
   234  	client, err := kates.NewClient(kates.ClientConfig{})
   235  	if err == nil {
   236  		nsName := "default"
   237  		if IsAmbassadorSingleNamespace() {
   238  			nsName = GetAmbassadorNamespace()
   239  		}
   240  		ns := &kates.Namespace{
   241  			TypeMeta:   kates.TypeMeta{Kind: "Namespace"},
   242  			ObjectMeta: kates.ObjectMeta{Name: nsName},
   243  		}
   244  
   245  		err := client.Get(ctx, ns, ns)
   246  		if err == nil {
   247  			rootID = string(ns.GetUID())
   248  		}
   249  	}
   250  
   251  	return clusterIDFromRootID(rootID)
   252  }
   253  

View as plain text