1 package forwarder
2
3 import (
4 "context"
5 "flag"
6 "fmt"
7 "os"
8 "strings"
9
10 "github.com/peterbourgon/ff/v3"
11
12 "edge-infra.dev/pkg/f8n/kinform/sql"
13 "edge-infra.dev/pkg/lib/fog"
14 "edge-infra.dev/pkg/lib/runtime/healthz"
15 "edge-infra.dev/pkg/lib/runtime/manager"
16 "edge-infra.dev/pkg/lib/runtime/subscriber"
17 )
18
19
20
21
// config holds the forwarder's runtime settings. newConfig fills it in from
// command-line flags and environment variables.
type config struct {
	// ProjectID is the project that contains the source registry and
	// subscription(s). Set by the "source-project-id" flag.
	ProjectID string

	// Subscriptions is the list of subscription IDs, produced by splitting the
	// comma separated "source-sub-ids" flag value.
	Subscriptions []string

	// Repository is the optional source repository instance name to filter
	// events on. Set by the "source-repository" flag.
	Repository string

	// subscriptions and destinationsStr hold the raw comma separated flag
	// values before they are parsed into Subscriptions and
	// DefaultDestinations.
	subscriptions   string
	destinationsStr string

	// DefaultDestinations is the list of default destinations to forward
	// pallets to, parsed from the "destinations" flag.
	DefaultDestinations []Destination

	// MetricsAddr is the address to bind the metrics endpoint to.
	MetricsAddr string

	// HealthzAddr is the address to bind the health-check endpoints to.
	HealthzAddr string
}
46
47
48
// Destination identifies a single forwarding target. It is parsed from one
// "$PROJECT_ID/$REPOSITORY_RESOURCE_ID" token of the "destinations" flag.
type Destination struct {
	// ProjectID is the project portion of the destination token.
	ProjectID string

	// Repository is the repository resource ID portion of the destination
	// token.
	Repository string
}
56
57 func newConfig() (*config, error) {
58 cfg := &config{}
59 fs := flag.NewFlagSet("palletfwder", flag.ContinueOnError)
60
61 fs.StringVar(&cfg.ProjectID, "source-project-id", "",
62 "project that contains the source registry and subscription(s)")
63 fs.StringVar(&cfg.Repository, "source-repository", "warehouse",
64 "optional source repository instance name to filter events on")
65 fs.StringVar(&cfg.subscriptions, "source-sub-ids", "registry-events-subscription",
66 "comma separated list of subscription IDs")
67 fs.StringVar(&cfg.destinationsStr, "destinations", "",
68 "comma separated list of default destinations to forward pallets to, in the "+
69 "form $PROJECT_ID/$REPOSITORY_RESOURCE_ID, e.g., ret-edge-dev1/warehouse")
70 fs.StringVar(&cfg.MetricsAddr, "metrics-addr", ":8081",
71 "address to bind metrics endpoint to")
72 fs.StringVar(&cfg.HealthzAddr, "healthz-addr", ":8082",
73 "address to bind heath-check endpoints to")
74
75 if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix()); err != nil {
76 return nil, err
77 }
78
79 cfg.Subscriptions = strings.Split(cfg.subscriptions, ",")
80
81 cfg.DefaultDestinations = make([]Destination, 0)
82 for _, dest := range strings.Split(cfg.destinationsStr, ",") {
83 t := strings.Split(dest, "/")
84 if len(t) != 2 {
85 return nil, fmt.Errorf("invalid destination token %s: should be in the "+
86 "format project-id/repository-resource-id", dest)
87 }
88 cfg.DefaultDestinations = append(cfg.DefaultDestinations, Destination{
89 ProjectID: t[0],
90 Repository: t[1],
91 })
92 }
93
94 if cfg.ProjectID == "" || len(cfg.Subscriptions) == 0 ||
95 cfg.destinationsStr == "" {
96 return nil, fmt.Errorf("incomplete configuration, all flags are required. " +
97 "run --help for more information")
98 }
99
100 return cfg, nil
101 }
102
103
104 func Run() error {
105 log := fog.New().WithName("registryforwarder")
106 ctx := context.Background()
107
108 cfg, err := newConfig()
109 if err != nil {
110 log.Error(err, "failed to load config")
111 return err
112 }
113 log.Info("loaded config", "config", &cfg)
114
115 mgr, err := createMgr(cfg)
116 if err != nil {
117 log.Error(err, "failed to create manager")
118 return err
119 }
120
121 log.Info("starting manager")
122 if err := mgr.Start(ctx); err != nil {
123 log.Error(err, "forwarder errored")
124 return err
125 }
126
127 return nil
128 }
129
130
131
132 func createMgr(cfg *config) (manager.Manager, error) {
133
134 mgr, err := manager.New(manager.Options{
135 MetricsBindAddress: cfg.MetricsAddr,
136 HealthProbeBindAddress: cfg.HealthzAddr,
137 })
138 if err != nil {
139 return nil, err
140 }
141
142
143 db, _ := sql.FromEnv()
144 f := &Fwder{
145 DST: cfg.DefaultDestinations,
146 SourceRepository: cfg.Repository,
147 SQL: db,
148 }
149
150 for _, subscription := range cfg.Subscriptions {
151 sub, err := subscriber.New(subscriber.Options{
152 ProjectID: cfg.ProjectID,
153 ID: subscription,
154 Handler: f,
155 })
156 if err != nil {
157 return nil, err
158 }
159
160 if err := mgr.Add(sub); err != nil {
161 return nil, err
162 }
163 }
164
165 if err := mgr.AddLivezCheck("alive", healthz.Ping); err != nil {
166 return nil, fmt.Errorf("failed to add check: /livez/alive. error: %v", err)
167 }
168 if err := mgr.AddReadyzCheck("ready", healthz.Ping); err != nil {
169 return nil, fmt.Errorf("failed to add check: /readyz/ready. error: %v", err)
170 }
171
172 return mgr, nil
173 }
174
View as plain text