Producing and consuming
===

This document describes at a high level how producing and consuming work, and
links to the pkg.go.dev documentation for the functions and methods mentioned
so that you can read more about how to use them.

Code for both producing and consuming can be seen in the examples directory,
particularly in the transaction examples.

## Producing

The client provides three methods to produce: [`Produce`][1],
[`ProduceSync`][2], and [`TryProduce`][TryProduce]. The first produces
asynchronously, the second produces synchronously, and the third produces
asynchronously but fails a record immediately if the
[maximum number of records][max_records] is already buffered. These methods are
also available on [`GroupTransactSession`][3] if you are using that for EOS.

Everything is produced through a [`Record`][4]. You can produce to multiple
topics by creating a record in full (i.e., with a `Value` or `Key` or
`Headers`) and then setting the `Topic` field, but if you are only ever
producing to one topic, you can use the client's `DefaultProduceTopic` option.
You can still use this option even when producing to multiple topics; the
option only applies to records that have an empty topic.

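As a rough sketch of how the three produce methods and `DefaultProduceTopic` fit together, the program below assumes a broker at `localhost:9092` and topics named `events` and `audit`. Those values are placeholders chosen for illustration, and the program needs a reachable cluster to do anything useful.

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumed broker address and default topic; substitute your own.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("events"),
	)
	if err != nil {
		log.Fatalf("unable to create client: %v", err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Asynchronous: the promise runs once the record is delivered or failed.
	// This record has an empty Topic, so the default topic applies.
	cl.Produce(ctx, kgo.SliceRecord([]byte("async")), func(_ *kgo.Record, err error) {
		if err != nil {
			log.Printf("async produce failed: %v", err)
		}
	})

	// Synchronous: blocks until every record passed in has finished.
	// An explicit Topic takes precedence over the default.
	rec := &kgo.Record{Topic: "audit", Value: []byte("sync")}
	if err := cl.ProduceSync(ctx, rec).FirstErr(); err != nil {
		log.Printf("sync produce failed: %v", err)
	}

	// Asynchronous, but the record fails right away if the buffer is full.
	cl.TryProduce(ctx, kgo.SliceRecord([]byte("try")), func(_ *kgo.Record, err error) {
		switch {
		case errors.Is(err, kgo.ErrMaxBuffered):
			log.Print("buffer full, record was not queued")
		case err != nil:
			log.Printf("try produce failed: %v", err)
		}
	})

	// Wait for anything still buffered before exiting.
	if err := cl.Flush(ctx); err != nil {
		log.Printf("flush failed: %v", err)
	}
}
```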
There exist a few small helpers to create records out of slices or strings:

* [`kgo.StringRecord`][5]
* [`kgo.KeyStringRecord`][6]
* [`kgo.SliceRecord`][7]
* [`kgo.KeySliceRecord`][8]

The string functions should only be used if you do not modify the contents of
the `Key` and `Value` fields after the record is created, because the string
functions use the `unsafe` package to convert the strings to slices without
allocating.

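To see why mutation is the problem, here is a minimal standard-library sketch of a zero-copy string-to-slice conversion. The `stringToBytes` and `sharesMemory` helpers are hypothetical stand-ins written for this illustration; they are not kgo's actual code, and they assume Go 1.20 or newer for `unsafe.StringData`.

```go
package main

import (
	"fmt"
	"unsafe"
)

// stringToBytes returns a byte slice that shares memory with s instead of
// copying it. Hypothetical helper for illustration only.
func stringToBytes(s string) []byte {
	if s == "" {
		return nil
	}
	return unsafe.Slice(unsafe.StringData(s), len(s))
}

// sharesMemory reports whether b starts at the same address as s.
func sharesMemory(s string, b []byte) bool {
	return len(b) > 0 && len(b) == len(s) && unsafe.SliceData(b) == unsafe.StringData(s)
}

func main() {
	s := "hello"
	b := stringToBytes(s)
	fmt.Println(string(b), sharesMemory(s, b))

	// Writing through b (for example, b[0] = 'H') would modify memory that Go
	// treats as immutable string data. That is undefined behavior and can
	// crash the program, which is why such slices must be treated as
	// read-only.
}
```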
Lastly, if you are producing asynchronously in batches and only want to know
whether the batch errored at all, there exists a [`FirstErrPromise`][9] type to
help eliminate promise boilerplate.

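The idea behind a first-error promise can be sketched with the standard library alone. The `firstErr` type and `sendBatch` function below are simplified, hypothetical stand-ins that only model the pattern (count pending callbacks, keep the first failure, wait for all of them); they are not `kgo.FirstErrPromise` and do not talk to Kafka.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBoom is a sample failure used by the demo in main.
var errBoom = errors.New("boom")

// firstErr keeps the first non-nil error reported by any callback.
// Hypothetical stand-in for illustration only.
type firstErr struct {
	wg   sync.WaitGroup
	once sync.Once
	err  error
}

// promise registers one pending callback and returns it.
func (f *firstErr) promise() func(error) {
	f.wg.Add(1)
	return func(err error) {
		defer f.wg.Done()
		if err != nil {
			f.once.Do(func() { f.err = err })
		}
	}
}

// wait blocks until every registered callback has run, then returns the
// first error seen, or nil if nothing failed.
func (f *firstErr) wait() error {
	f.wg.Wait()
	return f.err
}

// sendBatch simulates asynchronous delivery of a batch; results[i] is the
// outcome reported for the i-th record.
func sendBatch(results []error) error {
	var f firstErr
	for _, res := range results {
		cb := f.promise()
		go cb(res) // a real client would invoke the callback on completion
	}
	return f.wait()
}

func main() {
	fmt.Println(sendBatch([]error{nil, nil}))
	fmt.Println(sendBatch([]error{nil, errBoom, nil}))
}
```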
[1]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.Produce
[2]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.ProduceSync
[3]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#GroupTransactSession
[4]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Record
[5]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#StringRecord
[6]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#KeyStringRecord
[7]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#SliceRecord
[8]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#KeySliceRecord
[9]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#FirstErrPromise
[TryProduce]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.TryProduce
[max_records]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#MaxBufferedRecords

### Record reliability

By default, kgo uses idempotent production. This can be disabled with the
[`DisableIdempotentWrite`][10] option, but doing so should only be necessary
if you want to produce with no acks required or with only leader acks required.

[10]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#DisableIdempotentWrite

By default, records are retried forever, but this can be limited with
the [`RecordRetries`][11] and [`RecordDeliveryTimeout`][12] options, as well as with
the context that you use for producing a record. A record will only be aborted
if it is safe to do so without messing up the client's sequence numbers. Thus,
a record can only be aborted if it has never been produced or if the client
knows that it successfully received a response to the record's last produce
attempt (even if that response indicated an error on that partition). If a
record is ever failed, all records buffered on the same partition are failed.

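The abort rule above boils down to a small predicate. The sketch below is a deliberately simplified model; the `recordState` type and its fields are hypothetical names invented for this illustration and do not reflect how the client tracks records internally.

```go
package main

import "fmt"

// recordState is a hypothetical, simplified view of one buffered record.
type recordState struct {
	attempts        int  // produce requests that have included this record
	lastGotResponse bool // whether the most recent attempt received a response
}

// canAbort reports whether failing the record keeps sequence numbers intact:
// either it was never sent, or the outcome of its last attempt is known.
func canAbort(r recordState) bool {
	return r.attempts == 0 || r.lastGotResponse
}

func main() {
	fmt.Println(canAbort(recordState{}))                                   // never produced
	fmt.Println(canAbort(recordState{attempts: 1}))                        // in flight, outcome unknown
	fmt.Println(canAbort(recordState{attempts: 2, lastGotResponse: true})) // last outcome known
}
```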
[11]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#RecordRetries
[12]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#RecordDeliveryTimeout

### Exactly once semantics

As mentioned above, kgo supports EOS. Because there are a lot of corner cases
around transactions, this client favors an "if we maybe should abort, abort"
approach. This client provides a `GroupTransactSession` type that is used
to manage consume-modify-produce transactions. Any time it is possible that
the transaction should be aborted, the session sets its internal abort state.
This may mean you will end up re-processing records more than necessary, but
in general this should only happen on group rebalances, which should be rare.

Producer-only transactions are also supported. This is just a simple extension
of the idempotent producer, except with a manual begin transaction and end
transaction call whenever appropriate.

To see more documentation about transactions and EOS, see the
[transactions](./transactions.md) page.

### Latency

Producer latency can be modified by adding a linger. By default, there is no
linger and records are sent as soon as they are published. In a high throughput
scenario, this is fine and will not lead to single-record batches, but in a low
throughput scenario it may be worth adding a linger.

It is also possible to disable auto-flushing completely and flush only
manually with the
[`ManualFlushing`](https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ManualFlushing)
option. This allows you to buffer as much as you want before flushing in one
go. However, with this option, you likely also want to set the
[`MaxBufferedRecords`](https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#MaxBufferedRecords)
option, which caps how many records the client buffers.

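The two approaches might be configured roughly as follows. The broker address, topic name, linger duration, and buffer limit are all assumed values picked for illustration, and the program needs a reachable cluster to run.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// lingerOpts trades a little latency for larger batches.
func lingerOpts() []kgo.Opt {
	return []kgo.Opt{
		kgo.ProducerLinger(5 * time.Millisecond), // assumed value
	}
}

// manualFlushOpts disables auto-flushing and caps the in-memory buffer.
func manualFlushOpts() []kgo.Opt {
	return []kgo.Opt{
		kgo.ManualFlushing(),
		kgo.MaxBufferedRecords(100_000), // assumed value
	}
}

func main() {
	// Swap manualFlushOpts for lingerOpts to try the linger approach instead.
	opts := append([]kgo.Opt{
		kgo.SeedBrokers("localhost:9092"), // assumed address
		kgo.DefaultProduceTopic("events"), // assumed topic
	}, manualFlushOpts()...)

	cl, err := kgo.NewClient(opts...)
	if err != nil {
		log.Fatalf("unable to create client: %v", err)
	}
	defer cl.Close()

	ctx := context.Background()
	for i := 0; i < 1000; i++ {
		cl.Produce(ctx, kgo.SliceRecord([]byte("payload")), func(_ *kgo.Record, err error) {
			if err != nil {
				log.Printf("produce failed: %v", err)
			}
		})
	}

	// With manual flushing, nothing is sent until Flush is called.
	if err := cl.Flush(ctx); err != nil {
		log.Printf("flush failed: %v", err)
	}
}
```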
## Consuming

franz-go supports consuming partitions directly, consuming as a part of a
consumer group, and consuming in a group for EOS. Topics to consume can also be
selected by regex.

To consume partitions directly, use [`ConsumeTopics`][13] or
[`ConsumePartitions`][a]. Otherwise, use `ConsumeTopics` with
[`ConsumerGroup`][14] for group consuming or [`NewGroupTransactSession`][15]
for group consuming for EOS.

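The option sets below sketch the direct, group, and regex modes. The broker address, topic names, group name, and regex pattern are assumptions made up for this example; creating the client does not consume anything by itself, since records are only returned once you poll.

```go
package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// directOpts consumes partition 0 of "events" from the start, without a group.
func directOpts() []kgo.Opt {
	return []kgo.Opt{
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
			"events": {0: kgo.NewOffset().AtStart()},
		}),
	}
}

// groupOpts consumes "events" as a member of the group "my-group".
func groupOpts() []kgo.Opt {
	return []kgo.Opt{
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("events"),
	}
}

// regexOpts treats every topic passed to ConsumeTopics as a regular expression.
func regexOpts() []kgo.Opt {
	return []kgo.Opt{
		kgo.ConsumeRegex(),
		kgo.ConsumeTopics("^events-.*"),
	}
}

func main() {
	// Pick one of directOpts, groupOpts, or regexOpts.
	opts := append([]kgo.Opt{kgo.SeedBrokers("localhost:9092")}, groupOpts()...)

	cl, err := kgo.NewClient(opts...)
	if err != nil {
		log.Fatalf("unable to create client: %v", err)
	}
	defer cl.Close()
}
```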
[13]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ConsumeTopics
[a]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ConsumePartitions
[14]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ConsumerGroup
[15]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#NewGroupTransactSession

### Consumer groups

The default consumer group balancer is the new "cooperative-sticky" balancer.
This is **not compatible** with historical balancers (sticky, range, roundrobin).
If you wish to use this client with another client that uses a historical balancer,
you must set the `Balancers` option.

By default, the group consumer will autocommit every 5s, commit whenever a
rebalance happens (in [`OnPartitionsRevoked`][16]), and issue a blocking
commit when leaving the group. For most purposes, this suffices. The default
commit logs any errors encountered, but this can be overridden with the
[`AutoCommitCallback`][17] option or by disabling autocommit and instead committing
yourself.

[16]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnPartitionsRevoked
[17]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#AutoCommitCallback

#### Offset management

Unlike Sarama or really most Kafka clients, this client manages the consumer
group **completely independently** from consuming itself. More to the point, a
revoke can happen **at any time** and if you need to stop consuming or do some
cleanup on a revoke, you must set a callback that will **not return** until you
are ready for the group to be rejoined. Even more to the point, if you are
manually committing offsets, you **must** commit in your `OnPartitionsRevoked`,
or you must abandon your work after the revoke finishes, because otherwise you
may be working on partitions that moved to another client. When you are **done
consuming**, before you shut down, you must perform a blocking commit. If you
rely on the default options and do not commit yourself, all of this is
automatically handled.

Alternatively, you can use the [`BlockRebalanceOnPoll`][BROP] option in
combination with [`AllowRebalance`][AR] to ensure a rebalance cannot happen
after you poll until you explicitly allow it. This option is much easier to
reason about, but it carries a risk: if processing a poll takes so long that a
rebalance starts and finishes in the meantime, you are kicked from the group.
If you use this option and this API, it is recommended to take care and use
[`PollRecords`][PR] and ensure your [`RebalanceTimeout`][RT] is long enough to
encompass any processing you do between polls. My recommendation is to block
rebalance on poll, ensure your processing is quick, and to use
[`CommitUncommittedOffsets`][CUO].

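A poll loop following that recommendation might look like the sketch below. The broker address, group and topic names, the 500-record poll size, the one-minute rebalance timeout, and the `process` helper are all assumptions for illustration; the program needs a reachable cluster to run.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// process is a hypothetical placeholder for quick per-record work.
func process(r *kgo.Record) {
	log.Printf("%s[%d]@%d: %s", r.Topic, r.Partition, r.Offset, r.Value)
}

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed address
		kgo.ConsumerGroup("my-group"),     // assumed group
		kgo.ConsumeTopics("events"),       // assumed topic
		kgo.DisableAutoCommit(),           // commits happen in the loop below
		kgo.BlockRebalanceOnPoll(),
		kgo.RebalanceTimeout(time.Minute), // must cover processing between polls
	)
	if err != nil {
		log.Fatalf("unable to create client: %v", err)
	}
	defer cl.Close()

	// Stop polling on Ctrl-C.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	for {
		// Bound the batch size so processing stays short.
		fetches := cl.PollRecords(ctx, 500)
		if ctx.Err() != nil || fetches.IsClientClosed() {
			cl.AllowRebalance()
			return
		}
		fetches.EachError(func(topic string, partition int32, err error) {
			log.Printf("fetch error on %s[%d]: %v", topic, partition, err)
		})
		fetches.EachRecord(process)

		// Commit what was just processed while rebalances are still blocked.
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("commit failed: %v", err)
		}
		cl.AllowRebalance()
	}
}
```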
[BROP]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#BlockRebalanceOnPoll
[AR]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.AllowRebalance
[PR]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.PollRecords
[RT]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#RebalanceTimeout
[CUO]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.CommitUncommittedOffsets

##### Direct offset management outside of a group

You can use the `ConsumePartitions` option to assign partitions manually and
consume outside of the context of a group. If you want to use Kafka to manage
group offsets even with direct partition assignment, this repo provides a
`kadm` package to easily manage offsets via an admin interface. Check the
[`manual_committing`](../examples/manual_committing) example to see some
example code for how to do this.

##### Without transactions

There are two easy patterns for successful offset management in a normal
consumer group.

First, and most recommended, you can just rely on the default
autocommitting behavior and the default blocking commit on leave. At most, you
may want to use your own custom commit callback.

Alternatively, you can disable autocommitting with [`DisableAutoCommit`][19]
and instead use a custom `OnPartitionsRevoked`.

[19]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#DisableAutoCommit

In your custom revoke, you can guard a revoked variable with a mutex. Before
committing, check this revoked variable and do not commit if it has been set.
For some hints as to how to do this properly, check how
[`GroupTransactSession`][20] is implemented (although it is more complicated
due to handling transactions).

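The mutex-guarded revoked flag can be sketched with the standard library alone. The `commitGuard` type and its methods are hypothetical names for this illustration: you would call `markRevoked` from your `OnPartitionsRevoked` callback, call `reset` when you start on a fresh batch, and wrap your real commit call in `commitIfSafe`.

```go
package main

import (
	"fmt"
	"sync"
)

// commitGuard protects a revoked flag with a mutex.
// Hypothetical helper for illustration only.
type commitGuard struct {
	mu      sync.Mutex
	revoked bool
}

// markRevoked records that partitions were revoked. Because commitIfSafe
// holds the same mutex, this blocks while a commit is in progress.
func (g *commitGuard) markRevoked() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.revoked = true
}

// reset clears the flag before work begins on a newly polled batch.
func (g *commitGuard) reset() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.revoked = false
}

// commitIfSafe runs commit only if no revoke happened since the last reset.
// It reports whether commit ran, along with commit's error.
func (g *commitGuard) commitIfSafe(commit func() error) (bool, error) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.revoked {
		return false, nil
	}
	return true, commit()
}

func main() {
	var g commitGuard
	commits := 0
	commit := func() error { commits++; return nil } // stands in for a real commit

	g.commitIfSafe(commit) // runs: nothing was revoked
	g.markRevoked()
	g.commitIfSafe(commit) // skipped: the work must be abandoned
	fmt.Println("commits:", commits)
}
```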
[20]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#GroupTransactSession

##### With transactions

Because an EOS consumer is difficult to implement correctly, all details have
been abstracted away to a [`GroupTransactSession`][20] type. See the
[transactions](./transactions.md) page for more details.

### The cooperative balancer

Kafka 2.4.0 introduced support for [KIP-429][21], the incremental rebalancing
protocol. This allows consumers to continue fetching records **during** a
rebalance, effectively eliminating the stop-the-world aspect of rebalancing.
However, while the Java client introduced this in Kafka 2.4.0, the balancer
is not actually dependent on that Kafka version.

[21]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol

This client has support for KIP-429 and in fact defaults to cooperative
consuming. Cooperative consuming is not compatible with clients using the
historical consumer group strategies, and if you plan to use kgo with these
historical clients, you need to set the balancers appropriately.

Cooperative rebalancing allows a client to continue fetching during rebalances,
even during transactions. For transactions, a transact session will only be
aborted if the member has partitions revoked.

### Static membership

Kafka 2.4.0 also introduced support for [KIP-345][22], the "static" member
concept for consumer group members. The concept is relatively simple: group
members must be managed out of band from the client, whereas historically,
member IDs were newly determined every time a client connected.

[22]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

Static membership avoids unnecessary partition migration during rebalances and
conveys a host of other benefits; see the KIP for more details. To use static
membership, your cluster must be at least 2.4.0, and you can use the
[`InstanceID`][23] option.

[23]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#InstanceID
