1 package main
2
3 import (
4 "context"
5 "flag"
6 "fmt"
7 "net/http"
8 "os"
9
10 "github.com/peterbourgon/ff/v3"
11
12 "edge-infra.dev/pkg/edge/monitoring/plank"
13 "edge-infra.dev/pkg/lib/db/postgres"
14 "edge-infra.dev/pkg/lib/logging"
15 "edge-infra.dev/pkg/lib/runtime"
16 "edge-infra.dev/pkg/lib/runtime/healthz"
17 "edge-infra.dev/pkg/lib/runtime/manager"
18 "edge-infra.dev/pkg/lib/runtime/metrics"
19 "edge-infra.dev/pkg/lib/runtime/subscriber"
20 )
21
// fs is the flag set holding every plank setting. It uses
// flag.ContinueOnError so that a parse failure is returned to the caller as
// an error instead of terminating the process from inside the flag package.
var fs = flag.NewFlagSet("plank", flag.ContinueOnError)

// Pub/Sub subscription settings. Both are checked for non-empty values by
// validateConfig.
var (
	projectID = fs.String("projectID", "", "GCP Project ID for PubSub client")
	subID     = fs.String("subID", "", "The PubSub Subscription ID")
)

// Database connection settings, passed to postgres.DSNOptions in run.
var (
	host   = fs.String("db_host", "localhost", "AlloyDB host")
	user   = fs.String("db_user", "", "AlloyDB user")
	pass   = fs.String("db_password", "", "AlloyDB password")
	dbName = fs.String("db_name", "", "AlloyDB database name")
)

// slackWebhookURL is handed to plank.New in run.
var slackWebhookURL = fs.String("slackWebhookURL", "", "The slack webhook to send the notifications")
35
36 func main() {
37 if err := run(); err != nil {
38 os.Exit(1)
39 }
40 }
41
42 func run() error {
43 l := logging.NewLogger().WithValues("cmd", "plank").Logger
44
45 if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix()); err != nil {
46 l.Error(err, "failed to parse flags")
47 return err
48 }
49
50 if err := validateConfig(); err != nil {
51 l.Error(err, "failed to validate configuration")
52 return err
53 }
54
55
56 edgeOpts := manager.Options{
57 Logger: &l,
58 MetricsBindAddress: metrics.DefaultBindAddress,
59 }
60 edgeMgr, err := runtime.NewManager(edgeOpts)
61 if err != nil {
62 l.Error(err, "failed to create default manager")
63 return err
64 }
65
66 ctx := context.Background()
67 opts := postgres.DSNOptions{
68 Host: *host,
69 DBName: *dbName,
70 User: *user,
71 Password: *pass,
72 }
73
74 client, err := postgres.New(ctx, l, opts)
75 if err != nil {
76 l.Error(err, "failed to create postgres client")
77 return err
78 }
79 defer client.Close()
80 if err := client.Connect(); err != nil {
81 l.Error(err, "failed to connect to postgres database")
82 return err
83 }
84
85
86 p := plank.New(l, *slackWebhookURL, client)
87
88 subOpts := subscriber.Options{
89 Logger: &l,
90 ProjectID: *projectID,
91 ID: *subID,
92 Handler: p,
93 }
94 sub, err := runtime.NewSubscriber(subOpts)
95 if err != nil {
96 l.Error(err, "failed to create subscriber")
97 return err
98 }
99
100 if err := edgeMgr.Add(sub); err != nil {
101 l.Error(err, "failed to add subscriber to edge default manager")
102 return err
103 }
104
105
106 if err := edgeMgr.AddLivezCheck("subscriber", healthz.Ping); err != nil {
107 l.Error(err, "failed to add livez check")
108 return err
109 }
110
111 if err := edgeMgr.AddReadyzCheck("subscriber", healthz.Ping); err != nil {
112
113 l.Error(err, "failed to add readyz check")
114 return err
115 }
116
117
118 var storageReadyz healthz.Checker = func(_ *http.Request) error {
119 if !client.IsConnected() {
120 return fmt.Errorf("no storage connection")
121 }
122 return nil
123 }
124 if err := edgeMgr.AddReadyzCheck("storage", storageReadyz); err != nil {
125 l.Error(err, "failed to add readyz check")
126 return err
127 }
128
129 l.Info("starting manager")
130 if err := edgeMgr.Start(runtime.SetupSignalHandler()); err != nil {
131 l.Error(err, "failed to start edge default manager")
132 return err
133 }
134
135 return nil
136 }
137
138 func validateConfig() error {
139
140 if err := ff.Parse(fs, os.Args[1:], ff.WithEnvVarNoPrefix()); err != nil {
141 return err
142 }
143
144 if projectID == nil || *projectID == "" {
145 return fmt.Errorf("'projectID' arg required")
146 }
147 if subID == nil || *subID == "" {
148 return fmt.Errorf("'subID' arg required")
149 }
150
151 return nil
152 }
153
View as plain text