// Package main is the entry point for the Chariot daemon. It starts the
// liveness, metrics and ready HTTP servers, builds the daemon options from
// flags and environment variables, and then runs the daemon.
package main

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os"

	"cloud.google.com/go/storage"
	"google.golang.org/api/option"

	"edge-infra.dev/pkg/edge/chariot"
	"edge-infra.dev/pkg/lib/server"
)

// Default chariot http ports
const (
	defaultLivenessPort = 8080
	defaultMetricsPort  = 8081
	defaultReadyPort    = 8082
)

// Flags
var (
	// Ports for the auxiliary HTTP servers started by runServers.
	livenessPort int
	metricsPort  int
	readyPort    int

	// PubSub and storage settings consumed by daemonOptions.
	projectID   string
	subID       string
	respTopicID string
	gcsURL      string
)

// init registers the command line flags. The string flags default to the
// value of their corresponding environment variable, so an explicit flag
// overrides the environment.
func init() {
	flag.IntVar(&livenessPort, "liveness-port", defaultLivenessPort, "The Chariot liveness server port")
	flag.IntVar(&metricsPort, "metrics-port", defaultMetricsPort, "The Chariot metrics server port")
	flag.IntVar(&readyPort, "ready-port", defaultReadyPort, "The Chariot ready server port")

	// Daemon PubSub Configuration
	flag.StringVar(&projectID, "projectID", os.Getenv("GOOGLE_CLOUD_PROJECT_ID"), "The Google Cloud Project ID")
	flag.StringVar(&subID, "subID", os.Getenv("PUBSUB_SUBSCRIPTION_ID"), "The PubSub Subscription ID")
	flag.StringVar(&respTopicID, "responseTopicID", os.Getenv("PUBSUB_RESPONSE_TOPIC_ID"), "The PubSub Response Topic ID")

	const exGcsURL = "http://localhost:8084/storage/v1/"
	var gcsURLHelp = fmt.Sprintf("Optional. Override the Google Cloud Storage Endpoint with a custom URL. \nEx: %q", exGcsURL)
	flag.StringVar(&gcsURL, "gcs-endpoint-url", os.Getenv("GCS_ENDPOINT_URL"), gcsURLHelp)
}

const (
	// logfmt is the single-line JSON layout used for every log line written
	// by this command; both fields are filled in with %q.
	logfmt = `{"severity":%q,"message":%q}` + "\n"

	// sevError is the severity value used when reporting a fatal error.
	sevError = "error"

	// DaemonErrorPrefix is prepended to the errors produced while setting up
	// or running the daemon.
	DaemonErrorPrefix = "Chariot Daemon Error:"
)

// runServers creates the metrics, ready and liveness servers on their
// configured ports and starts each of them in its own goroutine. It returns
// the first construction error, in which case none of the servers is started.
// Errors returned by the servers' Run methods are deliberately ignored.
func runServers() error {
	metricsServer, err := server.NewMetricsServer(server.OptionPort(metricsPort))
	if err != nil {
		return err
	}

	readyServer, ready, err := server.NewReadyServer(server.OptionPort(readyPort))
	if err != nil {
		return err
	}
	// NOTE(review): ready presumably flips the ready server into its "ready"
	// state - confirm against pkg/lib/server. Being deferred here, it runs
	// when runServers returns, including the liveness error return below.
	defer ready()

	livenessServer, setCode, err := server.NewLivenessServer(server.OptionPort(livenessPort))
	if err != nil {
		return err
	}
	// NOTE(review): presumably sets the status code the liveness endpoint
	// answers with - confirm against pkg/lib/server.
	defer setCode(http.StatusOK)

	go metricsServer.Run()  //nolint:errcheck
	go readyServer.Run()    //nolint:errcheck
	go livenessServer.Run() //nolint:errcheck
	return nil
}

// main parses the flags, starts the auxiliary HTTP servers, builds the daemon
// and runs it. Every failure is logged as a JSON line and the process exits
// with status 1; the process also exits with status 1 whenever the daemon's
// Run method returns.
func main() {
	flag.Parse()

	if err := runServers(); err != nil {
		exitWithError(err)
	}

	opts, err := daemonOptions()
	if err != nil {
		exitWithError(err)
	}

	daemon, err := chariot.NewDaemon(opts...)
	if err != nil {
		exitWithError(err)
	}

	err = daemon.Run(context.Background())
	if err == nil {
		// Formatting a nil error with %q yields "%!q(<nil>)", which is not
		// valid JSON. Report an explicit message instead.
		err = fmt.Errorf("%s daemon stopped without reporting an error", DaemonErrorPrefix)
	}
	exitWithError(err)
}

// exitWithError writes err to standard output using logfmt with error
// severity and terminates the process with exit status 1.
func exitWithError(err error) {
	fmt.Printf(logfmt, sevError, err)
	os.Exit(1)
}

// daemonOptions builds the chariot options from the parsed flags: a Google
// Cloud Storage client (pointed at gcsURL when that flag is non-empty), a
// PubSub receiver for subID and a PubSub publisher for respTopicID.
//
// On failure it returns a nil slice and an error prefixed with
// DaemonErrorPrefix. The storage client is closed again if one of the PubSub
// constructors fails, so that an error return does not leak it.
func daemonOptions() ([]chariot.Option, error) {
	ctx := context.Background()

	var gcsClientOptions []option.ClientOption
	if gcsURL != "" {
		gcsClientOptions = append(gcsClientOptions, option.WithEndpoint(gcsURL))
	}

	gcsClient, err := storage.NewClient(ctx, gcsClientOptions...)
	if err != nil {
		return nil, fmt.Errorf("%s error creating storage client: %w", DaemonErrorPrefix, err)
	}

	ipsr, err := chariot.NewGooglePubSubReceiver(projectID, subID)
	if err != nil {
		// Best effort: the construction error is the one worth reporting.
		_ = gcsClient.Close()
		return nil, fmt.Errorf("%s error creating pubsub receiver: %w", DaemonErrorPrefix, err)
	}

	ipub, err := chariot.NewGooglePubSubResponseTopicPublisher(projectID, respTopicID)
	if err != nil {
		// Best effort: the construction error is the one worth reporting.
		_ = gcsClient.Close()
		return nil, fmt.Errorf("%s error creating pubsub topic for responses: %w", DaemonErrorPrefix, err)
	}

	return []chariot.Option{
		chariot.OptionGoogleCloudStorage(gcsClient),
		chariot.OptionPubSubReceiver(ipsr),
		chariot.OptionPubSubResponsePublisher(ipub),
	}, nil
}