1 package main
2
3 import (
4 "context"
5 "flag"
6 "fmt"
7 "net/http"
8 "os"
9
10 "cloud.google.com/go/storage"
11 "google.golang.org/api/option"
12
13 "edge-infra.dev/pkg/edge/chariot"
14 "edge-infra.dev/pkg/lib/server"
15 )
16
17
18 const (
19 defaultLivenessPort = 8080
20 defaultMetricsPort = 8081
21 defaultReadyPort = 8082
22 )
23
24
25 var (
26 livenessPort int
27 metricsPort int
28 readyPort int
29
30 projectID string
31 subID string
32
33 respTopicID string
34
35 gcsURL string
36 )
37
38 func init() {
39 flag.IntVar(&livenessPort, "liveness-port", defaultLivenessPort, "The Chariot liveness server port")
40 flag.IntVar(&metricsPort, "metrics-port", defaultMetricsPort, "The Chariot metrics server port")
41 flag.IntVar(&readyPort, "ready-port", defaultReadyPort, "The Chariot ready server port")
42
43
44 flag.StringVar(&projectID, "projectID", os.Getenv("GOOGLE_CLOUD_PROJECT_ID"), "The Google Cloud Project ID")
45 flag.StringVar(&subID, "subID", os.Getenv("PUBSUB_SUBSCRIPTION_ID"), "The PubSub Subscription ID")
46 flag.StringVar(&respTopicID, "responseTopicID", os.Getenv("PUBSUB_RESPONSE_TOPIC_ID"), "The PubSub Response Topic ID")
47
48 const exGcsURL = "http://localhost:8084/storage/v1/"
49 var gcsURLHelp = fmt.Sprintf("Optional. Override the Google Cloud Storage Endpoint with a custom URL. Ex: %q", exGcsURL)
50 flag.StringVar(&gcsURL, "gcs-endpoint-url", os.Getenv("GCS_ENDPOINT_URL"), gcsURLHelp)
51 }
52
53 const (
54 logfmt = `{"severity":%q,"message":%q}` + "\n"
55 sevError = "error"
56 DaemonErrorPrefix = "Chariot Daemon Error:"
57 )
58
59 func runServers() error {
60 metricsServer, err := server.NewMetricsServer(server.OptionPort(metricsPort))
61 if err != nil {
62 return err
63 }
64
65 readyServer, ready, err := server.NewReadyServer(server.OptionPort(readyPort))
66 if err != nil {
67 return err
68 }
69 defer ready()
70
71 livenessServer, setCode, err := server.NewLivenessServer(server.OptionPort(livenessPort))
72 if err != nil {
73 return err
74 }
75 defer setCode(http.StatusOK)
76
77 go metricsServer.Run()
78 go readyServer.Run()
79 go livenessServer.Run()
80 return nil
81 }
82
83 func main() {
84 flag.Parse()
85
86 if err := runServers(); err != nil {
87 fmt.Printf(logfmt, sevError, err)
88 os.Exit(1)
89 }
90
91 opts, err := daemonOptions()
92 if err != nil {
93 fmt.Printf(logfmt, sevError, err)
94 os.Exit(1)
95 }
96
97 daemon, err := chariot.NewDaemon(opts...)
98 if err != nil {
99 fmt.Printf(logfmt, sevError, err)
100 os.Exit(1)
101 }
102
103 err = daemon.Run(context.Background())
104 fmt.Printf(logfmt, sevError, err)
105 os.Exit(1)
106 }
107
108 func daemonOptions() ([]chariot.Option, error) {
109 ctx := context.Background()
110 var gcsClientOptions []option.ClientOption
111 if gcsURL != "" {
112 gcsClientOptions = append(gcsClientOptions, option.WithEndpoint(gcsURL))
113 }
114 gcsClient, err := storage.NewClient(ctx, gcsClientOptions...)
115 if err != nil {
116 return nil, fmt.Errorf("%s error creating storage client: %w", DaemonErrorPrefix, err)
117 }
118
119 ipsr, err := chariot.NewGooglePubSubReceiver(projectID, subID)
120 if err != nil {
121 return nil, fmt.Errorf("%s error creating pubsub receiver: %w", DaemonErrorPrefix, err)
122 }
123
124 ipub, err := chariot.NewGooglePubSubResponseTopicPublisher(projectID, respTopicID)
125 if err != nil {
126 return nil, fmt.Errorf("%s error creating pubsub topic for responses: %w", DaemonErrorPrefix, err)
127 }
128
129 return []chariot.Option{
130 chariot.OptionGoogleCloudStorage(gcsClient),
131 chariot.OptionPubSubReceiver(ipsr),
132 chariot.OptionPubSubResponsePublisher(ipub),
133 }, nil
134 }
135
View as plain text