...

Text file src/github.com/twmb/franz-go/docs/transactions.md

Documentation: github.com/twmb/franz-go/docs

     1Transactions
     2===
     3
     4The `kgo` package supports transactional producing, consuming only committed
     5records, and the EOS consumer/producer.
     6
     7For an example of the transactional producer and consuming committed offsets,
     8see [here](../examples/transactions/produce_and_consume). The only real catch
     9to worry about when producing is to make sure that you commit or abort
    10appropriately, and for consuming, make sure you use the
    11[`FetchIsolationLevel`][1] option with [`ReadCommitted`][2] option.
    12
    13[1]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#FetchIsolationLevel
    14[2]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ReadCommitted
    15
    16For an example of the EOS consumer/producer, see
    17[here](../examples/transactions/eos). Because EOS requires much more care to
    18ensure things operate correctly, there exists a [`GroupTransactSession`][3] helper
    19type to manage everything for you. Basically, to help prevent any duplicate
    20processing, this helper type sets its internal state to abort whenever
    21necessary (specifically, when a group rebalance happens). This may occasionally
    22lead to extra work, but it should prevent consuming, modifying, producing, and
    23_committing_ a record twice.
    24
    25[3]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#GroupTransactSession
    26
    27KIP-447?
    28===
    29
    30## The problem
    31 
    32[KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics)
    33bills itself as producer scalability for exactly once semantics. This
    34is a KIP to add more safety to EOS.
    35
    36Before KIP-447, Kafka Streams was implemented to consume from only one
    37partition, modify records it consumed, and produce back to a new topic. Streams
    38_could not_ consume from multiple partitions as a part of a consumer group,
    39because a rebalance could cause input partitions to move around unsafely.
    40
    41As an example of the problem, let's say we have two EOS consumers, A and B,
    42both of which can consume partitions 1 and 2. Both partitions are currently
    43assigned to A, and A is consuming, modifying, and producing back as a part of
    44its EOS flow. A rebalance happens, and partition 2 moves to consumer B. At this
    45point, A may have processed some records and not yet issued a `TxnOffsetCommit`
    46request. B will see the old commit and begin consuming, which will reprocess
    47records. B will produce and eventually commit, and there will be duplicates.
    48At any point, A may eventually commit, but it is already too late. Duplicates
    49have been processed, and A never knew
    50
    51Confluent released a [blog
    52post](https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification)
    53describing how KIP-447 makes it possible to consume from multiple partitions as
    54a part of EOS, and walks through an example of how things were problematic
    55before. Specifically, it proposes the following scenario, which I copy here:
    56
    57```
    58Two Kafka consumers C1 and C2 each integrate with transactional producers P1
    59and P2, each identified by transactional ID T1 and T2, respectively. They
    60process data from two input topic partitions tp-0 and tp-1. 
    61
    62At the beginning, the consumer group has the following assignments: 
    63(C1, P1 [T1]): tp-0 
    64(C2, P2 [T2]): tp-1 
    65
    66P2 commits one transaction that pushes the current offset of tp-1 to 5. Next,
    67P2 opens another transaction on tp-1, processes data up to offset 10, and
    68begins committing. Before the transaction completes, it crashes, and the group
    69rebalances the partition assignment: 
    70
    71(C1, P1 [T1]): tp-0, tp-1 
    72(C2, P2 [T2]): None 
    73
    74Since there is no such static mapping of T1 to partition tp-0 and T2 to
    75partition tp-1, P1 proceeds to start its transaction against tp-1 (using its
    76own transactional ID T1) without waiting for the pending transaction to
    77complete. It reads from last committed offset 5 instead of 10 on tp-1 while the
    78previous transaction associated with T0 is still ongoing completion and causes
    79duplicate processing.
    80```
    81
    82Fundamentally, this example is missing one hidden detail: P2 did not complete
    83its transaction, so there actually are no duplicate records _at the end_ once
    84P1 reprocesses offsets 5 to 10. Duplicates only arise if P2 comes back alive
    85and finishes its commit before the transactional timeout. It's tough to imagine
    86this scenario truly happening; more realistic is if P1 loses connectivity for a
    87blip of time and then later reconnects to commit.
    88
    89## The franz-go approach
    90
    91The franz-go client supports KIP-447, but allows consuming multiple partitions
    92as an EOS consumer/producer even on older (pre 2.5) Kafka clusters. There is
    93a very small risk of duplicates with the approach this client chooses, you can
    94read on below for more details. Alternatively, you can use this client exactly
    95like the Java client, but as with the Java client, this requires extra special
    96care.
    97
    98To start, unlike the Java client, franz-go does not require a separate client
    99and producer. Instead, both are merged into one "client", and by merging them,
   100the producer knows the state of the consumer at all times. Importantly, this
   101means that **if the consumer is revoked, the producer knows it, and the
   102producer will only allow an abort at the end of the transaction**. This mostly
   103solves the problem, but more remains.
   104
   105It is possible that the consumer has lost connectivity to the cluster, and so
   106the consumer does not actually know whether or not it is still in the group.
   107For example, if heartbeat requests are hanging, the consumer could have been
   108kicked from the group by Kafka. If we allow a transaction commit at this point,
   109then we will again recreate the problem: we will commit offsets when we should
   110not, and then we will commit the transaction. To work around this, the franz-go
   111client forces a successful heartbeat (and a successful response) immediately
   112before committing the transaction. **If a heartbeat immediately before
   113committing is successful, then we know we can commit within the session
   114timeout**. Still, more remains.
   115
   116Even if we commit immediately before ending a transaction, it is possible that
   117our commit will take so long that a rebalance happens before the commit
   118finishes. For example, say `EndTxn` is about to happen, and then every request
   119gets stuck in limbo. The consumer is booted, and then `EndTxn` completes. This
   120again recreates our problematic scenario. To work around this, the franz-go
   121client defaults the transactional timeout to be less than the group session
   122timeout. With this, then we have the following order of events:
   123
   1241) we begin a transaction  
   1252) we know that we are still in the group  
   1263) either we end the transaction, or we hang long enough that the transaction timeout expires _before_ the member will be booted  
   127
   128By having the transactional timeout strictly less than the session timeout,
   129we know that even if requests hang after our successful heartbeat, then
   130the transaction will be timed out before a rebalance happens.
   131
   132If a rebalance happens while committing, the OnPartitionsRevoked callback is
   133blocked until the `EndTxn` request completes, meaning either the `EndTxn` will
   134complete successfully before the member is allowed to rebalance, or the
   135`EndTxn` will hang long enough for the member to be booted. In either scenario,
   136we avoid our problem. Again though, more remains.
   137
   138After `EndTxn`, it is possible that a rebalance could immediately happen.
   139Within Kafka when a transaction ends, Kafka propagates a commit marker to all
   140partitions that were a part of the transaction. If a rebalance finishes and the
   141new consumer fetches offsets _before_ the commit marker is propagated, then the
   142new consumer will fetch the previously committed offsets, not the newly
   143committed offsets. There is nothing a client can do to reliably prevent this
   144scenario. Here, franz-go takes a heuristic approach: the assumption is that
   145inter-broker communication is always inevitably faster than broker `<=>` client
   146communication. On successful commit, if the client is not speaking to a 2.5+
   147cluster (KIP-447 cluster) _or_ the client does not have
   148`RequireStableFetchOffsets` enabled, then the client will sleep 200ms before
   149releasing the lock that allows a rebalance to continue. The assumption is that
   150200ms is enough time for Kafka to propagate transactional markers: the
   151propagation should finish before a client is able to do the following: re-join,
   152have a new leader assign partitions, sync the assignment, and issue the offset
   153fetch request. In effect, the 200ms here is an attempt to provide KIP-447
   154semantics (waiting for stable fetch offsets) in place it matters most even
   155though the cluster does not support the wait officially. Internally, the sleep
   156is concurrent and only blocks a rebalance from beginning, it does not block
   157you from starting a new transaction (but, it does prevent you from _ending_
   158a new transaction).
   159
   160One last flaw of the above approach is that a lot of it is dependent on timing.
   161If the servers you are running on do not have reliable clocks and may be very
   162out of sync, then the timing aspects above may not work. However, it is likely
   163your cluster will have other issues if some broker clocks are very off. It is
   164recommended to have alerts on ntp clock drift.
   165
   166Thus, although we do support 2.5+ behavior, the client itself works around
   167duplicates in a pre-2.5 world with a lot of edge case handling. It is
   168_strongly_ recommended to use a 2.5+ cluster and to always enable
   169`RequireStableFetchOffsets`. The option itself has more documentation on
   170what other settings may need to be tweaked.

View as plain text