package forwarder import ( "context" "flag" "fmt" "os" "strings" "github.com/peterbourgon/ff/v3" "edge-infra.dev/pkg/f8n/kinform/sql" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/lib/runtime/healthz" "edge-infra.dev/pkg/lib/runtime/manager" "edge-infra.dev/pkg/lib/runtime/subscriber" ) // Forwarder configuration // Fields are exported so that Zap will log the fields: // https://github.com/uber-go/zap/issues/1097 type config struct { ProjectID string // Subscriptions is populated after parsing flags Subscriptions []string // Repository is an optional source repository to filter messages. If not provided, // all messages are processed, since GCP publishes events for all registries // once Repository string // Bound to flag and split on , // TODO(aw185176): just implement flags.Value? subscriptions string destinationsStr string // DefaultDestinations controls how artifacts are forwarded if the information // does not exist in the message. Populated after parsing flags. DefaultDestinations []Destination // MetricsAddr is the address to bind the metrics endpoint to MetricsAddr string // HealthzAddr is the address to bind heath-check endpoints to HealthzAddr string } // Destination contains the required information for building a destination // URI for a reference. type Destination struct { // ProjectID is where the destination registry is hosted ProjectID string // Repository is the resource name for the artifact repository in the destination // project ID. Repository string } func newConfig() (*config, error) { cfg := &config{} fs := flag.NewFlagSet("palletfwder", flag.ContinueOnError) fs.StringVar(&cfg.ProjectID, "source-project-id", "", "project that contains the source registry and subscription(s)") fs.StringVar(&cfg.Repository, "source-repository", "warehouse", "optional source repository instance name to filter events on") fs.StringVar(&cfg.subscriptions, "source-sub-ids", "registry-events-subscription", "comma separated list of subscription IDs") fs.StringVar(&cfg.destinationsStr, "destinations", "", "comma separated list of default destinations to forward pallets to, in the "+ "form $PROJECT_ID/$REPOSITORY_RESOURCE_ID, e.g., ret-edge-dev1/warehouse") fs.StringVar(&cfg.MetricsAddr, "metrics-addr", ":8081", "address to bind metrics endpoint to") fs.StringVar(&cfg.HealthzAddr, "healthz-addr", ":8082", "address to bind heath-check endpoints to") if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix()); err != nil { return nil, err } cfg.Subscriptions = strings.Split(cfg.subscriptions, ",") cfg.DefaultDestinations = make([]Destination, 0) for _, dest := range strings.Split(cfg.destinationsStr, ",") { t := strings.Split(dest, "/") if len(t) != 2 { return nil, fmt.Errorf("invalid destination token %s: should be in the "+ "format project-id/repository-resource-id", dest) } cfg.DefaultDestinations = append(cfg.DefaultDestinations, Destination{ ProjectID: t[0], Repository: t[1], }) } if cfg.ProjectID == "" || len(cfg.Subscriptions) == 0 || cfg.destinationsStr == "" { return nil, fmt.Errorf("incomplete configuration, all flags are required. " + "run --help for more information") } return cfg, nil } // Run creates the palletfwder and then starts it. func Run() error { log := fog.New().WithName("registryforwarder") ctx := context.Background() cfg, err := newConfig() if err != nil { log.Error(err, "failed to load config") return err } log.Info("loaded config", "config", &cfg) mgr, err := createMgr(cfg) if err != nil { log.Error(err, "failed to create manager") return err } log.Info("starting manager") if err := mgr.Start(ctx); err != nil { log.Error(err, "forwarder errored") return err } return nil } // createMgr creates the runtime manager for the forwarder, instantiating an // a Handler per configured subscription and adding them to the created manager. func createMgr(cfg *config) (manager.Manager, error) { // log := cfg.Runtime.Logger.WithName("setup") mgr, err := manager.New(manager.Options{ MetricsBindAddress: cfg.MetricsAddr, HealthProbeBindAddress: cfg.HealthzAddr, }) if err != nil { return nil, err } // TODO(dk185217): unchecked err. ingestion is best effort for now db, _ := sql.FromEnv() f := &Fwder{ DST: cfg.DefaultDestinations, SourceRepository: cfg.Repository, SQL: db, } for _, subscription := range cfg.Subscriptions { sub, err := subscriber.New(subscriber.Options{ ProjectID: cfg.ProjectID, ID: subscription, Handler: f, }) if err != nil { return nil, err } if err := mgr.Add(sub); err != nil { return nil, err } } if err := mgr.AddLivezCheck("alive", healthz.Ping); err != nil { return nil, fmt.Errorf("failed to add check: /livez/alive. error: %v", err) } if err := mgr.AddReadyzCheck("ready", healthz.Ping); err != nil { return nil, fmt.Errorf("failed to add check: /readyz/ready. error: %v", err) } return mgr, nil }