package main import ( "context" crypto "crypto/rand" "encoding/base64" "encoding/json" "fmt" "math/rand" //nolint:gosec "os" "time" "cloud.google.com/go/pubsub" "google.golang.org/api/iterator" "gopkg.in/yaml.v2" "edge-infra.dev/pkg/edge/chariot" ) var ( requestRate = os.Getenv("DEMO_REQUEST_RATE") bucket = os.Getenv("DEMO_BUCKET_NAME") projectID = os.Getenv("GOOGLE_CLOUD_PROJECT_ID") requestTopic = os.Getenv("PUBSUB_TOPIC_ID") requestSub = os.Getenv("PUBSUB_SUBSCRIPTION_ID") responseTopic = os.Getenv("PUBSUB_RESPONSE_TOPIC_ID") ) var ticker <-chan time.Time // = time.NewTicker(50 * time.Millisecond).C func init() { reqr, err := time.ParseDuration(requestRate) if err != nil { fmt.Printf("Could not parse duration: %q %v\n", requestRate, err) reqr = 200 * time.Millisecond } ticker = time.NewTicker(reqr).C } var owner string func init() { var ownerB [8]byte crypto.Read(ownerB[:]) //nolint:errcheck owner = fmt.Sprintf("o%x", ownerB) } func getFilter() string { return fmt.Sprintf("attributes.owner == %q", owner) } func main() { fmt.Println("Chariot Request Producer Demo") fmt.Printf("Request Rate: %q\n", requestRate) fmt.Printf("Owner: %q\n", owner) fmt.Printf("Project ID: %q\n", projectID) fmt.Printf("Topic ID: %q\n", requestTopic) fmt.Printf("Subscription ID: %q\n", requestSub) fmt.Printf("Google Cloud Storage Bucket: %q\n", bucket) ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { fmt.Println("Error creating pubsub client:", err) return } respCh, err := getResponsesChan(context.Background(), client) _, _ = respCh, err var topic = client.Topic(requestTopic) if exists, err := topic.Exists(ctx); err != nil { fmt.Printf("Error checking if request topic %q exists: %v\n", requestTopic, err) return } else if !exists { topic, err = client.CreateTopic(ctx, requestTopic) if err != nil { fmt.Printf("Error creating request topic %q. Got error: %v\n", requestTopic, err) return } } // Wait until chariot has subscribed. const timeoutDur = 30 * time.Minute timeout, cancel := context.WithTimeout(ctx, timeoutDur) if err = waitForSubscription(timeout, topic); err != nil { fmt.Printf("Timeout %v exceeded waiting for chariot to subscribe to the request topic\n", timeoutDur) return } cancel() // Send requests and await their responses const timeoutDuration = time.Minute for range ticker { timeout, cancel = context.WithTimeout(ctx, timeoutDuration) var req = randomCreateRequest() var reqMsg = &pubsub.Message{} reqMsg.Data, _ = json.Marshal(req) fmt.Printf("Request Data: %s\n", base64.StdEncoding.EncodeToString(reqMsg.Data)) reqID, err := topic.Publish(timeout, reqMsg).Get(timeout) if err != nil { fmt.Printf("Error publishing request: %v\n", err) return } fmt.Printf("Request ID: %q\n", reqID) var resp chariot.ResponseJSON // EMULATOR FILTERING DOES NOT WORK! ARGHHHH. // See https://issuetracker.google.com/issues/170471468 for { select { case <-timeout.Done(): fmt.Printf("Exceeded timeout of %v\n", timeoutDuration) return case psresp := <-respCh: psresp.Ack() err := json.Unmarshal(psresp.Data, &resp) if err != nil { fmt.Printf("Error unmarshalling JSON: %v\n", err) return } } if resp.PubSubRequestID == reqID { break } } fmt.Printf("Response ID: %q\n", resp.PubSubRequestID) fmt.Printf("Response OK: %t\n", resp.Ok) if !resp.Ok { fmt.Printf("Response Error: %q\n", resp.Error) } cancel() } } func getResponsesChan(ctx context.Context, client *pubsub.Client) (<-chan *pubsub.Message, error) { var topic = client.Topic(responseTopic) if exists, err := topic.Exists(ctx); err != nil { fmt.Println("Error checking if topic exists:", err) return nil, err } else if !exists { fmt.Printf("Creating response topic: %q\n", responseTopic) topic, err = client.CreateTopic(ctx, responseTopic) if err != nil { return nil, err } } fmt.Printf("Using response topic: %q\n", responseTopic) // Create unique subscription for this owner. var filter = getFilter() fmt.Printf("Creating Subscription: %q\n", owner) fmt.Printf(" With Filter: %q\n", filter) sub, err := client.CreateSubscription(ctx, owner, pubsub.SubscriptionConfig{ Topic: topic, AckDeadline: time.Minute, Filter: filter, }) if err != nil { return nil, err } fmt.Println("Response subscription created:", owner) var ret = make(chan *pubsub.Message) //nolint:errcheck go sub.Receive(ctx, func(rctx context.Context, msg *pubsub.Message) { select { case <-rctx.Done(): return case ret <- msg: // message sent. } }) return ret, nil } func waitForSubscription(ctx context.Context, topic *pubsub.Topic) error { fmt.Printf("Waiting for chariot to subscribe.") for { subs := topic.Subscriptions(ctx) for { sub, err := subs.Next() if err == iterator.Done { break } if err != nil { return err } fmt.Printf("found subscription: %q\n", sub.ID()) return nil } select { case <-time.After(time.Second): fmt.Print(".") case <-ctx.Done(): return ctx.Err() } } } //nolint:gosec func randomCreateRequest() chariot.Request { var cluster string if rand.Int()%2 == 0 { cluster = fmt.Sprintf("cluster%d", 1+(rand.Int()%7)) } var req = chariot.Request{ Operation: "CREATE", Banner: bucket, Cluster: cluster, Owner: owner, } const apiVersion = "demo.chariot.one/v1alpha1" const maxObjects = 5 var numObjects = 1 + (rand.Int() % maxObjects) for i := 0; i < numObjects; i++ { // Determine object Kind var kinds = []string{"Cluster", "Namespace", "Tenant"} var kind = kinds[rand.Int()%len(kinds)] // Determine object Name var nameBytes [16]byte crypto.Read(nameBytes[:]) //nolint:errcheck var name = fmt.Sprintf("%x", nameBytes) // Determine object Namespace if any var namespaces = []string{"foo", "bar", "baz"} var namespace string if rand.Int()%2 == 0 { namespace = namespaces[rand.Int()%len(namespaces)] } var object struct { APIVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata struct { Name string `yaml:"name"` Namespace string `yaml:"namespace"` } `yaml:"metadata"` Spec struct { Foo string `yaml:"foo"` Bar string `yaml:"bar"` } `yaml:"spec"` } object.APIVersion = apiVersion object.Kind = kind object.Metadata.Name = name object.Metadata.Namespace = namespace object.Spec.Foo = "foo" object.Spec.Bar = "baz" var y, _ = yaml.Marshal(object) req.Objects = append(req.Objects, y) } // CHAOS. INTRODUCE CHAOS. // Have a 1/8th percent chance that an bad request is sent. if rand.Int()%8 == 0 { req.Operation = "CHAOS" } return req }