1 package agent
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io/ioutil"
8 "net/http"
9 "net/url"
10 "os"
11 "strings"
12 "sync"
13 "time"
14
15 envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
16 io_prometheus_client "github.com/prometheus/client_model/go"
17 "google.golang.org/grpc/peer"
18 "google.golang.org/protobuf/types/known/timestamppb"
19 "k8s.io/apimachinery/pkg/runtime/schema"
20
21 "github.com/datawire/ambassador/v2/pkg/api/agent"
22 "github.com/datawire/ambassador/v2/pkg/kates"
23 snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
24 "github.com/datawire/dlib/dlog"
25 )
26
const (
	// defaultMinReportPeriod is the floor for the interval between snapshot
	// reports and the window used to batch metric relays.
	defaultMinReportPeriod = 30 * time.Second
	// cloudConnectTokenKey is both the environment variable name and the
	// Secret/ConfigMap data key that carry the cloud connect token.
	cloudConnectTokenKey = "CLOUD_CONNECT_TOKEN"
)
29
30 type Comm interface {
31 Close() error
32 Report(context.Context, *agent.Snapshot, string) error
33 ReportCommandResult(context.Context, *agent.CommandResult, string) error
34 Directives() <-chan *agent.Directive
35 StreamMetrics(context.Context, *agent.StreamMetricsMessage, string) error
36 }
37
// atomicBool is a bool that is safe to read and write from several
// goroutines. Its zero value holds false and is ready to use.
type atomicBool struct {
	mutex sync.Mutex
	value bool
}

// Value returns the current value.
func (ab *atomicBool) Value() bool {
	ab.mutex.Lock()
	current := ab.value
	ab.mutex.Unlock()
	return current
}

