...

Source file src/edge-infra.dev/cmd/edge/chariot2/main.go

Documentation: edge-infra.dev/cmd/edge/chariot2

     1  package main
     2  
     3  import (
     4  	"context"
     5  	"flag"
     6  	"fmt"
     7  	"net/http"
     8  	"os"
     9  
    10  	"cloud.google.com/go/storage"
    11  	"google.golang.org/api/option"
    12  
    13  	"edge-infra.dev/pkg/edge/chariot"
    14  	"edge-infra.dev/pkg/lib/server"
    15  )
    16  
// Default chariot http ports. These are the fallback values for the
// -liveness-port, -metrics-port and -ready-port flags registered in init.
const (
	defaultLivenessPort = 8080
	defaultMetricsPort  = 8081
	defaultReadyPort    = 8082
)
    23  
// Flags. Every variable below is bound to a command-line flag in init.
var (
	// Ports passed to the liveness, metrics and ready servers in runServers.
	livenessPort int
	metricsPort  int
	readyPort    int

	// projectID is passed to both the PubSub receiver and the response
	// publisher; subID names the subscription given to the receiver.
	projectID string
	subID     string

	// respTopicID names the topic given to the response publisher.
	respTopicID string

	// gcsURL, when non-empty, is applied as the storage client's endpoint.
	gcsURL string
)
    37  
    38  func init() {
    39  	flag.IntVar(&livenessPort, "liveness-port", defaultLivenessPort, "The Chariot liveness server port")
    40  	flag.IntVar(&metricsPort, "metrics-port", defaultMetricsPort, "The Chariot metrics server port")
    41  	flag.IntVar(&readyPort, "ready-port", defaultReadyPort, "The Chariot ready server port")
    42  
    43  	// Daemon PubSub Configuration
    44  	flag.StringVar(&projectID, "projectID", os.Getenv("GOOGLE_CLOUD_PROJECT_ID"), "The Google Cloud Project ID")
    45  	flag.StringVar(&subID, "subID", os.Getenv("PUBSUB_SUBSCRIPTION_ID"), "The PubSub Subscription ID")
    46  	flag.StringVar(&respTopicID, "responseTopicID", os.Getenv("PUBSUB_RESPONSE_TOPIC_ID"), "The PubSub Response Topic ID")
    47  
    48  	const exGcsURL = "http://localhost:8084/storage/v1/"
    49  	var gcsURLHelp = fmt.Sprintf("Optional. Override the Google Cloud Storage Endpoint with a custom URL. Ex: %q", exGcsURL)
    50  	flag.StringVar(&gcsURL, "gcs-endpoint-url", os.Getenv("GCS_ENDPOINT_URL"), gcsURLHelp)
    51  }
    52  
// Logging constants used by main and daemonOptions.
const (
	// logfmt is the Printf format for a single log line: a JSON-shaped object
	// with quoted "severity" and "message" values, terminated by a newline.
	logfmt            = `{"severity":%q,"message":%q}` + "\n"
	// sevError is the severity value written when logging an error.
	sevError          = "error"
	// DaemonErrorPrefix is the prefix placed at the start of every error
	// message returned by daemonOptions.
	DaemonErrorPrefix = "Chariot Daemon Error:"
)
    58  
    59  func runServers() error {
    60  	metricsServer, err := server.NewMetricsServer(server.OptionPort(metricsPort))
    61  	if err != nil {
    62  		return err
    63  	}
    64  
    65  	readyServer, ready, err := server.NewReadyServer(server.OptionPort(readyPort))
    66  	if err != nil {
    67  		return err
    68  	}
    69  	defer ready()
    70  
    71  	livenessServer, setCode, err := server.NewLivenessServer(server.OptionPort(livenessPort))
    72  	if err != nil {
    73  		return err
    74  	}
    75  	defer setCode(http.StatusOK)
    76  
    77  	go metricsServer.Run()  //nolint:errcheck
    78  	go readyServer.Run()    //nolint:errcheck
    79  	go livenessServer.Run() //nolint:errcheck
    80  	return nil
    81  }
    82  
    83  func main() {
    84  	flag.Parse()
    85  
    86  	if err := runServers(); err != nil {
    87  		fmt.Printf(logfmt, sevError, err)
    88  		os.Exit(1)
    89  	}
    90  
    91  	opts, err := daemonOptions()
    92  	if err != nil {
    93  		fmt.Printf(logfmt, sevError, err)
    94  		os.Exit(1)
    95  	}
    96  
    97  	daemon, err := chariot.NewDaemon(opts...)
    98  	if err != nil {
    99  		fmt.Printf(logfmt, sevError, err)
   100  		os.Exit(1)
   101  	}
   102  
   103  	err = daemon.Run(context.Background())
   104  	fmt.Printf(logfmt, sevError, err)
   105  	os.Exit(1)
   106  }
   107  
   108  func daemonOptions() ([]chariot.Option, error) {
   109  	ctx := context.Background()
   110  	var gcsClientOptions []option.ClientOption
   111  	if gcsURL != "" {
   112  		gcsClientOptions = append(gcsClientOptions, option.WithEndpoint(gcsURL))
   113  	}
   114  	gcsClient, err := storage.NewClient(ctx, gcsClientOptions...)
   115  	if err != nil {
   116  		return nil, fmt.Errorf("%s error creating storage client: %w", DaemonErrorPrefix, err)
   117  	}
   118  
   119  	ipsr, err := chariot.NewGooglePubSubReceiver(projectID, subID)
   120  	if err != nil {
   121  		return nil, fmt.Errorf("%s error creating pubsub receiver: %w", DaemonErrorPrefix, err)
   122  	}
   123  
   124  	ipub, err := chariot.NewGooglePubSubResponseTopicPublisher(projectID, respTopicID)
   125  	if err != nil {
   126  		return nil, fmt.Errorf("%s error creating pubsub topic for responses: %w", DaemonErrorPrefix, err)
   127  	}
   128  
   129  	return []chariot.Option{
   130  		chariot.OptionGoogleCloudStorage(gcsClient),
   131  		chariot.OptionPubSubReceiver(ipsr),
   132  		chariot.OptionPubSubResponsePublisher(ipub),
   133  	}, nil
   134  }
   135  

View as plain text