...

Source file src/github.com/datawire/ambassador/v2/pkg/agent/agent.go

Documentation: github.com/datawire/ambassador/v2/pkg/agent

     1  package agent
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"io/ioutil"
     8  	"net/http"
     9  	"net/url"
    10  	"os"
    11  	"strings"
    12  	"sync"
    13  	"time"
    14  
    15  	envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
    16  	io_prometheus_client "github.com/prometheus/client_model/go"
    17  	"google.golang.org/grpc/peer"
    18  	"google.golang.org/protobuf/types/known/timestamppb"
    19  	"k8s.io/apimachinery/pkg/runtime/schema"
    20  
    21  	"github.com/datawire/ambassador/v2/pkg/api/agent"
    22  	"github.com/datawire/ambassador/v2/pkg/kates"
    23  	snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    24  	"github.com/datawire/dlib/dlog"
    25  )
    26  
    27  const defaultMinReportPeriod = 30 * time.Second
    28  const cloudConnectTokenKey = "CLOUD_CONNECT_TOKEN"
    29  
    30  type Comm interface {
    31  	Close() error
    32  	Report(context.Context, *agent.Snapshot, string) error
    33  	ReportCommandResult(context.Context, *agent.CommandResult, string) error
    34  	Directives() <-chan *agent.Directive
    35  	StreamMetrics(context.Context, *agent.StreamMetricsMessage, string) error
    36  }
    37  
    38  type atomicBool struct {
    39  	mutex sync.Mutex
    40  	value bool
    41  }
    42  
    43  func (ab *atomicBool) Value() bool {
    44  	ab.mutex.Lock()
    45  	defer ab.mutex.Unlock()
    46  	return ab.value
    47  }
    48  
    49  func (ab *atomicBool) Set(v bool) {
    50  	ab.mutex.Lock()
    51  	defer ab.mutex.Unlock()
    52  	ab.value = v
    53  }
    54  