// Set replaces the current value with v.
func (ab *atomicBool) Set(v bool) {
	ab.mutex.Lock()
	ab.value = v
	ab.mutex.Unlock()
}
54
55
56
57 type Agent struct {
58
59
60 comm Comm
61 connInfo *ConnInfo
62 agentID *agent.Identity
63 newDirective <-chan *agent.Directive
64 ambassadorAPIKeyMutex sync.Mutex
65 ambassadorAPIKey string
66 directiveHandler DirectiveHandler
67
68
69 ambassadorAPIKeyEnvVarValue string
70 connAddress string
71
72
73
74 reportingStopped bool
75 minReportPeriod time.Duration
76 lastDirectiveID string
77
78
79
80 reportToSend *agent.Snapshot
81 reportRunning atomicBool
82 reportComplete chan error
83
84
85 coreStore *coreStore
86
87
88 apiDocsStore *APIDocsStore
89
90
91 rolloutStore *RolloutStore
92
93 applicationStore *ApplicationStore
94
95
96
97
98
99 agentNamespace string
100
101
102
103
104 agentCloudResourceConfigName string
105
106
107 agentWatchFieldSelector string
108
109
110 metricsRelayMutex sync.Mutex
111
112
113 metricsBackoffUntil time.Time
114
115
116 aggregatedMetrics map[string][]*io_prometheus_client.MetricFamily
117
118
119 rpcExtraHeaders []string
120 }
121
// getEnvWithDefault returns the value of the environment variable envVarKey.
// It returns defaultValue when the variable is unset or set to the empty
// string.
func getEnvWithDefault(envVarKey string, defaultValue string) string {
	if fromEnv := os.Getenv(envVarKey); fromEnv != "" {
		return fromEnv
	}
	return defaultValue
}
129
130
131 func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsGetterFactory) *Agent {
132 reportPeriodFromEnv := os.Getenv("AGENT_REPORTING_PERIOD")
133 var reportPeriod time.Duration
134 if reportPeriodFromEnv != "" {
135 reportPeriod, err := time.ParseDuration(reportPeriodFromEnv)
136 if err != nil {
137 reportPeriod = defaultMinReportPeriod
138 } else {
139 reportPeriod = MaxDuration(defaultMinReportPeriod, reportPeriod)
140 }
141 } else {
142 reportPeriod = defaultMinReportPeriod
143 }
144 if directiveHandler == nil {
145 directiveHandler = &BasicDirectiveHandler{
146 DefaultMinReportPeriod: defaultMinReportPeriod,
147 rolloutsGetterFactory: rolloutsGetterFactory,
148 }
149 }
150
151 var rpcExtraHeaders = make([]string, 0)
152
153 if os.Getenv("RPC_INTERCEPT_HEADER_KEY") != "" &&
154 os.Getenv("RPC_INTERCEPT_HEADER_VALUE") != "" {
155 rpcExtraHeaders = append(
156 rpcExtraHeaders,
157 os.Getenv("RPC_INTERCEPT_HEADER_KEY"),
158 os.Getenv("RPC_INTERCEPT_HEADER_VALUE"),
159 )
160 }
161
162 return &Agent{
163 minReportPeriod: reportPeriod,
164 reportComplete: make(chan error),
165 ambassadorAPIKey: os.Getenv(cloudConnectTokenKey),
166
167
168
169 ambassadorAPIKeyEnvVarValue: os.Getenv(cloudConnectTokenKey),
170 connAddress: os.Getenv("RPC_CONNECTION_ADDRESS"),
171 agentNamespace: getEnvWithDefault("AGENT_NAMESPACE", "ambassador"),
172 agentCloudResourceConfigName: getEnvWithDefault("AGENT_CONFIG_RESOURCE_NAME", "ambassador-agent-cloud-token"),
173 directiveHandler: directiveHandler,
174 reportRunning: atomicBool{value: false},
175 agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"),
176 metricsBackoffUntil: time.Now().Add(defaultMinReportPeriod),
177 rpcExtraHeaders: rpcExtraHeaders,
178 aggregatedMetrics: map[string][]*io_prometheus_client.MetricFamily{},
179 }
180 }
181
182 func (a *Agent) StopReporting(ctx context.Context) {
183 dlog.Debugf(ctx, "stop reporting: %t -> true", a.reportingStopped)
184 a.reportingStopped = true
185 }
186
187 func (a *Agent) StartReporting(ctx context.Context) {
188 dlog.Debugf(ctx, "stop reporting: %t -> false", a.reportingStopped)
189 a.reportingStopped = false
190 }
191
192 func (a *Agent) SetMinReportPeriod(ctx context.Context, dur time.Duration) {
193 dlog.Debugf(ctx, "minimum report period %s -> %s", a.minReportPeriod, dur)
194 a.minReportPeriod = dur
195 }
196
197 func (a *Agent) SetLastDirectiveID(ctx context.Context, id string) {
198 dlog.Debugf(ctx, "setting last directive ID %s", id)
199 a.lastDirectiveID = id
200 }
201
202 func getAmbSnapshotInfo(url string) (*snapshotTypes.Snapshot, error) {
203 resp, err := http.Get(url)
204 if err != nil {
205 return nil, err
206 }
207 defer resp.Body.Close()
208 rawSnapshot, err := ioutil.ReadAll(resp.Body)
209 if err != nil {
210 return nil, err
211 }
212 ret := &snapshotTypes.Snapshot{}
213 err = json.Unmarshal(rawSnapshot, ret)
214
215 return ret, err
216 }
217
// parseAmbassadorAdminHost returns the host portion (without port) of rawurl.
// Errors from url.Parse are returned unchanged.
func parseAmbassadorAdminHost(rawurl string) (string, error) {
	// Named "parsed" so that the net/url package is not shadowed.
	parsed, err := url.Parse(rawurl)
	if err != nil {
		return "", err
	}
	return parsed.Hostname(), nil
}
226
// getAPIKeyValue returns configValue when configHadValue is true (the key was
// present in the config resource), and the empty string otherwise.
func getAPIKeyValue(configValue string, configHadValue bool) string {
	if !configHadValue {
		return ""
	}
	return configValue
}
233
234
235
236
237
238
239 func (a *Agent) handleAPIKeyConfigChange(ctx context.Context, secrets []kates.Secret, configMaps []kates.ConfigMap) {
240
241
242 resetComm := func(newKey string, oldKey string, a *Agent) {
243 if newKey != oldKey {
244 a.ClearComm()
245 }
246 }
247 prevKey := a.ambassadorAPIKey
248
249
250
251
252 for _, secret := range secrets {
253 if secret.GetName() == a.agentCloudResourceConfigName && secret.GetNamespace() == a.agentNamespace {
254 connTokenBytes, ok := secret.Data[cloudConnectTokenKey]
255 connToken := string(connTokenBytes)
256 dlog.Infof(ctx, "Setting cloud connect token from secret")
257 a.ambassadorAPIKey = getAPIKeyValue(connToken, ok)
258 resetComm(a.ambassadorAPIKey, prevKey, a)
259 return
260 }
261 }
262
263
264
265 for _, cm := range configMaps {
266 if cm.GetName() == a.agentCloudResourceConfigName && cm.GetNamespace() == a.agentNamespace {
267 connTokenBytes, ok := cm.Data[cloudConnectTokenKey]
268 connToken := string(connTokenBytes)
269 dlog.Infof(ctx, "Setting cloud connect token from configmap")
270 a.ambassadorAPIKey = getAPIKeyValue(connToken, ok)
271 resetComm(a.ambassadorAPIKey, prevKey, a)
272 return
273 }
274 }
275
276
277
278
279
280 dlog.Infof(ctx, "Setting cloud connect token from environment")
281 a.ambassadorAPIKeyMutex.Lock()
282 defer a.ambassadorAPIKeyMutex.Unlock()
283 a.ambassadorAPIKey = a.ambassadorAPIKeyEnvVarValue
284 resetComm(a.ambassadorAPIKey, prevKey, a)
285 }
286
287
288
289
290 func (a *Agent) Watch(ctx context.Context, snapshotURL string) error {
291 client, err := kates.NewClient(kates.ClientConfig{})
292 if err != nil {
293 return err
294 }
295 dlog.Info(ctx, "Agent is running...")
296 agentCMQuery := kates.Query{
297 Namespace: a.agentNamespace,
298 Name: "ConfigMaps",
299 Kind: "configmaps.",
300 FieldSelector: fmt.Sprintf("metadata.name=%s", a.agentCloudResourceConfigName),
301 }
302 agentSecretQuery := kates.Query{
303 Namespace: a.agentNamespace,
304 Name: "Secrets",
305 Kind: "secrets.",
306 FieldSelector: fmt.Sprintf("metadata.name=%s", a.agentCloudResourceConfigName),
307 }
308 configAcc, err := client.Watch(ctx, agentCMQuery, agentSecretQuery)
309 if err != nil {
310 return err
311 }
312 if err := a.waitForAPIKey(ctx, configAcc); err != nil {
313 dlog.Errorf(ctx, "Error waiting for api key: %+v", err)
314 return err
315 }
316
317 podQuery := kates.Query{
318 Name: "Pods",
319 Kind: "pods.",
320 FieldSelector: a.agentWatchFieldSelector,
321 }
322 cmQuery := kates.Query{
323 Name: "ConfigMaps",
324 Kind: "configmaps.",
325 FieldSelector: a.agentWatchFieldSelector,
326 }
327 deployQuery := kates.Query{
328 Name: "Deployments",
329 Kind: "deployments.",
330 FieldSelector: a.agentWatchFieldSelector,
331 }
332 endpointQuery := kates.Query{
333 Name: "Endpoints",
334 Kind: "endpoints.",
335 FieldSelector: a.agentWatchFieldSelector,
336 }
337
338
339
340 coreAcc, err := client.Watch(ctx, podQuery, cmQuery, deployQuery, endpointQuery)
341 if err != nil {
342 return err
343 }
344
345 ns := kates.NamespaceAll
346 dc := NewDynamicClient(client.DynamicInterface(), NewK8sInformer)
347 rolloutGvr, _ := schema.ParseResourceArg("rollouts.v1alpha1.argoproj.io")
348 rolloutCallback := dc.WatchGeneric(ctx, ns, rolloutGvr)
349
350 applicationGvr, _ := schema.ParseResourceArg("applications.v1alpha1.argoproj.io")
351 applicationCallback := dc.WatchGeneric(ctx, ns, applicationGvr)
352
353 return a.watch(ctx, snapshotURL, configAcc, coreAcc, rolloutCallback, applicationCallback)
354 }
355
356 type accumulator interface {
357 Changed() <-chan struct{}
358 FilteredUpdate(ctx context.Context, target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) (bool, error)
359 }
360
361 func (a *Agent) waitForAPIKey(ctx context.Context, configAccumulator accumulator) error {
362 isValid := func(un *kates.Unstructured) bool {
363 return true
364 }
365 configSnapshot := struct {
366 Secrets []kates.Secret
367 ConfigMaps []kates.ConfigMap
368 }{}
369
370 for a.ambassadorAPIKey == "" {
371 select {
372 case <-ctx.Done():
373 return nil
374 case <-configAccumulator.Changed():
375 updated, err := configAccumulator.FilteredUpdate(ctx, &configSnapshot, &[]*kates.Delta{}, isValid)
376 if err != nil {
377 return err
378 }
379 if !updated {
380 continue
381 }
382 a.handleAPIKeyConfigChange(ctx, configSnapshot.Secrets, configSnapshot.ConfigMaps)
383 case <-time.After(1 * time.Minute):
384 dlog.Debugf(ctx, "Still waiting for api key")
385 }
386 }
387 return nil
388 }
389
390 func (a *Agent) watch(ctx context.Context, snapshotURL string, configAccumulator accumulator, coreAccumulator accumulator, rolloutCallback <-chan *GenericCallback, applicationCallback <-chan *GenericCallback) error {
391 var err error
392
393
394
395 isValid := func(un *kates.Unstructured) bool {
396 return true
397 }
398 ambHost, err := parseAmbassadorAdminHost(snapshotURL)
399 if err != nil {
400
401
402 return err
403 }
404
405 a.apiDocsStore = NewAPIDocsStore()
406 applicationStore := NewApplicationStore()
407 rolloutStore := NewRolloutStore()
408 coreSnapshot := CoreSnapshot{}
409 configSnapshot := struct {
410 Secrets []kates.Secret
411 ConfigMaps []kates.ConfigMap
412 }{}
413 dlog.Info(ctx, "Beginning to watch and report resources to ambassador cloud")
414 for {
415
416 select {
417 case <-ctx.Done():
418 return nil
419
420
421
422
423 case <-time.After(1 * time.Second):
424
425 case <-configAccumulator.Changed():
426 updated, err := configAccumulator.FilteredUpdate(ctx, &configSnapshot, &[]*kates.Delta{}, isValid)
427 if err != nil {
428 return err
429 }
430 if !updated {
431 continue
432 }
433 a.handleAPIKeyConfigChange(ctx, configSnapshot.Secrets, configSnapshot.ConfigMaps)
434 case <-coreAccumulator.Changed():
435 updated, err := coreAccumulator.FilteredUpdate(ctx, &coreSnapshot, &[]*kates.Delta{}, isValid)
436 if err != nil {
437 return err
438 }
439 if !updated {
440 continue
441 }
442 a.coreStore = NewCoreStore(&coreSnapshot)
443 case callback, ok := <-rolloutCallback:
444 if ok {
445 dlog.Debugf(ctx, "argo rollout callback: %v", callback.EventType)
446 a.rolloutStore, err = rolloutStore.FromCallback(callback)
447 if err != nil {
448 dlog.Warnf(ctx, "Error processing rollout callback: %s", err)
449 }
450 }
451 case callback, ok := <-applicationCallback:
452 if ok {
453 dlog.Debugf(ctx, "argo application callback: %v", callback.EventType)
454 a.applicationStore, err = applicationStore.FromCallback(callback)
455 if err != nil {
456 dlog.Warnf(ctx, "Error processing application callback: %s", err)
457 }
458 }
459 case directive := <-a.newDirective:
460 a.directiveHandler.HandleDirective(ctx, a, directive)
461 }
462
463
464
465 if !a.reportingStopped && !a.reportRunning.Value() {
466 snapshot, err := getAmbSnapshotInfo(snapshotURL)
467 if err != nil {
468 dlog.Warnf(ctx, "Error getting snapshot from ambassador %+v", err)
469 }
470 dlog.Debug(ctx, "Received snapshot in agent")
471 if err = a.ProcessSnapshot(ctx, snapshot, ambHost); err != nil {
472 dlog.Warnf(ctx, "error processing snapshot: %+v", err)
473 }
474 }
475
476 a.MaybeReport(ctx)
477 }
478
479 }
480
481 func (a *Agent) MaybeReport(ctx context.Context) {
482 if a.ambassadorAPIKey == "" {
483 dlog.Debugf(ctx, "CLOUD_CONNECT_TOKEN not set in the environment, not reporting snapshot")
484 return
485 }
486 if a.reportingStopped || a.reportRunning.Value() || (a.reportToSend == nil) {
487
488
489
490 dlog.Debugf(ctx, "Not reporting snapshot [reporting stopped = %t] [report running = %t] [report to send is nil = %t]", a.reportingStopped, a.reportRunning.Value(), (a.reportToSend == nil))
491 return
492 }
493
494
495 if a.comm == nil {
496
497
498
499 newComm, err := NewComm(
500 ctx, a.connInfo, a.agentID, a.ambassadorAPIKey, a.rpcExtraHeaders)
501
502 if err != nil {
503 dlog.Warnf(ctx, "Failed to dial the DCP: %v", err)
504 dlog.Warn(ctx, "DCP functionality disabled until next retry")
505
506 return
507 }
508
509 a.comm = newComm
510 a.newDirective = a.comm.Directives()
511 }
512 a.reportRunning.Set(true)
513
514
515
516
517 go func(ctx context.Context, report *agent.Snapshot, delay time.Duration) {
518 var err error
519 defer func() {
520 if err != nil {
521 dlog.Warnf(ctx, "failed to report: %+v", err)
522 }
523 dlog.Debugf(ctx, "Finished sending snapshot report, sleeping for %s", delay.String())
524 time.Sleep(delay)
525 a.reportRunning.Set(false)
526
527 select {
528 case a.reportComplete <- err:
529
530 default:
531
532 }
533 }()
534 a.ambassadorAPIKeyMutex.Lock()
535 apikey := a.ambassadorAPIKey
536 a.ambassadorAPIKeyMutex.Unlock()
537 err = a.comm.Report(ctx, report, apikey)
538
539 }(ctx, a.reportToSend, a.minReportPeriod)
540
541
542 a.reportToSend = nil
543 }
544
545
546
547
548
549 func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Snapshot, ambHost string) error {
550 if snapshot == nil || snapshot.AmbassadorMeta == nil {
551 dlog.Warn(ctx, "No metadata discovered for snapshot, not reporting.")
552 return nil
553 }
554
555 agentID := GetIdentity(snapshot.AmbassadorMeta, ambHost)
556 if agentID == nil {
557 dlog.Warnf(ctx, "Could not parse identity info out of snapshot, not sending snapshot")
558 return nil
559 }
560 a.agentID = agentID
561
562 newConnInfo, err := connInfoFromAddress(a.connAddress)
563 if err != nil {
564
565
566
567
568
569 return err
570 }
571
572 if a.connInfo == nil || *newConnInfo != *a.connInfo {
573
574
575
576
577
578 a.ClearComm()
579
580
581 a.connInfo = newConnInfo
582 }
583
584 if snapshot.Kubernetes != nil {
585 if a.coreStore != nil {
586 if a.coreStore.podStore != nil {
587 snapshot.Kubernetes.Pods = a.coreStore.podStore.StateOfWorld()
588 dlog.Debugf(ctx, "Found %d pods", len(snapshot.Kubernetes.Pods))
589 }
590 if a.coreStore.configMapStore != nil {
591 snapshot.Kubernetes.ConfigMaps = a.coreStore.configMapStore.StateOfWorld()
592 dlog.Debugf(ctx, "Found %d configMaps", len(snapshot.Kubernetes.ConfigMaps))
593 }
594 if a.coreStore.deploymentStore != nil {
595 snapshot.Kubernetes.Deployments = a.coreStore.deploymentStore.StateOfWorld()
596 dlog.Debugf(ctx, "Found %d Deployments", len(snapshot.Kubernetes.Deployments))
597 }
598 if a.coreStore.endpointStore != nil {
599 snapshot.Kubernetes.Endpoints = a.coreStore.endpointStore.StateOfWorld()
600 dlog.Debugf(ctx, "Found %d Endpoints", len(snapshot.Kubernetes.Endpoints))
601 }
602 }
603 if a.rolloutStore != nil {
604 snapshot.Kubernetes.ArgoRollouts = a.rolloutStore.StateOfWorld()
605 dlog.Debugf(ctx, "Found %d argo rollouts", len(snapshot.Kubernetes.ArgoRollouts))
606 }
607 if a.applicationStore != nil {
608 snapshot.Kubernetes.ArgoApplications = a.applicationStore.StateOfWorld()
609 dlog.Debugf(ctx, "Found %d argo applications", len(snapshot.Kubernetes.ArgoApplications))
610 }
611 if a.apiDocsStore != nil {
612 a.apiDocsStore.ProcessSnapshot(ctx, snapshot)
613 snapshot.APIDocs = a.apiDocsStore.StateOfWorld()
614 dlog.Debugf(ctx, "Found %d api docs", len(snapshot.APIDocs))
615 }
616 }
617
618 if err = snapshot.Sanitize(); err != nil {
619 return err
620 }
621 rawJsonSnapshot, err := json.Marshal(snapshot)
622 if err != nil {
623 return err
624 }
625
626 report := &agent.Snapshot{
627 Identity: agentID,
628 RawSnapshot: rawJsonSnapshot,
629 ContentType: snapshotTypes.ContentTypeJSON,
630 ApiVersion: snapshotTypes.ApiVersion,
631 SnapshotTs: timestamppb.Now(),
632 }
633
634 a.reportToSend = report
635
636 return nil
637 }
638
// allowedMetricsSuffixes lists the metric-family name suffixes that
// MetricsRelayHandler forwards. All other Envoy metric families are dropped.
var allowedMetricsSuffixes = []string{
	"upstream_rq_total",
	"upstream_rq_time",
	"upstream_rq_5xx",
}
640
641
642 func (a *Agent) MetricsRelayHandler(
643 ctx context.Context,
644 in *envoyMetrics.StreamMetricsMessage,
645 ) {
646 metrics := in.GetEnvoyMetrics()
647
648 if a.comm != nil && !a.reportingStopped {
649 p, ok := peer.FromContext(ctx)
650
651 if !ok {
652 dlog.Warnf(ctx, "peer not found in context")
653 return
654 }
655
656 a.ambassadorAPIKeyMutex.Lock()
657 apikey := a.ambassadorAPIKey
658 a.ambassadorAPIKeyMutex.Unlock()
659
660 newMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics))
661
662 for _, metricFamily := range metrics {
663 for _, suffix := range allowedMetricsSuffixes {
664 if strings.HasSuffix(metricFamily.GetName(), suffix) {
665 newMetrics = append(newMetrics, metricFamily)
666 break
667 }
668 }
669 }
670
671 instanceID := p.Addr.String()
672
673 a.metricsRelayMutex.Lock()
674 defer a.metricsRelayMutex.Unlock()
675
676 if time.Now().Before(a.metricsBackoffUntil) {
677 dlog.Infof(ctx, "Append %d metric(s) to stack from %s",
678 len(newMetrics), instanceID,
679 )
680 a.aggregatedMetrics[instanceID] = newMetrics
681 return
682 }
683
684
685 outMessage := &agent.StreamMetricsMessage{
686 Identity: a.agentID,
687 EnvoyMetrics: []*io_prometheus_client.MetricFamily{},
688 }
689
690 for key, instanceMetrics := range a.aggregatedMetrics {
691 outMessage.EnvoyMetrics = append(outMessage.EnvoyMetrics, instanceMetrics...)
692 delete(a.aggregatedMetrics, key)
693 }
694
695 if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 {
696
697 dlog.Infof(ctx, "Relaying %d metric(s)", relayedMetricCount)
698
699 if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil {
700 dlog.Errorf(ctx, "error streaming metric(s): %+v", err)
701 }
702 }
703
704
705 a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod)
706
707 dlog.Infof(ctx, "Next metrics relay scheduled for %s",
708 a.metricsBackoffUntil.UTC().String())
709
710 }
711 }
712
713
714
715 func (a *Agent) ClearComm() {
716 if a.comm != nil {
717 a.comm.Close()
718 a.comm = nil
719 }
720 }
721
722
// MaxDuration returns the larger of a and b.
func MaxDuration(a, b time.Duration) time.Duration {
	if b >= a {
		return b
	}
	return a
}
729
View as plain text