...
1 package main
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "time"
8
9 "github.com/twmb/franz-go/pkg/kerr"
10 "github.com/twmb/franz-go/pkg/kgo"
11 "github.com/twmb/franz-go/pkg/kmsg"
12 "github.com/twmb/franz-go/pkg/kversion"
13 )
14
15 func main() {
16 seeds := []string{"localhost:9092"}
17 client, err := kgo.NewClient(
18 kgo.SeedBrokers(seeds...),
19
20
21
22 kgo.MaxVersions(kversion.V2_4_0()),
23 )
24 if err != nil {
25 panic(err)
26 }
27 defer client.Close()
28
29 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
30 defer cancel()
31
32
33
34
35
36
37
38
39
40
41
42 {
43 req := kmsg.NewPtrCreateTopicsRequest()
44 topic := kmsg.NewCreateTopicsRequestTopic()
45 topic.Topic = "foo"
46 topic.NumPartitions = 1
47 topic.ReplicationFactor = 1
48 req.Topics = append(req.Topics, topic)
49
50 res, err := req.RequestWith(ctx, client)
51 if err != nil {
52
53 panic(err)
54 }
55
56 if len(res.Topics) != 1 {
57 panic(fmt.Sprintf("expected one topic in response, saw %d", len(res.Topics)))
58 }
59 t := res.Topics[0]
60
61 if err := kerr.ErrorForCode(t.ErrorCode); err != nil {
62 fmt.Fprintf(os.Stderr, "topic creation failure: %v", err)
63 return
64 }
65 fmt.Printf("topic %s created successfully!", t.Topic)
66 }
67
68
69 {
70 req := kmsg.NewPtrMetadataRequest()
71 topic := kmsg.NewMetadataRequestTopic()
72 topic.Topic = kmsg.StringPtr("foo")
73 req.Topics = append(req.Topics, topic)
74
75 res, err := req.RequestWith(ctx, client)
76 if err != nil {
77 panic(err)
78 }
79
80
81
82 for _, topic := range res.Topics {
83 err := kerr.ErrorForCode(topic.ErrorCode)
84 if err != nil {
85 fmt.Printf("topic %v response has errored: %v\n", topic.Topic, err.Error())
86 }
87 }
88
89 fmt.Printf("received '%v' topics and '%v' brokers", len(res.Topics), len(res.Brokers))
90 }
91 }
92
View as plain text