// Agent is the component that talks to the DCP Director, which is a cloud
// service run by Datawire.
//
// It watches cluster resources, turns Ambassador snapshots into reports that
// it sends to the Director (see MaybeReport), relays a filtered subset of
// Envoy metrics (see MetricsRelayHandler), and applies directives received
// from the Director through its DirectiveHandler.
type Agent struct {
	// Connectivity to the Director

	// comm is the current connection to the Director. It is nil until
	// MaybeReport dials one; ClearComm closes it and sets it back to nil.
	comm Comm
	// connInfo is the Director endpoint last parsed from connAddress.
	connInfo *ConnInfo
	// agentID is the identity derived from the most recent snapshot.
	agentID *agent.Identity
	// newDirective is the directive channel of the most recently dialed comm.
	newDirective <-chan *agent.Directive
	// ambassadorAPIKeyMutex guards ambassadorAPIKey; the report goroutine and
	// MetricsRelayHandler read the key while holding it.
	ambassadorAPIKeyMutex sync.Mutex
	// ambassadorAPIKey is the CLOUD_CONNECT_TOKEN currently in effect. It is
	// taken, in priority order, from the token Secret, the token ConfigMap,
	// or the environment (see handleAPIKeyConfigChange).
	ambassadorAPIKey string
	// directiveHandler applies directives received from the Director.
	directiveHandler DirectiveHandler
	// ambassadorAPIKeyEnvVarValue is the CLOUD_CONNECT_TOKEN environment value
	// captured at startup. ambassadorAPIKey is reset to it if the ConfigMap
	// and/or Secret get deleted.
	ambassadorAPIKeyEnvVarValue string
	// connAddress is the raw Director address (RPC_CONNECTION_ADDRESS).
	connAddress string

	// State managed by the director via the retriever

	reportingStopped bool          // Did the director say don't report?
	minReportPeriod  time.Duration // How often can we Report?
	lastDirectiveID  string        // Last directive ID recorded via SetLastDirectiveID

	// The state of reporting

	reportToSend   *agent.Snapshot // Report that's ready to send; nil when there is nothing new
	reportRunning  atomicBool      // Is a report being sent (or in its post-send quiet period) right now?
	reportComplete chan error      // Receives each Report() result; the send is non-blocking, so results are dropped if nobody listens

	// current cluster state of core resources
	coreStore *coreStore

	// apiDocsStore holds OpenAPI documents from cluster Mappings
	apiDocsStore *APIDocsStore

	// rolloutStore holds Argo Rollouts state from cluster
	rolloutStore *RolloutStore
	// applicationStore holds Argo Applications state from cluster
	applicationStore *ApplicationStore

	// Config map/secret information.
	// agentNamespace is the namespace the agent is running in. More importantly,
	// it is the namespace that the token config resource (either a ConfigMap or
	// a Secret) lives in.
	agentNamespace string
	// Name of the k8s ConfigMap or Secret the CLOUD_CONNECT_TOKEN exists on. Both
	// Secrets and ConfigMaps are supported because in an enterprise cluster the
	// RBAC for secrets is likely locked down to Ops folks only, and we want to
	// make it easy for regular engineers to give the service catalog a go.
	agentCloudResourceConfigName string

	// Field selector for the k8s resources that the agent watches
	agentWatchFieldSelector string

	// Serializes MetricsRelayHandler calls so that concurrent (and useless)
	// pushes are avoided; also guards metricsBackoffUntil and aggregatedMetrics.
	metricsRelayMutex sync.Mutex
	// Metrics are buffered rather than sent until this time, to avoid making
	// too many requests to the Ambassador Cloud API.
	metricsBackoffUntil time.Time

	// Latest filtered metric families per Envoy instance (keyed by peer
	// address), accumulated before being pushed to the cloud in one batch.
	aggregatedMetrics map[string][]*io_prometheus_client.MetricFamily

	// Extra headers to inject into RPC requests to ambassador cloud, as a flat
	// key, value, key, value... list.
	rpcExtraHeaders []string
}
   121  
   122  func getEnvWithDefault(envVarKey string, defaultValue string) string {
   123  	value := os.Getenv(envVarKey)
   124  	if value == "" {
   125  		value = defaultValue
   126  	}
   127  	return value
   128  }
   129  
   130  // New returns a new Agent.
   131  func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsGetterFactory) *Agent {
   132  	reportPeriodFromEnv := os.Getenv("AGENT_REPORTING_PERIOD")
   133  	var reportPeriod time.Duration
   134  	if reportPeriodFromEnv != "" {
   135  		reportPeriod, err := time.ParseDuration(reportPeriodFromEnv)
   136  		if err != nil {
   137  			reportPeriod = defaultMinReportPeriod
   138  		} else {
   139  			reportPeriod = MaxDuration(defaultMinReportPeriod, reportPeriod)
   140  		}
   141  	} else {
   142  		reportPeriod = defaultMinReportPeriod
   143  	}
   144  	if directiveHandler == nil {
   145  		directiveHandler = &BasicDirectiveHandler{
   146  			DefaultMinReportPeriod: defaultMinReportPeriod,
   147  			rolloutsGetterFactory:  rolloutsGetterFactory,
   148  		}
   149  	}
   150  
   151  	var rpcExtraHeaders = make([]string, 0)
   152  
   153  	if os.Getenv("RPC_INTERCEPT_HEADER_KEY") != "" &&
   154  		os.Getenv("RPC_INTERCEPT_HEADER_VALUE") != "" {
   155  		rpcExtraHeaders = append(
   156  			rpcExtraHeaders,
   157  			os.Getenv("RPC_INTERCEPT_HEADER_KEY"),
   158  			os.Getenv("RPC_INTERCEPT_HEADER_VALUE"),
   159  		)
   160  	}
   161  
   162  	return &Agent{
   163  		minReportPeriod:  reportPeriod,
   164  		reportComplete:   make(chan error),
   165  		ambassadorAPIKey: os.Getenv(cloudConnectTokenKey),
   166  		// store this same value in a different variable, so that if ambassadorAPIKey gets
   167  		// changed by some other configuration, we know what to change it back to. See
   168  		// comment on the struct for more detail
   169  		ambassadorAPIKeyEnvVarValue:  os.Getenv(cloudConnectTokenKey),
   170  		connAddress:                  os.Getenv("RPC_CONNECTION_ADDRESS"),
   171  		agentNamespace:               getEnvWithDefault("AGENT_NAMESPACE", "ambassador"),
   172  		agentCloudResourceConfigName: getEnvWithDefault("AGENT_CONFIG_RESOURCE_NAME", "ambassador-agent-cloud-token"),
   173  		directiveHandler:             directiveHandler,
   174  		reportRunning:                atomicBool{value: false},
   175  		agentWatchFieldSelector:      getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"),
   176  		metricsBackoffUntil:          time.Now().Add(defaultMinReportPeriod),
   177  		rpcExtraHeaders:              rpcExtraHeaders,
   178  		aggregatedMetrics:            map[string][]*io_prometheus_client.MetricFamily{},
   179  	}
   180  }
   181  
   182  func (a *Agent) StopReporting(ctx context.Context) {
   183  	dlog.Debugf(ctx, "stop reporting: %t -> true", a.reportingStopped)
   184  	a.reportingStopped = true
   185  }
   186  
   187  func (a *Agent) StartReporting(ctx context.Context) {
   188  	dlog.Debugf(ctx, "stop reporting: %t -> false", a.reportingStopped)
   189  	a.reportingStopped = false
   190  }
   191  
   192  func (a *Agent) SetMinReportPeriod(ctx context.Context, dur time.Duration) {
   193  	dlog.Debugf(ctx, "minimum report period %s -> %s", a.minReportPeriod, dur)
   194  	a.minReportPeriod = dur
   195  }
   196  
   197  func (a *Agent) SetLastDirectiveID(ctx context.Context, id string) {
   198  	dlog.Debugf(ctx, "setting last directive ID %s", id)
   199  	a.lastDirectiveID = id
   200  }
   201  
   202  func getAmbSnapshotInfo(url string) (*snapshotTypes.Snapshot, error) {
   203  	resp, err := http.Get(url)
   204  	if err != nil {
   205  		return nil, err
   206  	}
   207  	defer resp.Body.Close()
   208  	rawSnapshot, err := ioutil.ReadAll(resp.Body)
   209  	if err != nil {
   210  		return nil, err
   211  	}
   212  	ret := &snapshotTypes.Snapshot{}
   213  	err = json.Unmarshal(rawSnapshot, ret)
   214  
   215  	return ret, err
   216  }
   217  
   218  func parseAmbassadorAdminHost(rawurl string) (string, error) {
   219  	url, err := url.Parse(rawurl)
   220  	if err != nil {
   221  		return "", err
   222  	}
   223  	return url.Hostname(), nil
   224  
   225  }
   226  
   227  func getAPIKeyValue(configValue string, configHadValue bool) string {
   228  	if configHadValue {
   229  		return configValue
   230  	}
   231  	return ""
   232  }
   233  
   234  // Handle change to the ambassadorAPIKey that we auth to the agent with
   235  // in order of importance: secret > configmap > environment variable
   236  // so if a secret exists, read from that. then, check if a config map exists, and read the value
   237  // from that. If neither a secret or a configmap exists, use the value from the environment that we
   238  // stored on startup.
   239  func (a *Agent) handleAPIKeyConfigChange(ctx context.Context, secrets []kates.Secret, configMaps []kates.ConfigMap) {
   240  	// reset the connection so we use a new api key (or break the connection if the api key was
   241  	// unset). The agent will reset the connection the next time it tries to send a report
   242  	resetComm := func(newKey string, oldKey string, a *Agent) {
   243  		if newKey != oldKey {
   244  			a.ClearComm()
   245  		}
   246  	}
   247  	prevKey := a.ambassadorAPIKey
   248  	// first, check if we have a secret, since we want that value to take if we
   249  	// can get it.
   250  	// there _should_ only be one secret here, but we're going to loop and check that the object
   251  	// meta matches what we expect
   252  	for _, secret := range secrets {
   253  		if secret.GetName() == a.agentCloudResourceConfigName && secret.GetNamespace() == a.agentNamespace {
   254  			connTokenBytes, ok := secret.Data[cloudConnectTokenKey]
   255  			connToken := string(connTokenBytes)
   256  			dlog.Infof(ctx, "Setting cloud connect token from secret")
   257  			a.ambassadorAPIKey = getAPIKeyValue(connToken, ok)
   258  			resetComm(a.ambassadorAPIKey, prevKey, a)
   259  			return
   260  		}
   261  	}
   262  	// then, if we don't have a secret, we check for a config map
   263  	// there _should_ only be one config here, but we're going to loop and check that the object
   264  	// meta matches what we expect
   265  	for _, cm := range configMaps {
   266  		if cm.GetName() == a.agentCloudResourceConfigName && cm.GetNamespace() == a.agentNamespace {
   267  			connTokenBytes, ok := cm.Data[cloudConnectTokenKey]
   268  			connToken := string(connTokenBytes)
   269  			dlog.Infof(ctx, "Setting cloud connect token from configmap")
   270  			a.ambassadorAPIKey = getAPIKeyValue(connToken, ok)
   271  			resetComm(a.ambassadorAPIKey, prevKey, a)
   272  			return
   273  		}
   274  	}
   275  	// so if we got here, we know something changed, but a config map
   276  	// nor a secret exist, which means they never existed or they got
   277  	// deleted. in this case, we fall back to the env var (which is
   278  	// likely empty, so in that case, that is basically equivelant to
   279  	// turning the agent "off")
   280  	dlog.Infof(ctx, "Setting cloud connect token from environment")
   281  	a.ambassadorAPIKeyMutex.Lock()
   282  	defer a.ambassadorAPIKeyMutex.Unlock()
   283  	a.ambassadorAPIKey = a.ambassadorAPIKeyEnvVarValue
   284  	resetComm(a.ambassadorAPIKey, prevKey, a)
   285  }
   286  
   287  // Watch is the work performed by the main goroutine for the Agent. It processes
   288  // Watt/Diag snapshots, reports to the Director, and executes directives from
   289  // the Director.
   290  func (a *Agent) Watch(ctx context.Context, snapshotURL string) error {
   291  	client, err := kates.NewClient(kates.ClientConfig{})
   292  	if err != nil {
   293  		return err
   294  	}
   295  	dlog.Info(ctx, "Agent is running...")
   296  	agentCMQuery := kates.Query{
   297  		Namespace:     a.agentNamespace,
   298  		Name:          "ConfigMaps",
   299  		Kind:          "configmaps.",
   300  		FieldSelector: fmt.Sprintf("metadata.name=%s", a.agentCloudResourceConfigName),
   301  	}
   302  	agentSecretQuery := kates.Query{
   303  		Namespace:     a.agentNamespace,
   304  		Name:          "Secrets",
   305  		Kind:          "secrets.",
   306  		FieldSelector: fmt.Sprintf("metadata.name=%s", a.agentCloudResourceConfigName),
   307  	}
   308  	configAcc, err := client.Watch(ctx, agentCMQuery, agentSecretQuery)
   309  	if err != nil {
   310  		return err
   311  	}
   312  	if err := a.waitForAPIKey(ctx, configAcc); err != nil {
   313  		dlog.Errorf(ctx, "Error waiting for api key: %+v", err)
   314  		return err
   315  	}
   316  
   317  	podQuery := kates.Query{
   318  		Name:          "Pods",
   319  		Kind:          "pods.",
   320  		FieldSelector: a.agentWatchFieldSelector,
   321  	}
   322  	cmQuery := kates.Query{
   323  		Name:          "ConfigMaps",
   324  		Kind:          "configmaps.",
   325  		FieldSelector: a.agentWatchFieldSelector,
   326  	}
   327  	deployQuery := kates.Query{
   328  		Name:          "Deployments",
   329  		Kind:          "deployments.",
   330  		FieldSelector: a.agentWatchFieldSelector,
   331  	}
   332  	endpointQuery := kates.Query{
   333  		Name:          "Endpoints",
   334  		Kind:          "endpoints.",
   335  		FieldSelector: a.agentWatchFieldSelector,
   336  	}
   337  
   338  	// If the user didn't setup RBAC to allow the agent to get pods, the watch will just return
   339  	// no pods, log that it didn't have permission to get pods, and carry along.
   340  	coreAcc, err := client.Watch(ctx, podQuery, cmQuery, deployQuery, endpointQuery)
   341  	if err != nil {
   342  		return err
   343  	}
   344  
   345  	ns := kates.NamespaceAll
   346  	dc := NewDynamicClient(client.DynamicInterface(), NewK8sInformer)
   347  	rolloutGvr, _ := schema.ParseResourceArg("rollouts.v1alpha1.argoproj.io")
   348  	rolloutCallback := dc.WatchGeneric(ctx, ns, rolloutGvr)
   349  
   350  	applicationGvr, _ := schema.ParseResourceArg("applications.v1alpha1.argoproj.io")
   351  	applicationCallback := dc.WatchGeneric(ctx, ns, applicationGvr)
   352  
   353  	return a.watch(ctx, snapshotURL, configAcc, coreAcc, rolloutCallback, applicationCallback)
   354  }
   355  
   356  type accumulator interface {
   357  	Changed() <-chan struct{}
   358  	FilteredUpdate(ctx context.Context, target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) (bool, error)
   359  }
   360  
   361  func (a *Agent) waitForAPIKey(ctx context.Context, configAccumulator accumulator) error {
   362  	isValid := func(un *kates.Unstructured) bool {
   363  		return true
   364  	}
   365  	configSnapshot := struct {
   366  		Secrets    []kates.Secret
   367  		ConfigMaps []kates.ConfigMap
   368  	}{}
   369  	// wait until the user installs an api key
   370  	for a.ambassadorAPIKey == "" {
   371  		select {
   372  		case <-ctx.Done():
   373  			return nil
   374  		case <-configAccumulator.Changed():
   375  			updated, err := configAccumulator.FilteredUpdate(ctx, &configSnapshot, &[]*kates.Delta{}, isValid)
   376  			if err != nil {
   377  				return err
   378  			}
   379  			if !updated {
   380  				continue
   381  			}
   382  			a.handleAPIKeyConfigChange(ctx, configSnapshot.Secrets, configSnapshot.ConfigMaps)
   383  		case <-time.After(1 * time.Minute):
   384  			dlog.Debugf(ctx, "Still waiting for api key")
   385  		}
   386  	}
   387  	return nil
   388  }
   389  
   390  func (a *Agent) watch(ctx context.Context, snapshotURL string, configAccumulator accumulator, coreAccumulator accumulator, rolloutCallback <-chan *GenericCallback, applicationCallback <-chan *GenericCallback) error {
   391  	var err error
   392  	// for the watch
   393  	// we're not watching CRDs or anything special, so i'm pretty sure it's okay just to say all
   394  	// the pods are valid
   395  	isValid := func(un *kates.Unstructured) bool {
   396  		return true
   397  	}
   398  	ambHost, err := parseAmbassadorAdminHost(snapshotURL)
   399  	if err != nil {
   400  		// if we can't parse the host out of the url we won't be able to talk to ambassador
   401  		// anyway
   402  		return err
   403  	}
   404  
   405  	a.apiDocsStore = NewAPIDocsStore()
   406  	applicationStore := NewApplicationStore()
   407  	rolloutStore := NewRolloutStore()
   408  	coreSnapshot := CoreSnapshot{}
   409  	configSnapshot := struct {
   410  		Secrets    []kates.Secret
   411  		ConfigMaps []kates.ConfigMap
   412  	}{}
   413  	dlog.Info(ctx, "Beginning to watch and report resources to ambassador cloud")
   414  	for {
   415  		// Wait for an event
   416  		select {
   417  		case <-ctx.Done():
   418  			return nil
   419  			// just hardcode it so we wake every 1 second and check if we're ready to report
   420  			// intentionally not waiting for agent.minReportPeriod seconds because then we may
   421  			// never report if a bunch of directives keep coming in or pods change a
   422  			// bunch
   423  		case <-time.After(1 * time.Second):
   424  			// just a ticker, this will fallthru to the snapshot getting thing
   425  		case <-configAccumulator.Changed():
   426  			updated, err := configAccumulator.FilteredUpdate(ctx, &configSnapshot, &[]*kates.Delta{}, isValid)
   427  			if err != nil {
   428  				return err
   429  			}
   430  			if !updated {
   431  				continue
   432  			}
   433  			a.handleAPIKeyConfigChange(ctx, configSnapshot.Secrets, configSnapshot.ConfigMaps)
   434  		case <-coreAccumulator.Changed():
   435  			updated, err := coreAccumulator.FilteredUpdate(ctx, &coreSnapshot, &[]*kates.Delta{}, isValid)
   436  			if err != nil {
   437  				return err
   438  			}
   439  			if !updated {
   440  				continue
   441  			}
   442  			a.coreStore = NewCoreStore(&coreSnapshot)
   443  		case callback, ok := <-rolloutCallback:
   444  			if ok {
   445  				dlog.Debugf(ctx, "argo rollout callback: %v", callback.EventType)
   446  				a.rolloutStore, err = rolloutStore.FromCallback(callback)
   447  				if err != nil {
   448  					dlog.Warnf(ctx, "Error processing rollout callback: %s", err)
   449  				}
   450  			}
   451  		case callback, ok := <-applicationCallback:
   452  			if ok {
   453  				dlog.Debugf(ctx, "argo application callback: %v", callback.EventType)
   454  				a.applicationStore, err = applicationStore.FromCallback(callback)
   455  				if err != nil {
   456  					dlog.Warnf(ctx, "Error processing application callback: %s", err)
   457  				}
   458  			}
   459  		case directive := <-a.newDirective:
   460  			a.directiveHandler.HandleDirective(ctx, a, directive)
   461  		}
   462  		// only ask ambassador for a snapshot if we're actually going to report it.
   463  		// if reportRunning is true, that means we're still in the quiet period
   464  		// after sending a report.
   465  		if !a.reportingStopped && !a.reportRunning.Value() {
   466  			snapshot, err := getAmbSnapshotInfo(snapshotURL)
   467  			if err != nil {
   468  				dlog.Warnf(ctx, "Error getting snapshot from ambassador %+v", err)
   469  			}
   470  			dlog.Debug(ctx, "Received snapshot in agent")
   471  			if err = a.ProcessSnapshot(ctx, snapshot, ambHost); err != nil {
   472  				dlog.Warnf(ctx, "error processing snapshot: %+v", err)
   473  			}
   474  		}
   475  
   476  		a.MaybeReport(ctx)
   477  	}
   478  
   479  }
   480  
   481  func (a *Agent) MaybeReport(ctx context.Context) {
   482  	if a.ambassadorAPIKey == "" {
   483  		dlog.Debugf(ctx, "CLOUD_CONNECT_TOKEN not set in the environment, not reporting snapshot")
   484  		return
   485  	}
   486  	if a.reportingStopped || a.reportRunning.Value() || (a.reportToSend == nil) {
   487  		// Don't report if the Director told us to stop reporting, if we are
   488  		// already sending a report or waiting for the minimum time between
   489  		// reports, or if there is nothing new to report right now.
   490  		dlog.Debugf(ctx, "Not reporting snapshot [reporting stopped = %t] [report running = %t] [report to send is nil = %t]", a.reportingStopped, a.reportRunning.Value(), (a.reportToSend == nil))
   491  		return
   492  	}
   493  
   494  	// It's time to send a report
   495  	if a.comm == nil {
   496  		// The communications channel to the DCP was not yet created or was
   497  		// closed above, due to a change in identity, or close elsewhere, due to
   498  		// a change in endpoint configuration.
   499  		newComm, err := NewComm(
   500  			ctx, a.connInfo, a.agentID, a.ambassadorAPIKey, a.rpcExtraHeaders)
   501  
   502  		if err != nil {
   503  			dlog.Warnf(ctx, "Failed to dial the DCP: %v", err)
   504  			dlog.Warn(ctx, "DCP functionality disabled until next retry")
   505  
   506  			return
   507  		}
   508  
   509  		a.comm = newComm
   510  		a.newDirective = a.comm.Directives()
   511  	}
   512  	a.reportRunning.Set(true) // Cleared when the report completes
   513  
   514  	// Send a report. This is an RPC, i.e. it can block, so we do this in a
   515  	// goroutine. Sleep after send so we don't need to keep track of
   516  	// whether/when it's okay to send the next report.
   517  	go func(ctx context.Context, report *agent.Snapshot, delay time.Duration) {
   518  		var err error
   519  		defer func() {
   520  			if err != nil {
   521  				dlog.Warnf(ctx, "failed to report: %+v", err)
   522  			}
   523  			dlog.Debugf(ctx, "Finished sending snapshot report, sleeping for %s", delay.String())
   524  			time.Sleep(delay)
   525  			a.reportRunning.Set(false)
   526  			// make the write non blocking
   527  			select {
   528  			case a.reportComplete <- err:
   529  				// cool we sent something
   530  			default:
   531  				// do nothing if nobody is listening
   532  			}
   533  		}()
   534  		a.ambassadorAPIKeyMutex.Lock()
   535  		apikey := a.ambassadorAPIKey
   536  		a.ambassadorAPIKeyMutex.Unlock()
   537  		err = a.comm.Report(ctx, report, apikey)
   538  
   539  	}(ctx, a.reportToSend, a.minReportPeriod)
   540  
   541  	// Update state variables
   542  	a.reportToSend = nil // Set when a snapshot yields a fresh report
   543  }
   544  
   545  // ProcessSnapshot turns a Watt/Diag Snapshot into a report that the agent can
   546  // send to the Director. If the new report is semantically different from the
   547  // prior one sent, then the Agent's state is updated to indicate that reporting
   548  // should occur once again.
   549  func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Snapshot, ambHost string) error {
   550  	if snapshot == nil || snapshot.AmbassadorMeta == nil {
   551  		dlog.Warn(ctx, "No metadata discovered for snapshot, not reporting.")
   552  		return nil
   553  	}
   554  
   555  	agentID := GetIdentity(snapshot.AmbassadorMeta, ambHost)
   556  	if agentID == nil {
   557  		dlog.Warnf(ctx, "Could not parse identity info out of snapshot, not sending snapshot")
   558  		return nil
   559  	}
   560  	a.agentID = agentID
   561  
   562  	newConnInfo, err := connInfoFromAddress(a.connAddress)
   563  	if err != nil {
   564  		// The user has attempted to turn on the Agent (otherwise GetIdentity
   565  		// would have returned nil), but there's a problem with the connection
   566  		// configuration. Rather than processing the entire snapshot and then
   567  		// failing to send the resulting report, let's just fail now. The user
   568  		// will see the error in the logs and correct the configuration.
   569  		return err
   570  	}
   571  
   572  	if a.connInfo == nil || *newConnInfo != *a.connInfo {
   573  		// The configuration for the Director endpoint has changed: either this
   574  		// is the first snapshot or the user changed the value.
   575  		//
   576  		// Close any existing communications channel so that we can create
   577  		// a new one with the new endpoint.
   578  		a.ClearComm()
   579  
   580  		// Save the new endpoint information.
   581  		a.connInfo = newConnInfo
   582  	}
   583  
   584  	if snapshot.Kubernetes != nil {
   585  		if a.coreStore != nil {
   586  			if a.coreStore.podStore != nil {
   587  				snapshot.Kubernetes.Pods = a.coreStore.podStore.StateOfWorld()
   588  				dlog.Debugf(ctx, "Found %d pods", len(snapshot.Kubernetes.Pods))
   589  			}
   590  			if a.coreStore.configMapStore != nil {
   591  				snapshot.Kubernetes.ConfigMaps = a.coreStore.configMapStore.StateOfWorld()
   592  				dlog.Debugf(ctx, "Found %d configMaps", len(snapshot.Kubernetes.ConfigMaps))
   593  			}
   594  			if a.coreStore.deploymentStore != nil {
   595  				snapshot.Kubernetes.Deployments = a.coreStore.deploymentStore.StateOfWorld()
   596  				dlog.Debugf(ctx, "Found %d Deployments", len(snapshot.Kubernetes.Deployments))
   597  			}
   598  			if a.coreStore.endpointStore != nil {
   599  				snapshot.Kubernetes.Endpoints = a.coreStore.endpointStore.StateOfWorld()
   600  				dlog.Debugf(ctx, "Found %d Endpoints", len(snapshot.Kubernetes.Endpoints))
   601  			}
   602  		}
   603  		if a.rolloutStore != nil {
   604  			snapshot.Kubernetes.ArgoRollouts = a.rolloutStore.StateOfWorld()
   605  			dlog.Debugf(ctx, "Found %d argo rollouts", len(snapshot.Kubernetes.ArgoRollouts))
   606  		}
   607  		if a.applicationStore != nil {
   608  			snapshot.Kubernetes.ArgoApplications = a.applicationStore.StateOfWorld()
   609  			dlog.Debugf(ctx, "Found %d argo applications", len(snapshot.Kubernetes.ArgoApplications))
   610  		}
   611  		if a.apiDocsStore != nil {
   612  			a.apiDocsStore.ProcessSnapshot(ctx, snapshot)
   613  			snapshot.APIDocs = a.apiDocsStore.StateOfWorld()
   614  			dlog.Debugf(ctx, "Found %d api docs", len(snapshot.APIDocs))
   615  		}
   616  	}
   617  
   618  	if err = snapshot.Sanitize(); err != nil {
   619  		return err
   620  	}
   621  	rawJsonSnapshot, err := json.Marshal(snapshot)
   622  	if err != nil {
   623  		return err
   624  	}
   625  
   626  	report := &agent.Snapshot{
   627  		Identity:    agentID,
   628  		RawSnapshot: rawJsonSnapshot,
   629  		ContentType: snapshotTypes.ContentTypeJSON,
   630  		ApiVersion:  snapshotTypes.ApiVersion,
   631  		SnapshotTs:  timestamppb.Now(),
   632  	}
   633  
   634  	a.reportToSend = report
   635  
   636  	return nil
   637  }
   638  
   639  var allowedMetricsSuffixes = []string{"upstream_rq_total", "upstream_rq_time", "upstream_rq_5xx"}
   640  
   641  // MetricsRelayHandler is invoked as a callback when the agent receive metrics from Envoy (sink).
   642  func (a *Agent) MetricsRelayHandler(
   643  	ctx context.Context,
   644  	in *envoyMetrics.StreamMetricsMessage,
   645  ) {
   646  	metrics := in.GetEnvoyMetrics()
   647  
   648  	if a.comm != nil && !a.reportingStopped {
   649  		p, ok := peer.FromContext(ctx)
   650  
   651  		if !ok {
   652  			dlog.Warnf(ctx, "peer not found in context")
   653  			return
   654  		}
   655  
   656  		a.ambassadorAPIKeyMutex.Lock()
   657  		apikey := a.ambassadorAPIKey
   658  		a.ambassadorAPIKeyMutex.Unlock()
   659  
   660  		newMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics))
   661  
   662  		for _, metricFamily := range metrics {
   663  			for _, suffix := range allowedMetricsSuffixes {
   664  				if strings.HasSuffix(metricFamily.GetName(), suffix) {
   665  					newMetrics = append(newMetrics, metricFamily)
   666  					break
   667  				}
   668  			}
   669  		}
   670  
   671  		instanceID := p.Addr.String()
   672  
   673  		a.metricsRelayMutex.Lock()
   674  		defer a.metricsRelayMutex.Unlock()
   675  		// Collect metrics until next report.
   676  		if time.Now().Before(a.metricsBackoffUntil) {
   677  			dlog.Infof(ctx, "Append %d metric(s) to stack from %s",
   678  				len(newMetrics), instanceID,
   679  			)
   680  			a.aggregatedMetrics[instanceID] = newMetrics
   681  			return
   682  		}
   683  
   684  		// Otherwise, we reached a new batch of metric, send everything.
   685  		outMessage := &agent.StreamMetricsMessage{
   686  			Identity:     a.agentID,
   687  			EnvoyMetrics: []*io_prometheus_client.MetricFamily{},
   688  		}
   689  
   690  		for key, instanceMetrics := range a.aggregatedMetrics {
   691  			outMessage.EnvoyMetrics = append(outMessage.EnvoyMetrics, instanceMetrics...)
   692  			delete(a.aggregatedMetrics, key)
   693  		}
   694  
   695  		if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 {
   696  
   697  			dlog.Infof(ctx, "Relaying %d metric(s)", relayedMetricCount)
   698  
   699  			if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil {
   700  				dlog.Errorf(ctx, "error streaming metric(s): %+v", err)
   701  			}
   702  		}
   703  
   704  		// Configure next push.
   705  		a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod)
   706  
   707  		dlog.Infof(ctx, "Next metrics relay scheduled for %s",
   708  			a.metricsBackoffUntil.UTC().String())
   709  
   710  	}
   711  }
   712  
   713  // ClearComm ends the current connection to the Director, if it exists, thereby
   714  // forcing a new connection to be created when needed.
   715  func (a *Agent) ClearComm() {
   716  	if a.comm != nil {
   717  		a.comm.Close()
   718  		a.comm = nil
   719  	}
   720  }
   721  
   722  // MaxDuration returns the greater of two durations.
   723  func MaxDuration(a, b time.Duration) time.Duration {
   724  	if a > b {
   725  		return a
   726  	}
   727  	return b
   728  }
   729  

View as plain text