...

Source file src/github.com/twmb/franz-go/examples/requesting/request_metadata.go

Documentation: github.com/twmb/franz-go/examples/requesting

     1  package main
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"time"
     8  
     9  	"github.com/twmb/franz-go/pkg/kerr"
    10  	"github.com/twmb/franz-go/pkg/kgo"
    11  	"github.com/twmb/franz-go/pkg/kmsg"
    12  	"github.com/twmb/franz-go/pkg/kversion"
    13  )
    14  
    15  func main() {
    16  	seeds := []string{"localhost:9092"}
    17  	client, err := kgo.NewClient(
    18  		kgo.SeedBrokers(seeds...),
    19  
    20  		// Do not try to send requests newer than 2.4.0 to avoid breaking changes in the request struct.
    21  		// Sometimes there are breaking changes for newer versions where more properties are required to set.
    22  		kgo.MaxVersions(kversion.V2_4_0()),
    23  	)
    24  	if err != nil {
    25  		panic(err)
    26  	}
    27  	defer client.Close()
    28  
    29  	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    30  	defer cancel()
    31  
    32  	// Construct message request and send it to Kafka.
    33  	//
    34  	// Because Go does not have RAII, and the Kafka protocol can add new fields
    35  	// at any time, all struct initializes from the kmsg package should use the
    36  	// NewXyz functions (or NewPtr for requests). These functions call "Default"
    37  	// before returning, which is forward compatible when Kafka adds new fields
    38  	// that require non-zero defaults.
    39  
    40  	// For the first request, we will create topic foo with 1 partition and
    41  	// a replication factor of 1.
    42  	{
    43  		req := kmsg.NewPtrCreateTopicsRequest()
    44  		topic := kmsg.NewCreateTopicsRequestTopic()
    45  		topic.Topic = "foo"
    46  		topic.NumPartitions = 1
    47  		topic.ReplicationFactor = 1
    48  		req.Topics = append(req.Topics, topic)
    49  
    50  		res, err := req.RequestWith(ctx, client)
    51  		if err != nil {
    52  			// Error during request has happened (e. g. context cancelled)
    53  			panic(err)
    54  		}
    55  
    56  		if len(res.Topics) != 1 {
    57  			panic(fmt.Sprintf("expected one topic in response, saw %d", len(res.Topics)))
    58  		}
    59  		t := res.Topics[0]
    60  
    61  		if err := kerr.ErrorForCode(t.ErrorCode); err != nil {
    62  			fmt.Fprintf(os.Stderr, "topic creation failure: %v", err)
    63  			return
    64  		}
    65  		fmt.Printf("topic %s created successfully!", t.Topic)
    66  	}
    67  
    68  	// Now we will issue a metadata request to see that topic.
    69  	{
    70  		req := kmsg.NewPtrMetadataRequest()
    71  		topic := kmsg.NewMetadataRequestTopic()
    72  		topic.Topic = kmsg.StringPtr("foo")
    73  		req.Topics = append(req.Topics, topic)
    74  
    75  		res, err := req.RequestWith(ctx, client)
    76  		if err != nil {
    77  			panic(err)
    78  		}
    79  
    80  		// Check response for Kafka error codes and print them.
    81  		// Other requests might have top level error codes, which indicate completed but failed requests.
    82  		for _, topic := range res.Topics {
    83  			err := kerr.ErrorForCode(topic.ErrorCode)
    84  			if err != nil {
    85  				fmt.Printf("topic %v response has errored: %v\n", topic.Topic, err.Error())
    86  			}
    87  		}
    88  
    89  		fmt.Printf("received '%v' topics and '%v' brokers", len(res.Topics), len(res.Brokers))
    90  	}
    91  }
    92  

View as plain text