1 package main
2
3 import (
4 "context"
5 crypto "crypto/rand"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "math/rand"
10 "os"
11 "time"
12
13 "cloud.google.com/go/pubsub"
14 "google.golang.org/api/iterator"
15 "gopkg.in/yaml.v2"
16
17 "edge-infra.dev/pkg/edge/chariot"
18 )
19
20 var (
21 requestRate = os.Getenv("DEMO_REQUEST_RATE")
22 bucket = os.Getenv("DEMO_BUCKET_NAME")
23 projectID = os.Getenv("GOOGLE_CLOUD_PROJECT_ID")
24 requestTopic = os.Getenv("PUBSUB_TOPIC_ID")
25 requestSub = os.Getenv("PUBSUB_SUBSCRIPTION_ID")
26 responseTopic = os.Getenv("PUBSUB_RESPONSE_TOPIC_ID")
27 )
28
29 var ticker <-chan time.Time
30
31 func init() {
32 reqr, err := time.ParseDuration(requestRate)
33 if err != nil {
34 fmt.Printf("Could not parse duration: %q %v\n", requestRate, err)
35 reqr = 200 * time.Millisecond
36 }
37 ticker = time.NewTicker(reqr).C
38 }
39
40 var owner string
41
42 func init() {
43 var ownerB [8]byte
44 crypto.Read(ownerB[:])
45 owner = fmt.Sprintf("o%x", ownerB)
46 }
47
48 func getFilter() string {
49 return fmt.Sprintf("attributes.owner == %q", owner)
50 }
51
52 func main() {
53 fmt.Println("Chariot Request Producer Demo")
54 fmt.Printf("Request Rate: %q\n", requestRate)
55 fmt.Printf("Owner: %q\n", owner)
56 fmt.Printf("Project ID: %q\n", projectID)
57 fmt.Printf("Topic ID: %q\n", requestTopic)
58 fmt.Printf("Subscription ID: %q\n", requestSub)
59 fmt.Printf("Google Cloud Storage Bucket: %q\n", bucket)
60
61 ctx := context.Background()
62
63 client, err := pubsub.NewClient(ctx, projectID)
64 if err != nil {
65 fmt.Println("Error creating pubsub client:", err)
66 return
67 }
68
69 respCh, err := getResponsesChan(context.Background(), client)
70 _, _ = respCh, err
71
72 var topic = client.Topic(requestTopic)
73 if exists, err := topic.Exists(ctx); err != nil {
74 fmt.Printf("Error checking if request topic %q exists: %v\n", requestTopic, err)
75 return
76 } else if !exists {
77 topic, err = client.CreateTopic(ctx, requestTopic)
78 if err != nil {
79 fmt.Printf("Error creating request topic %q. Got error: %v\n", requestTopic, err)
80 return
81 }
82 }
83
84
85 const timeoutDur = 30 * time.Minute
86 timeout, cancel := context.WithTimeout(ctx, timeoutDur)
87 if err = waitForSubscription(timeout, topic); err != nil {
88 fmt.Printf("Timeout %v exceeded waiting for chariot to subscribe to the request topic\n", timeoutDur)
89 return
90 }
91 cancel()
92
93
94 const timeoutDuration = time.Minute
95 for range ticker {
96 timeout, cancel = context.WithTimeout(ctx, timeoutDuration)
97 var req = randomCreateRequest()
98 var reqMsg = &pubsub.Message{}
99 reqMsg.Data, _ = json.Marshal(req)
100 fmt.Printf("Request Data: %s\n", base64.StdEncoding.EncodeToString(reqMsg.Data))
101 reqID, err := topic.Publish(timeout, reqMsg).Get(timeout)
102 if err != nil {
103 fmt.Printf("Error publishing request: %v\n", err)
104 return
105 }
106 fmt.Printf("Request ID: %q\n", reqID)
107
108 var resp chariot.ResponseJSON
109
110
111
112 for {
113 select {
114 case <-timeout.Done():
115 fmt.Printf("Exceeded timeout of %v\n", timeoutDuration)
116 return
117 case psresp := <-respCh:
118 psresp.Ack()
119 err := json.Unmarshal(psresp.Data, &resp)
120 if err != nil {
121 fmt.Printf("Error unmarshalling JSON: %v\n", err)
122 return
123 }
124 }
125
126 if resp.PubSubRequestID == reqID {
127 break
128 }
129 }
130
131 fmt.Printf("Response ID: %q\n", resp.PubSubRequestID)
132 fmt.Printf("Response OK: %t\n", resp.Ok)
133 if !resp.Ok {
134 fmt.Printf("Response Error: %q\n", resp.Error)
135 }
136 cancel()
137 }
138 }
139
140 func getResponsesChan(ctx context.Context, client *pubsub.Client) (<-chan *pubsub.Message, error) {
141 var topic = client.Topic(responseTopic)
142 if exists, err := topic.Exists(ctx); err != nil {
143 fmt.Println("Error checking if topic exists:", err)
144 return nil, err
145 } else if !exists {
146 fmt.Printf("Creating response topic: %q\n", responseTopic)
147 topic, err = client.CreateTopic(ctx, responseTopic)
148 if err != nil {
149 return nil, err
150 }
151 }
152 fmt.Printf("Using response topic: %q\n", responseTopic)
153
154
155 var filter = getFilter()
156 fmt.Printf("Creating Subscription: %q\n", owner)
157 fmt.Printf(" With Filter: %q\n", filter)
158 sub, err := client.CreateSubscription(ctx, owner, pubsub.SubscriptionConfig{
159 Topic: topic,
160 AckDeadline: time.Minute,
161 Filter: filter,
162 })
163 if err != nil {
164 return nil, err
165 }
166 fmt.Println("Response subscription created:", owner)
167
168 var ret = make(chan *pubsub.Message)
169
170 go sub.Receive(ctx, func(rctx context.Context, msg *pubsub.Message) {
171 select {
172 case <-rctx.Done():
173 return
174 case ret <- msg:
175
176 }
177 })
178 return ret, nil
179 }
180
181 func waitForSubscription(ctx context.Context, topic *pubsub.Topic) error {
182 fmt.Printf("Waiting for chariot to subscribe.")
183 for {
184 subs := topic.Subscriptions(ctx)
185 for {
186 sub, err := subs.Next()
187 if err == iterator.Done {
188 break
189 }
190 if err != nil {
191 return err
192 }
193 fmt.Printf("found subscription: %q\n", sub.ID())
194 return nil
195 }
196
197 select {
198 case <-time.After(time.Second):
199 fmt.Print(".")
200 case <-ctx.Done():
201 return ctx.Err()
202 }
203 }
204 }
205
206
207 func randomCreateRequest() chariot.Request {
208 var cluster string
209 if rand.Int()%2 == 0 {
210 cluster = fmt.Sprintf("cluster%d", 1+(rand.Int()%7))
211 }
212
213 var req = chariot.Request{
214 Operation: "CREATE",
215 Banner: bucket,
216 Cluster: cluster,
217 Owner: owner,
218 }
219
220 const apiVersion = "demo.chariot.one/v1alpha1"
221
222 const maxObjects = 5
223 var numObjects = 1 + (rand.Int() % maxObjects)
224 for i := 0; i < numObjects; i++ {
225
226 var kinds = []string{"Cluster", "Namespace", "Tenant"}
227 var kind = kinds[rand.Int()%len(kinds)]
228
229
230 var nameBytes [16]byte
231 crypto.Read(nameBytes[:])
232 var name = fmt.Sprintf("%x", nameBytes)
233
234
235 var namespaces = []string{"foo", "bar", "baz"}
236 var namespace string
237 if rand.Int()%2 == 0 {
238 namespace = namespaces[rand.Int()%len(namespaces)]
239 }
240
241 var object struct {
242 APIVersion string `yaml:"apiVersion"`
243 Kind string `yaml:"kind"`
244 Metadata struct {
245 Name string `yaml:"name"`
246 Namespace string `yaml:"namespace"`
247 } `yaml:"metadata"`
248 Spec struct {
249 Foo string `yaml:"foo"`
250 Bar string `yaml:"bar"`
251 } `yaml:"spec"`
252 }
253 object.APIVersion = apiVersion
254 object.Kind = kind
255 object.Metadata.Name = name
256 object.Metadata.Namespace = namespace
257 object.Spec.Foo = "foo"
258 object.Spec.Bar = "baz"
259
260 var y, _ = yaml.Marshal(object)
261 req.Objects = append(req.Objects, y)
262 }
263
264
265
266 if rand.Int()%8 == 0 {
267 req.Operation = "CHAOS"
268 }
269
270 return req
271 }
272
View as plain text