1 package ambex
2
3
40
41 import (
42
43 "context"
44 "encoding/json"
45 "flag"
46 "fmt"
47 "io/ioutil"
48 "net"
49 "os"
50 "os/signal"
51 "path"
52 "path/filepath"
53 "strconv"
54 "strings"
55 "syscall"
56
57
58 "github.com/fsnotify/fsnotify"
59 "google.golang.org/grpc"
60 "google.golang.org/protobuf/encoding/protojson"
61 "google.golang.org/protobuf/encoding/prototext"
62 "google.golang.org/protobuf/proto"
63 "google.golang.org/protobuf/types/known/anypb"
64
65
66 ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
67 ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
68 ecp_v3_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3"
69 ecp_log "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/log"
70 ecp_v2_server "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v2"
71 ecp_v3_server "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v3"
72
73
74
75
76 v2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
77 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/auth"
78 v2core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
79 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/accesslog/v2"
80 v2bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v2"
81 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/buffer/v2"
82 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/ext_authz/v2"
83 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/gzip/v2"
84 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/lua/v2"
85 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/rate_limit/v2"
86 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/rbac/v2"
87 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/router/v2"
88 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/network/http_connection_manager/v2"
89 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/network/tcp_proxy/v2"
90 v2discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v2"
91
92
93
94
95 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/accesslog/v3"
96 v3bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v3"
97 v3clusterconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/cluster/v3"
98 v3core "github.com/datawire/ambassador/v2/pkg/api/envoy/config/core/v3"
99 v3endpointconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/endpoint/v3"
100 v3listenerconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/listener/v3"
101 v3routeconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/route/v3"
102 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/access_loggers/file/v3"
103 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/access_loggers/grpc/v3"
104 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/compression/gzip/compressor/v3"
105 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/buffer/v3"
106 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/compressor/v3"
107 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/ext_authz/v3"
108 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/grpc_stats/v3"
109 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/gzip/v3"
110 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/lua/v3"
111 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/ratelimit/v3"
112 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/rbac/v3"
113 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/response_map/v3"
114 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/router/v3"
115 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/network/http_connection_manager/v3"
116 _ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/network/tcp_proxy/v3"
117 v3cluster "github.com/datawire/ambassador/v2/pkg/api/envoy/service/cluster/v3"
118 v3discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
119 v3endpoint "github.com/datawire/ambassador/v2/pkg/api/envoy/service/endpoint/v3"
120 v3listener "github.com/datawire/ambassador/v2/pkg/api/envoy/service/listener/v3"
121 v3route "github.com/datawire/ambassador/v2/pkg/api/envoy/service/route/v3"
122 v3runtime "github.com/datawire/ambassador/v2/pkg/api/envoy/service/runtime/v3"
123
124
125 "github.com/datawire/dlib/dgroup"
126 "github.com/datawire/dlib/dhttp"
127 "github.com/datawire/dlib/dlog"
128 )
129
// Args is the resolved runtime configuration for ambex. parseArgs fills it in
// from command-line flags and from environment variables.
type Args struct {
	// watch is set by the --watch flag; when true, Main adds every entry of
	// dirs to the filesystem watcher.
	watch bool

	// adsNetwork and adsAddress are the network and address handed to
	// net.Listen for the ADS gRPC server (--ads-listen-network and
	// --ads-listen-address; the deprecated --ads flag overrides the address).
	adsNetwork, adsAddress string

	// dirs are the positional arguments: the directories scanned for
	// decodable configuration files. Defaults to the current directory.
	dirs []string

	// snapdirPath is the "snapshots" directory below the configured base
	// directory; numsnaps comes from AMBASSADOR_AMBEX_SNAPSHOT_COUNT
	// (default 30) and controls snapshot dumping and rotation in csDump.
	snapdirPath string
	numsnaps    int

	// edsBypass is true when AMBASSADOR_EDS_BYPASS parses as a true boolean.
	edsBypass bool
}
145
146 func parseArgs(ctx context.Context, rawArgs ...string) (*Args, error) {
147 var args Args
148 flagset := flag.NewFlagSet("ambex", flag.ContinueOnError)
149
150 flagset.BoolVar(&args.watch, "watch", false, "Watch for file changes")
151
152
153 flagset.StringVar(&args.adsNetwork, "ads-listen-network", "tcp", "network for ADS to listen on")
154 flagset.StringVar(&args.adsAddress, "ads-listen-address", ":18000", "address (on --ads-listen-network) for ADS to listen on")
155
156 var legacyAdsPort uint
157 flagset.UintVar(&legacyAdsPort, "ads", 0, "port number for ADS to listen on--deprecated, use --ads-listen-address=:1234 instead")
158
159 if err := flagset.Parse(rawArgs); err != nil {
160 return nil, err
161 }
162
163 if legacyAdsPort != 0 {
164 args.adsAddress = fmt.Sprintf(":%v", legacyAdsPort)
165 }
166
167 args.dirs = flagset.Args()
168 if len(args.dirs) == 0 {
169 args.dirs = []string{"."}
170 }
171
172
173
174
175
176 snapdirPath := os.Getenv("AMBASSADOR_CONFIG_BASE_DIR")
177 if snapdirPath == "" {
178 snapdirPath = os.Getenv("ambassador_root")
179 }
180 if snapdirPath == "" {
181 snapdirPath = "/ambassador"
182 }
183 args.snapdirPath = path.Join(snapdirPath, "snapshots")
184
185
186
187
188 numsnapStr := os.Getenv("AMBASSADOR_AMBEX_SNAPSHOT_COUNT")
189 if numsnapStr == "" {
190 numsnapStr = "30"
191 }
192 var err error
193 args.numsnaps, err = strconv.Atoi(numsnapStr)
194 if (err != nil) || (args.numsnaps < 0) {
195 args.numsnaps = 30
196 dlog.Errorf(ctx, "Invalid AMBASSADOR_AMBEX_SNAPSHOT_COUNT: %s, using %d", numsnapStr, args.numsnaps)
197 }
198
199
200
201 edsBypass := os.Getenv("AMBASSADOR_EDS_BYPASS")
202 if v, err := strconv.ParseBool(edsBypass); err == nil && v {
203 dlog.Info(ctx, "AMBASSADOR_EDS_BYPASS has been set to true. EDS will be bypassed and endpoints will be inserted manually.")
204 args.edsBypass = v
205 }
206
207 return &args, nil
208 }
209
210
211 type HasherV2 struct {
212 }
213
214
215 func (h HasherV2) ID(node *v2core.Node) string {
216 if node == nil {
217 return "unknown"
218 }
219 return node.Id
220 }
221
222
223 type HasherV3 struct {
224 }
225
226
227 func (h HasherV3) ID(node *v3core.Node) string {
228 if node == nil {
229 return "unknown"
230 }
231 return node.Id
232 }
233
234
235
236
237
238 func runManagementServer(ctx context.Context, server ecp_v2_server.Server, serverv3 ecp_v3_server.Server, adsNetwork, adsAddress string) error {
239 grpcServer := grpc.NewServer()
240
241 lis, err := net.Listen(adsNetwork, adsAddress)
242 if err != nil {
243 return fmt.Errorf("failed to listen: %w", err)
244 }
245
246
247 v2discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
248 v2.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
249 v2.RegisterClusterDiscoveryServiceServer(grpcServer, server)
250 v2.RegisterRouteDiscoveryServiceServer(grpcServer, server)
251 v2.RegisterListenerDiscoveryServiceServer(grpcServer, server)
252
253 v3discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, serverv3)
254 v3endpoint.RegisterEndpointDiscoveryServiceServer(grpcServer, serverv3)
255 v3cluster.RegisterClusterDiscoveryServiceServer(grpcServer, serverv3)
256 v3route.RegisterRouteDiscoveryServiceServer(grpcServer, serverv3)
257 v3listener.RegisterListenerDiscoveryServiceServer(grpcServer, serverv3)
258
259 dlog.Infof(ctx, "Listening on %s:%s", adsNetwork, adsAddress)
260
261 sc := &dhttp.ServerConfig{
262 Handler: grpcServer,
263 }
264 return sc.Serve(ctx, lis)
265 }
266
267
268 var decoders = map[string](func([]byte, proto.Message) error){
269 ".json": protojson.Unmarshal,
270 ".pb": prototext.Unmarshal,
271 }
272
273 func isDecodable(name string) bool {
274 if strings.HasPrefix(name, ".") {
275 return false
276 }
277
278 ext := filepath.Ext(name)
279 _, ok := decoders[ext]
280 return ok
281 }
282
283
284
285 type Validatable interface {
286 proto.Message
287 Validate() error
288 }
289
290 func Decode(ctx context.Context, name string) (proto.Message, error) {
291 any := &anypb.Any{}
292 contents, err := ioutil.ReadFile(name)
293 if err != nil {
294 return nil, err
295 }
296
297 ext := filepath.Ext(name)
298 decoder := decoders[ext]
299 err = decoder(contents, any)
300 if err != nil {
301 return nil, err
302 }
303
304 m, err := any.UnmarshalNew()
305 if err != nil {
306 return nil, err
307 }
308
309 v := m.(Validatable)
310
311 if err := v.Validate(); err != nil {
312 return nil, err
313 }
314 dlog.Infof(ctx, "Loaded file %s", name)
315 return v, nil
316 }
317
318
319
320
321
322 type v2ExpandedSnapshot struct {
323 Endpoints ecp_v2_cache.Resources `json:"endpoints"`
324 Clusters ecp_v2_cache.Resources `json:"clusters"`
325 Routes ecp_v2_cache.Resources `json:"routes"`
326 Listeners ecp_v2_cache.Resources `json:"listeners"`
327 Runtimes ecp_v2_cache.Resources `json:"runtimes"`
328 }
329
330 func NewV2ExpandedSnapshot(v2snap *ecp_v2_cache.Snapshot) v2ExpandedSnapshot {
331 return v2ExpandedSnapshot{
332 Endpoints: v2snap.Resources[ecp_cache_types.Endpoint],
333 Clusters: v2snap.Resources[ecp_cache_types.Cluster],
334 Routes: v2snap.Resources[ecp_cache_types.Route],
335 Listeners: v2snap.Resources[ecp_cache_types.Listener],
336 Runtimes: v2snap.Resources[ecp_cache_types.Runtime],
337 }
338 }
339
340 type v3ExpandedSnapshot struct {
341 Endpoints ecp_v3_cache.Resources `json:"endpoints"`
342 Clusters ecp_v3_cache.Resources `json:"clusters"`
343 Routes ecp_v3_cache.Resources `json:"routes"`
344 Listeners ecp_v3_cache.Resources `json:"listeners"`
345 Runtimes ecp_v3_cache.Resources `json:"runtimes"`
346 }
347
348 func NewV3ExpandedSnapshot(v3snap *ecp_v3_cache.Snapshot) v3ExpandedSnapshot {
349 return v3ExpandedSnapshot{
350 Endpoints: v3snap.Resources[ecp_cache_types.Endpoint],
351 Clusters: v3snap.Resources[ecp_cache_types.Cluster],
352 Routes: v3snap.Resources[ecp_cache_types.Route],
353 Listeners: v3snap.Resources[ecp_cache_types.Listener],
354 Runtimes: v3snap.Resources[ecp_cache_types.Runtime],
355 }
356 }
357
358
359 type combinedSnapshot struct {
360 Version string `json:"version"`
361 V2 v2ExpandedSnapshot `json:"v2"`
362 V3 v3ExpandedSnapshot `json:"v3"`
363 }
364
365
366
367
368
369
370 func csDump(ctx context.Context, snapdirPath string, numsnaps int, generation int, v2snap *ecp_v2_cache.Snapshot, v3snap *ecp_v3_cache.Snapshot) {
371 if numsnaps <= 0 {
372
373 return
374 }
375
376
377 version := fmt.Sprintf("v%d", generation)
378
379
380 cs := combinedSnapshot{
381 Version: version,
382 V2: NewV2ExpandedSnapshot(v2snap),
383 V3: NewV3ExpandedSnapshot(v3snap),
384 }
385
386
387
388
389
390 bs, err := json.MarshalIndent(cs, "", " ")
391
392 if err != nil {
393 dlog.Errorf(ctx, "CSNAP: marshal failure: %s", err)
394 return
395 }
396
397 csPath := path.Join(snapdirPath, "ambex-0.json")
398
399 err = ioutil.WriteFile(csPath, bs, 0644)
400
401 if err != nil {
402 dlog.Errorf(ctx, "CSNAP: write failure: %s", err)
403 } else {
404 dlog.Infof(ctx, "Saved snapshot %s", version)
405 }
406
407
408
409 for i := numsnaps; i > 0; i-- {
410 previous := i - 1
411
412 fromPath := path.Join(snapdirPath, fmt.Sprintf("ambex-%d.json", previous))
413 toPath := path.Join(snapdirPath, fmt.Sprintf("ambex-%d.json", i))
414
415 err := os.Rename(fromPath, toPath)
416
417 if (err != nil) && !os.IsNotExist(err) {
418 dlog.Infof(ctx, "CSNAP: could not rename %s -> %s: %#v", fromPath, toPath, err)
419 }
420 }
421 }
422
423
424 func update(
425 ctx context.Context,
426 snapdirPath string,
427 numsnaps int,
428 edsBypass bool,
429 config ecp_v2_cache.SnapshotCache,
430 configv3 ecp_v3_cache.SnapshotCache,
431 generation *int,
432 dirs []string,
433 edsEndpoints map[string]*v2.ClusterLoadAssignment,
434 edsEndpointsV3 map[string]*v3endpointconfig.ClusterLoadAssignment,
435 fastpathSnapshot *FastpathSnapshot,
436 updates chan<- Update,
437 ) error {
438 clusters := []ecp_cache_types.Resource{}
439 routes := []ecp_cache_types.Resource{}
440 listeners := []ecp_cache_types.Resource{}
441 runtimes := []ecp_cache_types.Resource{}
442
443 clustersv3 := []ecp_cache_types.Resource{}
444 routesv3 := []ecp_cache_types.Resource{}
445 listenersv3 := []ecp_cache_types.Resource{}
446 runtimesv3 := []ecp_cache_types.Resource{}
447
448 var filenames []string
449
450 for _, dir := range dirs {
451 files, err := ioutil.ReadDir(dir)
452 if err != nil {
453 dlog.Warnf(ctx, "Error listing %q: %v", dir, err)
454 continue
455 }
456 for _, file := range files {
457 name := file.Name()
458 if isDecodable(name) {
459 filenames = append(filenames, filepath.Join(dir, name))
460 }
461 }
462 }
463
464 for _, name := range filenames {
465 m, e := Decode(ctx, name)
466 if e != nil {
467 dlog.Warnf(ctx, "%s: %v", name, e)
468 continue
469 }
470 var dst *[]ecp_cache_types.Resource
471 switch m.(type) {
472 case *v2.Cluster:
473 dst = &clusters
474 case *v2.RouteConfiguration:
475 dst = &routes
476 case *v2.Listener:
477 dst = &listeners
478 case *v2discovery.Runtime:
479 dst = &runtimes
480 case *v2bootstrap.Bootstrap:
481 bs := m.(*v2bootstrap.Bootstrap)
482 sr := bs.StaticResources
483 for _, lst := range sr.Listeners {
484
485
486
487
488
489
490
491 rdsListener, routeConfigs, err := ListenerToRdsListener(lst)
492 if err != nil {
493 dlog.Errorf(ctx, "Error converting listener to RDS: %+v", err)
494 listeners = append(listeners, proto.Clone(lst).(ecp_cache_types.Resource))
495 continue
496 }
497 listeners = append(listeners, rdsListener)
498 for _, rc := range routeConfigs {
499
500 routes = append(routes, rc)
501 }
502 }
503 for _, cls := range sr.Clusters {
504 clusters = append(clusters, proto.Clone(cls).(ecp_cache_types.Resource))
505 }
506 continue
507 case *v3clusterconfig.Cluster:
508 dst = &clustersv3
509 case *v3routeconfig.RouteConfiguration:
510 dst = &routesv3
511 case *v3listenerconfig.Listener:
512 dst = &listenersv3
513 case *v3runtime.Runtime:
514 dst = &runtimesv3
515 case *v3bootstrap.Bootstrap:
516 bs := m.(*v3bootstrap.Bootstrap)
517 sr := bs.StaticResources
518 for _, lst := range sr.Listeners {
519
520
521
522
523
524
525
526 rdsListener, routeConfigs, err := V3ListenerToRdsListener(lst)
527 if err != nil {
528 dlog.Errorf(ctx, "Error converting listener to RDS: %+v", err)
529 listenersv3 = append(listenersv3, proto.Clone(lst).(ecp_cache_types.Resource))
530 continue
531 }
532 listenersv3 = append(listenersv3, rdsListener)
533 for _, rc := range routeConfigs {
534
535 routesv3 = append(routesv3, rc)
536 }
537 }
538 for _, cls := range sr.Clusters {
539 clustersv3 = append(clustersv3, proto.Clone(cls).(ecp_cache_types.Resource))
540 }
541 continue
542 default:
543 dlog.Warnf(ctx, "Unrecognized resource %s: %v", name, e)
544 continue
545 }
546 *dst = append(*dst, m.(ecp_cache_types.Resource))
547 }
548
549 if fastpathSnapshot != nil && fastpathSnapshot.Snapshot != nil {
550 for _, lst := range fastpathSnapshot.Snapshot.Resources[ecp_cache_types.Listener].Items {
551 listeners = append(listeners, lst.Resource)
552 }
553 for _, route := range fastpathSnapshot.Snapshot.Resources[ecp_cache_types.Route].Items {
554 routes = append(routes, route.Resource)
555 }
556 for _, clu := range fastpathSnapshot.Snapshot.Resources[ecp_cache_types.Cluster].Items {
557 clusters = append(clusters, clu.Resource)
558 }
559
560 }
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585 endpoints := JoinEdsClusters(ctx, clusters, edsEndpoints, edsBypass)
586 endpointsv3 := JoinEdsClustersV3(ctx, clustersv3, edsEndpointsV3, edsBypass)
587
588
589 curgen := *generation
590 *generation++
591
592 version := fmt.Sprintf("v%d", curgen)
593 snapshot := ecp_v2_cache.NewSnapshot(
594 version,
595 endpoints,
596 clusters,
597 routes,
598 listeners,
599 runtimes,
600 nil,
601 )
602
603 if err := snapshot.Consistent(); err != nil {
604 bs, _ := json.Marshal(snapshot)
605 dlog.Errorf(ctx, "V2 Snapshot inconsistency: %v: %s", err, bs)
606 return nil
607 }
608
609 snapshotv3 := ecp_v3_cache.NewSnapshot(
610 version,
611 endpointsv3,
612 clustersv3,
613 routesv3,
614 listenersv3,
615 runtimesv3,
616 nil,
617 )
618
619 if err := snapshotv3.Consistent(); err != nil {
620 bs, _ := json.Marshal(snapshotv3)
621 dlog.Errorf(ctx, "V3 Snapshot inconsistency: %v: %s", err, bs)
622 return nil
623 }
624
625
626
627
628
629 dlog.Debugf(ctx, "Created snapshot %s", version)
630 csDump(ctx, snapdirPath, numsnaps, curgen, &snapshot, &snapshotv3)
631
632 update := Update{version, func() error {
633 dlog.Debugf(ctx, "Accepting snapshot %s", version)
634
635 err := config.SetSnapshot("test-id", snapshot)
636 if err != nil {
637 return fmt.Errorf("V2 Snapshot error %q for %+v", err, snapshot)
638 }
639
640 err = configv3.SetSnapshot("test-id", snapshotv3)
641 if err != nil {
642 return fmt.Errorf("V3 Snapshot error %q for %+v", err, snapshotv3)
643 }
644
645 return nil
646 }}
647
648
649
650
651 select {
652 case updates <- update:
653 case <-ctx.Done():
654 }
655 return nil
656 }
657
// logAdapterBase carries the behaviour shared by the v2 and v3 log adapters:
// the leveled logging methods and the stream open/close callbacks, all of
// which forward to dlog.
type logAdapterBase struct {
	// prefix is prepended to the stream open/close log messages; Main sets
	// it to "V2" or "V3".
	prefix string
}
661
662 type logAdapterV2 struct {
663 logAdapterBase
664 }
665
666 var _ ecp_v2_server.Callbacks = logAdapterV2{}
667 var _ ecp_log.Logger = logAdapterV2{}
668
669 type logAdapterV3 struct {
670 logAdapterBase
671 }
672
673 var _ ecp_v3_server.Callbacks = logAdapterV3{}
674 var _ ecp_log.Logger = logAdapterV3{}
675
676
677 func (l logAdapterBase) Debugf(format string, args ...interface{}) {
678 dlog.Debugf(context.TODO(), format, args...)
679 }
680
681
682 func (l logAdapterBase) Infof(format string, args ...interface{}) {
683 dlog.Infof(context.TODO(), format, args...)
684 }
685
686
687 func (l logAdapterBase) Warnf(format string, args ...interface{}) {
688 dlog.Warnf(context.TODO(), format, args...)
689 }
690
691
692 func (l logAdapterBase) Errorf(format string, args ...interface{}) {
693 dlog.Errorf(context.TODO(), format, args...)
694 }
695
696
697 func (l logAdapterBase) OnStreamOpen(ctx context.Context, sid int64, stype string) error {
698 dlog.Debugf(ctx, "%v Stream open[%v]: %v", l.prefix, sid, stype)
699 return nil
700 }
701
702
703 func (l logAdapterBase) OnStreamClosed(sid int64) {
704 dlog.Debugf(context.TODO(), "%v Stream closed[%v]", l.prefix, sid)
705 }
706
707
708 func (l logAdapterV2) OnStreamRequest(sid int64, req *v2.DiscoveryRequest) error {
709 dlog.Debugf(context.TODO(), "V2 Stream request[%v] for type %s: requesting %d resources", sid, req.TypeUrl, len(req.ResourceNames))
710 dlog.Debugf(context.TODO(), "V2 Stream request[%v] dump: %v", sid, req)
711 return nil
712 }
713
714
715 func (l logAdapterV3) OnStreamRequest(sid int64, req *v3discovery.DiscoveryRequest) error {
716 dlog.Debugf(context.TODO(), "V3 Stream request[%v] for type %s: requesting %d resources", sid, req.TypeUrl, len(req.ResourceNames))
717 dlog.Debugf(context.TODO(), "V3 Stream request[%v] dump: %v", sid, req)
718 return nil
719 }
720
721
722 func (l logAdapterV2) OnStreamResponse(sid int64, req *v2.DiscoveryRequest, res *v2.DiscoveryResponse) {
723 dlog.Debugf(context.TODO(), "V2 Stream response[%v] for type %s: returning %d resources", sid, res.TypeUrl, len(res.Resources))
724 dlog.Debugf(context.TODO(), "V2 Stream dump response[%v]: %v -> %v", sid, req, res)
725 }
726
727
728 func (l logAdapterV3) OnStreamResponse(sid int64, req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
729 dlog.Debugf(context.TODO(), "V3 Stream response[%v] for type %s: returning %d resources", sid, res.TypeUrl, len(res.Resources))
730 dlog.Debugf(context.TODO(), "V3 Stream dump response[%v]: %v -> %v", sid, req, res)
731 }
732
733
734 func (l logAdapterV2) OnFetchRequest(ctx context.Context, r *v2.DiscoveryRequest) error {
735 dlog.Debugf(ctx, "V2 Fetch request: %v", r)
736 return nil
737 }
738
739
740 func (l logAdapterV3) OnFetchRequest(ctx context.Context, r *v3discovery.DiscoveryRequest) error {
741 dlog.Debugf(ctx, "V3 Fetch request: %v", r)
742 return nil
743 }
744
745
746 func (l logAdapterV2) OnFetchResponse(req *v2.DiscoveryRequest, res *v2.DiscoveryResponse) {
747 dlog.Debugf(context.TODO(), "V2 Fetch response: %v -> %v", req, res)
748 }
749
750
751 func (l logAdapterV3) OnFetchResponse(req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
752 dlog.Debugf(context.TODO(), "V3 Fetch response: %v -> %v", req, res)
753 }
754
755 func Main(
756 ctx context.Context,
757 Version string,
758 getUsage MemoryGetter,
759 fastpathCh <-chan *FastpathSnapshot,
760 rawArgs ...string,
761 ) error {
762 args, err := parseArgs(ctx, rawArgs...)
763 if err != nil {
764 return err
765 }
766
767 dlog.Infof(ctx, "Ambex %s starting, snapdirPath %s", Version, args.snapdirPath)
768
769 watcher, err := fsnotify.NewWatcher()
770 if err != nil {
771 return err
772 }
773 defer watcher.Close()
774
775 if args.watch {
776 for _, d := range args.dirs {
777 if err := watcher.Add(d); err != nil {
778 return err
779 }
780 }
781 }
782
783
784
785
786
787
788 sigCh := make(chan os.Signal, 100)
789 signal.Notify(sigCh, syscall.SIGHUP)
790 defer func() { signal.Stop(sigCh) }()
791
792 ctx, cancel := context.WithCancel(ctx)
793 defer cancel()
794
795 config := ecp_v2_cache.NewSnapshotCache(true, HasherV2{}, logAdapterV2{logAdapterBase{"V2"}})
796 configv3 := ecp_v3_cache.NewSnapshotCache(true, HasherV3{}, logAdapterV3{logAdapterBase{"V3"}})
797 server := ecp_v2_server.NewServer(ctx, config, logAdapterV2{logAdapterBase{"V2"}})
798 serverv3 := ecp_v3_server.NewServer(ctx, configv3, logAdapterV3{logAdapterBase{"V3"}})
799
800 grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
801
802 grp.Go("management-server", func(ctx context.Context) error {
803 return runManagementServer(ctx, server, serverv3, args.adsNetwork, args.adsAddress)
804 })
805
806 pid := os.Getpid()
807 file := "ambex.pid"
808 if err := ioutil.WriteFile(file, []byte(fmt.Sprintf("%v", pid)), 0644); err != nil {
809 dlog.Warn(ctx, err)
810 } else {
811 ctx := dlog.WithField(ctx, "pid", pid)
812 ctx = dlog.WithField(ctx, "file", file)
813 dlog.Info(ctx, "Wrote PID")
814 }
815
816 updates := make(chan Update)
817 grp.Go("updater", func(ctx context.Context) error {
818 return Updater(ctx, updates, getUsage)
819 })
820 grp.Go("main-loop", func(ctx context.Context) error {
821 generation := 0
822 var fastpathSnapshot *FastpathSnapshot
823 edsEndpoints := map[string]*v2.ClusterLoadAssignment{}
824 edsEndpointsV3 := map[string]*v3endpointconfig.ClusterLoadAssignment{}
825
826
827
828
829
830 err = update(
831 ctx,
832 args.snapdirPath,
833 args.numsnaps,
834 args.edsBypass,
835 config,
836 configv3,
837 &generation,
838 args.dirs,
839 edsEndpoints,
840 edsEndpointsV3,
841 fastpathSnapshot,
842 updates,
843 )
844 if err != nil {
845 return err
846 }
847
848
849 for {
850
851 select {
852 case _ = <-sigCh:
853 err := update(
854 ctx,
855 args.snapdirPath,
856 args.numsnaps,
857 args.edsBypass,
858 config,
859 configv3,
860 &generation,
861 args.dirs,
862 edsEndpoints,
863 edsEndpointsV3,
864 fastpathSnapshot,
865 updates,
866 )
867 if err != nil {
868 return err
869 }
870 case fpSnap := <-fastpathCh:
871
872 if fpSnap.Endpoints != nil {
873 edsEndpoints = fpSnap.Endpoints.ToMap_v2()
874 edsEndpointsV3 = fpSnap.Endpoints.ToMap_v3()
875 }
876 fastpathSnapshot = fpSnap
877 err := update(
878 ctx,
879 args.snapdirPath,
880 args.numsnaps,
881 args.edsBypass,
882 config,
883 configv3,
884 &generation,
885 args.dirs,
886 edsEndpoints,
887 edsEndpointsV3,
888 fastpathSnapshot,
889 updates,
890 )
891 if err != nil {
892 return err
893 }
894 case <-watcher.Events:
895
896 err := update(
897 ctx,
898 args.snapdirPath,
899 args.numsnaps,
900 args.edsBypass,
901 config,
902 configv3,
903 &generation,
904 args.dirs,
905 edsEndpoints,
906 edsEndpointsV3,
907 fastpathSnapshot,
908 updates,
909 )
910 if err != nil {
911 return err
912 }
913 case err := <-watcher.Errors:
914
915 dlog.Warnf(ctx, "Watcher error: %v", err)
916 case <-ctx.Done():
917 return nil
918 }
919 }
920 })
921
922 return grp.Wait()
923 }
924
View as plain text