...

Source file src/edge-infra.dev/cmd/x/gcp/publisher/main.go

Documentation: edge-infra.dev/cmd/x/gcp/publisher

     1  package main
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"flag"
     7  	"fmt"
     8  	"os"
     9  
    10  	"edge-infra.dev/pkg/lib/gcp/pubsub"
    11  	"edge-infra.dev/pkg/lib/logging"
    12  )
    13  
    14  var (
    15  	projectID = ""
    16  	topicID   = ""
    17  
    18  	message = `
    19  	{
    20  		"hello": "world"
    21  	}`
    22  
    23  	attrs = map[string]string{
    24  		// cushion provider attributes - leaving for ease of use
    25  		// todo - define these in data sync code somewhere
    26  		// "tenant_id":   "",                    // banner bsl ID, must be an org
    27  		// "db_name":     "",                    // name of database to use, will create if it doesn't not exist
    28  		// "entity_id":   "",                    // random uuid, must use same ID when trying to delete
    29  		// "data_source": "x-publisher-testing", // any string
    30  		// "deleted":     "false",               // string form of bool
    31  	}
    32  )
    33  
    34  func main() {
    35  	// use flags if provided
    36  	// todo - add ability to pass in message as string or file
    37  	flag.StringVar(&projectID, "projectID", projectID, "GCP project ID that the PubSub topic lives in")
    38  	flag.StringVar(&topicID, "topicID", topicID, "GCP PubSub topic ID")
    39  	flag.Parse()
    40  
    41  	l := logging.NewLogger().WithValues("cmd", "x-publisher").Logger
    42  
    43  	// validate required config is set
    44  	if projectID == "" || topicID == "" {
    45  		l.Error(fmt.Errorf("invalid configuration"), "must set values for 'projectID' and 'topicID'", "projectID",
    46  			projectID, "topicID", topicID)
    47  		os.Exit(1)
    48  	}
    49  
    50  	// validate message JSON
    51  	msgBytes := []byte(message)
    52  	var js json.RawMessage
    53  	if err := json.Unmarshal(msgBytes, &js); err != nil {
    54  		l.Error(err, "'message' is not valid json")
    55  		os.Exit(1)
    56  	}
    57  
    58  	ctx := context.Background()
    59  	manager, err := pubsub.New(ctx, projectID)
    60  	if err != nil {
    61  		l.Error(err, "failed to connect to gcp pubsub", "project", projectID)
    62  		os.Exit(1)
    63  	}
    64  	l.Info("connected to gcp pubsub", "project", projectID)
    65  
    66  	l.Info("sending message to topic", "project", projectID, "topic", topicID)
    67  	if err := manager.Send(ctx, topicID, msgBytes, attrs); err != nil {
    68  		l.Error(err, "failed to publish message", "project", projectID, "topic", topicID, "attributes", attrs)
    69  		os.Exit(1)
    70  	}
    71  	l.Info("published message")
    72  }
    73  

View as plain text