...

Source file src/edge-infra.dev/pkg/edge/chariot/example/main.go

Documentation: edge-infra.dev/pkg/edge/chariot/example

     1  package main
     2  
     3  import (
     4  	"context"
     5  	crypto "crypto/rand"
     6  	"encoding/base64"
     7  	"encoding/json"
     8  	"fmt"
     9  	"math/rand" //nolint:gosec
    10  	"os"
    11  	"time"
    12  
    13  	"cloud.google.com/go/pubsub"
    14  	"google.golang.org/api/iterator"
    15  	"gopkg.in/yaml.v2"
    16  
    17  	"edge-infra.dev/pkg/edge/chariot"
    18  )
    19  
    20  var (
    21  	requestRate   = os.Getenv("DEMO_REQUEST_RATE")
    22  	bucket        = os.Getenv("DEMO_BUCKET_NAME")
    23  	projectID     = os.Getenv("GOOGLE_CLOUD_PROJECT_ID")
    24  	requestTopic  = os.Getenv("PUBSUB_TOPIC_ID")
    25  	requestSub    = os.Getenv("PUBSUB_SUBSCRIPTION_ID")
    26  	responseTopic = os.Getenv("PUBSUB_RESPONSE_TOPIC_ID")
    27  )
    28  
    29  var ticker <-chan time.Time // = time.NewTicker(50 * time.Millisecond).C
    30  
    31  func init() {
    32  	reqr, err := time.ParseDuration(requestRate)
    33  	if err != nil {
    34  		fmt.Printf("Could not parse duration: %q %v\n", requestRate, err)
    35  		reqr = 200 * time.Millisecond
    36  	}
    37  	ticker = time.NewTicker(reqr).C
    38  }
    39  
    40  var owner string
    41  
    42  func init() {
    43  	var ownerB [8]byte
    44  	crypto.Read(ownerB[:]) //nolint:errcheck
    45  	owner = fmt.Sprintf("o%x", ownerB)
    46  }
    47  
    48  func getFilter() string {
    49  	return fmt.Sprintf("attributes.owner == %q", owner)
    50  }
    51  
    52  func main() {
    53  	fmt.Println("Chariot Request Producer Demo")
    54  	fmt.Printf("Request Rate: %q\n", requestRate)
    55  	fmt.Printf("Owner: %q\n", owner)
    56  	fmt.Printf("Project ID: %q\n", projectID)
    57  	fmt.Printf("Topic ID: %q\n", requestTopic)
    58  	fmt.Printf("Subscription ID: %q\n", requestSub)
    59  	fmt.Printf("Google Cloud Storage Bucket: %q\n", bucket)
    60  
    61  	ctx := context.Background()
    62  
    63  	client, err := pubsub.NewClient(ctx, projectID)
    64  	if err != nil {
    65  		fmt.Println("Error creating pubsub client:", err)
    66  		return
    67  	}
    68  
    69  	respCh, err := getResponsesChan(context.Background(), client)
    70  	_, _ = respCh, err
    71  
    72  	var topic = client.Topic(requestTopic)
    73  	if exists, err := topic.Exists(ctx); err != nil {
    74  		fmt.Printf("Error checking if request topic %q exists: %v\n", requestTopic, err)
    75  		return
    76  	} else if !exists {
    77  		topic, err = client.CreateTopic(ctx, requestTopic)
    78  		if err != nil {
    79  			fmt.Printf("Error creating request topic %q. Got error: %v\n", requestTopic, err)
    80  			return
    81  		}
    82  	}
    83  
    84  	// Wait until chariot has subscribed.
    85  	const timeoutDur = 30 * time.Minute
    86  	timeout, cancel := context.WithTimeout(ctx, timeoutDur)
    87  	if err = waitForSubscription(timeout, topic); err != nil {
    88  		fmt.Printf("Timeout %v exceeded waiting for chariot to subscribe to the request topic\n", timeoutDur)
    89  		return
    90  	}
    91  	cancel()
    92  
    93  	// Send requests and await their responses
    94  	const timeoutDuration = time.Minute
    95  	for range ticker {
    96  		timeout, cancel = context.WithTimeout(ctx, timeoutDuration)
    97  		var req = randomCreateRequest()
    98  		var reqMsg = &pubsub.Message{}
    99  		reqMsg.Data, _ = json.Marshal(req)
   100  		fmt.Printf("Request Data: %s\n", base64.StdEncoding.EncodeToString(reqMsg.Data))
   101  		reqID, err := topic.Publish(timeout, reqMsg).Get(timeout)
   102  		if err != nil {
   103  			fmt.Printf("Error publishing request: %v\n", err)
   104  			return
   105  		}
   106  		fmt.Printf("Request ID:  %q\n", reqID)
   107  
   108  		var resp chariot.ResponseJSON
   109  
   110  		// EMULATOR FILTERING DOES NOT WORK! ARGHHHH.
   111  		// See https://issuetracker.google.com/issues/170471468
   112  		for {
   113  			select {
   114  			case <-timeout.Done():
   115  				fmt.Printf("Exceeded timeout of %v\n", timeoutDuration)
   116  				return
   117  			case psresp := <-respCh:
   118  				psresp.Ack()
   119  				err := json.Unmarshal(psresp.Data, &resp)
   120  				if err != nil {
   121  					fmt.Printf("Error unmarshalling JSON: %v\n", err)
   122  					return
   123  				}
   124  			}
   125  
   126  			if resp.PubSubRequestID == reqID {
   127  				break
   128  			}
   129  		}
   130  
   131  		fmt.Printf("Response ID: %q\n", resp.PubSubRequestID)
   132  		fmt.Printf("Response OK: %t\n", resp.Ok)
   133  		if !resp.Ok {
   134  			fmt.Printf("Response Error: %q\n", resp.Error)
   135  		}
   136  		cancel()
   137  	}
   138  }
   139  
   140  func getResponsesChan(ctx context.Context, client *pubsub.Client) (<-chan *pubsub.Message, error) {
   141  	var topic = client.Topic(responseTopic)
   142  	if exists, err := topic.Exists(ctx); err != nil {
   143  		fmt.Println("Error checking if topic exists:", err)
   144  		return nil, err
   145  	} else if !exists {
   146  		fmt.Printf("Creating response topic: %q\n", responseTopic)
   147  		topic, err = client.CreateTopic(ctx, responseTopic)
   148  		if err != nil {
   149  			return nil, err
   150  		}
   151  	}
   152  	fmt.Printf("Using response topic: %q\n", responseTopic)
   153  
   154  	// Create unique subscription for this owner.
   155  	var filter = getFilter()
   156  	fmt.Printf("Creating Subscription: %q\n", owner)
   157  	fmt.Printf("  With Filter: %q\n", filter)
   158  	sub, err := client.CreateSubscription(ctx, owner, pubsub.SubscriptionConfig{
   159  		Topic:       topic,
   160  		AckDeadline: time.Minute,
   161  		Filter:      filter,
   162  	})
   163  	if err != nil {
   164  		return nil, err
   165  	}
   166  	fmt.Println("Response subscription created:", owner)
   167  
   168  	var ret = make(chan *pubsub.Message)
   169  	//nolint:errcheck
   170  	go sub.Receive(ctx, func(rctx context.Context, msg *pubsub.Message) {
   171  		select {
   172  		case <-rctx.Done():
   173  			return
   174  		case ret <- msg:
   175  			// message sent.
   176  		}
   177  	})
   178  	return ret, nil
   179  }
   180  
   181  func waitForSubscription(ctx context.Context, topic *pubsub.Topic) error {
   182  	fmt.Printf("Waiting for chariot to subscribe.")
   183  	for {
   184  		subs := topic.Subscriptions(ctx)
   185  		for {
   186  			sub, err := subs.Next()
   187  			if err == iterator.Done {
   188  				break
   189  			}
   190  			if err != nil {
   191  				return err
   192  			}
   193  			fmt.Printf("found subscription: %q\n", sub.ID())
   194  			return nil
   195  		}
   196  
   197  		select {
   198  		case <-time.After(time.Second):
   199  			fmt.Print(".")
   200  		case <-ctx.Done():
   201  			return ctx.Err()
   202  		}
   203  	}
   204  }
   205  
   206  //nolint:gosec
   207  func randomCreateRequest() chariot.Request {
   208  	var cluster string
   209  	if rand.Int()%2 == 0 {
   210  		cluster = fmt.Sprintf("cluster%d", 1+(rand.Int()%7))
   211  	}
   212  
   213  	var req = chariot.Request{
   214  		Operation: "CREATE",
   215  		Banner:    bucket,
   216  		Cluster:   cluster,
   217  		Owner:     owner,
   218  	}
   219  
   220  	const apiVersion = "demo.chariot.one/v1alpha1"
   221  
   222  	const maxObjects = 5
   223  	var numObjects = 1 + (rand.Int() % maxObjects)
   224  	for i := 0; i < numObjects; i++ {
   225  		// Determine object Kind
   226  		var kinds = []string{"Cluster", "Namespace", "Tenant"}
   227  		var kind = kinds[rand.Int()%len(kinds)]
   228  
   229  		// Determine object Name
   230  		var nameBytes [16]byte
   231  		crypto.Read(nameBytes[:]) //nolint:errcheck
   232  		var name = fmt.Sprintf("%x", nameBytes)
   233  
   234  		// Determine object Namespace if any
   235  		var namespaces = []string{"foo", "bar", "baz"}
   236  		var namespace string
   237  		if rand.Int()%2 == 0 {
   238  			namespace = namespaces[rand.Int()%len(namespaces)]
   239  		}
   240  
   241  		var object struct {
   242  			APIVersion string `yaml:"apiVersion"`
   243  			Kind       string `yaml:"kind"`
   244  			Metadata   struct {
   245  				Name      string `yaml:"name"`
   246  				Namespace string `yaml:"namespace"`
   247  			} `yaml:"metadata"`
   248  			Spec struct {
   249  				Foo string `yaml:"foo"`
   250  				Bar string `yaml:"bar"`
   251  			} `yaml:"spec"`
   252  		}
   253  		object.APIVersion = apiVersion
   254  		object.Kind = kind
   255  		object.Metadata.Name = name
   256  		object.Metadata.Namespace = namespace
   257  		object.Spec.Foo = "foo"
   258  		object.Spec.Bar = "baz"
   259  
   260  		var y, _ = yaml.Marshal(object)
   261  		req.Objects = append(req.Objects, y)
   262  	}
   263  
   264  	// CHAOS. INTRODUCE CHAOS.
   265  	// Have a 1/8th percent chance that an bad request is sent.
   266  	if rand.Int()%8 == 0 {
   267  		req.Operation = "CHAOS"
   268  	}
   269  
   270  	return req
   271  }
   272  

View as plain text