...

Source file src/edge-infra.dev/pkg/f8n/warehouse/forwarder/forwarder.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/forwarder

     1  package forwarder
     2  
     3  import (
     4  	"context"
     5  	"flag"
     6  	"fmt"
     7  	"os"
     8  	"strings"
     9  
    10  	"github.com/peterbourgon/ff/v3"
    11  
    12  	"edge-infra.dev/pkg/f8n/kinform/sql"
    13  	"edge-infra.dev/pkg/lib/fog"
    14  	"edge-infra.dev/pkg/lib/runtime/healthz"
    15  	"edge-infra.dev/pkg/lib/runtime/manager"
    16  	"edge-infra.dev/pkg/lib/runtime/subscriber"
    17  )
    18  
    19  // Forwarder configuration
    20  // Fields are exported so that Zap will log the fields:
    21  // https://github.com/uber-go/zap/issues/1097
    22  type config struct {
    23  	ProjectID string
    24  	// Subscriptions is populated after parsing flags
    25  	Subscriptions []string
    26  	// Repository is an optional source repository to filter messages. If not provided,
    27  	// all messages are processed, since GCP publishes events for all registries
    28  	// once
    29  	Repository string
    30  
    31  	// Bound to flag and split on ,
    32  	// TODO(aw185176): just implement flags.Value?
    33  	subscriptions   string
    34  	destinationsStr string
    35  
    36  	// DefaultDestinations controls how artifacts are forwarded if the information
    37  	// does not exist in the message. Populated after parsing flags.
    38  	DefaultDestinations []Destination
    39  
    40  	// MetricsAddr is the address to bind the metrics endpoint to
    41  	MetricsAddr string
    42  
    43  	// HealthzAddr is the address to bind heath-check endpoints to
    44  	HealthzAddr string
    45  }
    46  
    47  // Destination contains the required information for building a destination
    48  // URI for a reference.
    49  type Destination struct {
    50  	// ProjectID is where the destination registry is hosted
    51  	ProjectID string
    52  	// Repository is the resource name for the artifact repository in the destination
    53  	// project ID.
    54  	Repository string
    55  }
    56  
    57  func newConfig() (*config, error) {
    58  	cfg := &config{}
    59  	fs := flag.NewFlagSet("palletfwder", flag.ContinueOnError)
    60  
    61  	fs.StringVar(&cfg.ProjectID, "source-project-id", "",
    62  		"project that contains the source registry and subscription(s)")
    63  	fs.StringVar(&cfg.Repository, "source-repository", "warehouse",
    64  		"optional source repository instance name to filter events on")
    65  	fs.StringVar(&cfg.subscriptions, "source-sub-ids", "registry-events-subscription",
    66  		"comma separated list of subscription IDs")
    67  	fs.StringVar(&cfg.destinationsStr, "destinations", "",
    68  		"comma separated list of default destinations to forward pallets to, in the "+
    69  			"form $PROJECT_ID/$REPOSITORY_RESOURCE_ID, e.g., ret-edge-dev1/warehouse")
    70  	fs.StringVar(&cfg.MetricsAddr, "metrics-addr", ":8081",
    71  		"address to bind metrics endpoint to")
    72  	fs.StringVar(&cfg.HealthzAddr, "healthz-addr", ":8082",
    73  		"address to bind heath-check endpoints to")
    74  
    75  	if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix()); err != nil {
    76  		return nil, err
    77  	}
    78  
    79  	cfg.Subscriptions = strings.Split(cfg.subscriptions, ",")
    80  
    81  	cfg.DefaultDestinations = make([]Destination, 0)
    82  	for _, dest := range strings.Split(cfg.destinationsStr, ",") {
    83  		t := strings.Split(dest, "/")
    84  		if len(t) != 2 {
    85  			return nil, fmt.Errorf("invalid destination token %s: should be in the "+
    86  				"format project-id/repository-resource-id", dest)
    87  		}
    88  		cfg.DefaultDestinations = append(cfg.DefaultDestinations, Destination{
    89  			ProjectID:  t[0],
    90  			Repository: t[1],
    91  		})
    92  	}
    93  
    94  	if cfg.ProjectID == "" || len(cfg.Subscriptions) == 0 ||
    95  		cfg.destinationsStr == "" {
    96  		return nil, fmt.Errorf("incomplete configuration, all flags are required. " +
    97  			"run --help for more information")
    98  	}
    99  
   100  	return cfg, nil
   101  }
   102  
   103  // Run creates the palletfwder and then starts it.
   104  func Run() error {
   105  	log := fog.New().WithName("registryforwarder")
   106  	ctx := context.Background()
   107  
   108  	cfg, err := newConfig()
   109  	if err != nil {
   110  		log.Error(err, "failed to load config")
   111  		return err
   112  	}
   113  	log.Info("loaded config", "config", &cfg)
   114  
   115  	mgr, err := createMgr(cfg)
   116  	if err != nil {
   117  		log.Error(err, "failed to create manager")
   118  		return err
   119  	}
   120  
   121  	log.Info("starting manager")
   122  	if err := mgr.Start(ctx); err != nil {
   123  		log.Error(err, "forwarder errored")
   124  		return err
   125  	}
   126  
   127  	return nil
   128  }
   129  
   130  // createMgr creates the runtime manager for the forwarder, instantiating an
   131  // a Handler per configured subscription and adding them to the created manager.
   132  func createMgr(cfg *config) (manager.Manager, error) {
   133  	// log := cfg.Runtime.Logger.WithName("setup")
   134  	mgr, err := manager.New(manager.Options{
   135  		MetricsBindAddress:     cfg.MetricsAddr,
   136  		HealthProbeBindAddress: cfg.HealthzAddr,
   137  	})
   138  	if err != nil {
   139  		return nil, err
   140  	}
   141  
   142  	// TODO(dk185217): unchecked err. ingestion is best effort for now
   143  	db, _ := sql.FromEnv()
   144  	f := &Fwder{
   145  		DST:              cfg.DefaultDestinations,
   146  		SourceRepository: cfg.Repository,
   147  		SQL:              db,
   148  	}
   149  
   150  	for _, subscription := range cfg.Subscriptions {
   151  		sub, err := subscriber.New(subscriber.Options{
   152  			ProjectID: cfg.ProjectID,
   153  			ID:        subscription,
   154  			Handler:   f,
   155  		})
   156  		if err != nil {
   157  			return nil, err
   158  		}
   159  
   160  		if err := mgr.Add(sub); err != nil {
   161  			return nil, err
   162  		}
   163  	}
   164  
   165  	if err := mgr.AddLivezCheck("alive", healthz.Ping); err != nil {
   166  		return nil, fmt.Errorf("failed to add check: /livez/alive. error: %v", err)
   167  	}
   168  	if err := mgr.AddReadyzCheck("ready", healthz.Ping); err != nil {
   169  		return nil, fmt.Errorf("failed to add check: /readyz/ready. error: %v", err)
   170  	}
   171  
   172  	return mgr, nil
   173  }
   174  

View as plain text