Transactions
===

The `kgo` package supports transactional producing, consuming only committed
records, and the EOS consumer/producer.

For an example of the transactional producer and consuming committed offsets,
see [here](../examples/transactions/produce_and_consume). The only real catch
when producing is to make sure that you commit or abort appropriately; when
consuming, make sure you use the [`FetchIsolationLevel`][1] option with
[`ReadCommitted`][2].

[1]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#FetchIsolationLevel
[2]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#ReadCommitted
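
As a minimal sketch of both halves (the broker address, topic, transactional
ID, and the crude error handling here are illustrative assumptions, not
prescriptions):

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	// Transactional producer; TransactionalID is what enables
	// BeginTransaction / EndTransaction.
	producer, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.DefaultProduceTopic("txn-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	if err := producer.BeginTransaction(); err != nil {
		panic(err)
	}
	if err := producer.ProduceSync(ctx, kgo.StringRecord("hello")).FirstErr(); err != nil {
		// The produce failed: clear what is buffered and abort.
		producer.AbortBufferedRecords(ctx)
		producer.EndTransaction(ctx, kgo.TryAbort)
		panic(err)
	}
	if err := producer.EndTransaction(ctx, kgo.TryCommit); err != nil {
		panic(err)
	}

	// Consumer that only fetches committed records.
	consumer, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("txn-topic"),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
	)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	fetches := consumer.PollFetches(ctx)
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Println(string(r.Value))
	})
}
```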

For an example of the EOS consumer/producer, see
[here](../examples/transactions/eos). Because EOS requires much more care to
ensure things operate correctly, there exists a [`GroupTransactSession`][3]
helper type to manage everything for you. Basically, to help prevent any
duplicate processing, this helper type sets its internal state to abort
whenever necessary (specifically, when a group rebalance happens). This may
occasionally lead to extra work, but it should prevent consuming, modifying,
producing, and _committing_ a record twice.

[3]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#GroupTransactSession
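
A sketch of the session flow, with placeholder broker, group, topic, and
transactional ID names:

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-eos-id"),
		kgo.ConsumerGroup("my-eos-group"),
		kgo.ConsumeTopics("input-topic"),
		kgo.DefaultProduceTopic("output-topic"),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
	)
	if err != nil {
		panic(err)
	}
	defer sess.Close()

	for {
		fetches := sess.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return
		}

		if err := sess.Begin(); err != nil {
			panic(err)
		}

		// Consume, modify, produce: the EOS loop.
		fetches.EachRecord(func(r *kgo.Record) {
			sess.Produce(ctx, kgo.KeySliceRecord(r.Key, r.Value), nil)
		})

		// End flushes produced records, commits the consumed offsets
		// within the transaction, and ends the transaction. committed
		// is false if the session had to abort (for example, because
		// a rebalance happened mid-transaction).
		if committed, err := sess.End(ctx, kgo.TryCommit); err != nil {
			panic(err)
		} else if !committed {
			continue // aborted; the records will be reconsumed
		}
	}
}
```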

KIP-447?
===

## The problem

[KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics)
bills itself as producer scalability for exactly once semantics. This is a KIP
to add more safety to EOS.

Before KIP-447, Kafka Streams was implemented to consume from only one
partition, modify records it consumed, and produce back to a new topic. Streams
_could not_ consume from multiple partitions as a part of a consumer group,
because a rebalance could cause input partitions to move around unsafely.

As an example of the problem, let's say we have two EOS consumers, A and B,
both of which can consume partitions 1 and 2. Both partitions are currently
assigned to A, and A is consuming, modifying, and producing back as a part of
its EOS flow. A rebalance happens, and partition 2 moves to consumer B. At this
point, A may have processed some records and not yet issued a `TxnOffsetCommit`
request. B will see the old commit and begin consuming, which will reprocess
records. B will produce and eventually commit, and there will be duplicates.
At any point, A may eventually commit, but it is already too late: duplicates
have been processed, and A never knew.

Confluent released a [blog
post](https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification)
describing how KIP-447 makes it possible to consume from multiple partitions as
a part of EOS, and walking through an example of how things were problematic
before. Specifically, it proposes the following scenario, which I copy here:

```
Two Kafka consumers C1 and C2 each integrate with transactional producers P1
and P2, each identified by transactional ID T1 and T2, respectively. They
process data from two input topic partitions tp-0 and tp-1.

At the beginning, the consumer group has the following assignments:
(C1, P1 [T1]): tp-0
(C2, P2 [T2]): tp-1

P2 commits one transaction that pushes the current offset of tp-1 to 5. Next,
P2 opens another transaction on tp-1, processes data up to offset 10, and
begins committing. Before the transaction completes, it crashes, and the group
rebalances the partition assignment:

(C1, P1 [T1]): tp-0, tp-1
(C2, P2 [T2]): None

Since there is no such static mapping of T1 to partition tp-0 and T2 to
partition tp-1, P1 proceeds to start its transaction against tp-1 (using its
own transactional ID T1) without waiting for the pending transaction to
complete. It reads from last committed offset 5 instead of 10 on tp-1 while the
previous transaction associated with T2 is still ongoing completion and causes
duplicate processing.
```

Fundamentally, this example is missing one hidden detail: P2 did not complete
its transaction, so there actually are no duplicate records _at the end_ once
P1 reprocesses offsets 5 to 10. Duplicates only arise if P2 comes back alive
and finishes its commit before the transactional timeout. It's tough to imagine
this scenario truly happening; more realistic is if P2 loses connectivity for a
blip of time and then later reconnects to commit.

## The franz-go approach

The franz-go client supports KIP-447, but it also allows consuming multiple
partitions as an EOS consumer/producer even on older (pre-2.5) Kafka clusters.
There is a very small risk of duplicates with the approach this client chooses;
you can read on below for more details. Alternatively, you can use this client
exactly like the Java client, but as with the Java client, this requires extra
special care.

To start, unlike the Java client, franz-go does not require a separate consumer
and producer. Instead, both are merged into one "client", and by merging them,
the producer knows the state of the consumer at all times. Importantly, this
means that **if the consumer is revoked, the producer knows it, and the
producer will only allow an abort at the end of the transaction**. This mostly
solves the problem, but more remains.
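
For illustration, a single client carrying both roles might be configured and
driven as below. This is the lower-level path: all names are placeholders, and
the abort and rebalance handling that `GroupTransactSession` automates is
elided.

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	// One client is both the group consumer and the transactional
	// producer, so the producer always knows the consumer's state.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("input-topic"),
		kgo.DefaultProduceTopic("output-topic"),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	fetches := cl.PollFetches(ctx)

	if err := cl.BeginTransaction(); err != nil {
		panic(err)
	}
	fetches.EachRecord(func(r *kgo.Record) {
		cl.Produce(ctx, kgo.KeySliceRecord(r.Key, r.Value), nil)
	})
	// Abort handling and the rebalance edge cases described below are
	// elided; GroupTransactSession exists to manage them for you.
	if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
		panic(err)
	}
}
```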

It is possible that the consumer has lost connectivity to the cluster, and so
the consumer does not actually know whether or not it is still in the group.
For example, if heartbeat requests are hanging, the consumer could have been
kicked from the group by Kafka. If we allow a transaction commit at this point,
then we will again recreate the problem: we will commit offsets when we should
not, and then we will commit the transaction. To work around this, the franz-go
client forces a heartbeat (and requires a successful response) immediately
before committing the transaction. **If a heartbeat immediately before
committing is successful, then we know we can commit within the session
timeout**. Still, more remains.

Even if we commit immediately before ending a transaction, it is possible that
our commit will take so long that a rebalance happens before the commit
finishes. For example, say `EndTxn` is about to happen, and then every request
gets stuck in limbo. The consumer is booted, and then `EndTxn` completes. This
again recreates our problematic scenario. To work around this, the franz-go
client defaults the transactional timeout to be less than the group session
timeout. With this, we have the following order of events:

1) we begin a transaction
2) we know that we are still in the group
3) either we end the transaction, or we hang long enough that the transaction timeout expires _before_ the member will be booted

By having the transactional timeout strictly less than the session timeout,
we know that even if requests hang after our successful heartbeat, the
transaction will be timed out before a rebalance happens.
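
Illustratively, the two knobs relate like so. The durations below are examples
rather than a recommendation, and the names are placeholders; what matters is
the strict inequality:

```go
package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Keep the transaction timeout strictly below the session timeout,
	// so that a hung EndTxn expires before the group can boot the member.
	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("input-topic"),
		kgo.TransactionTimeout(40*time.Second),
		kgo.SessionTimeout(45*time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer sess.Close()
}
```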

If a rebalance happens while committing, the `OnPartitionsRevoked` callback is
blocked until the `EndTxn` request completes, meaning either the `EndTxn` will
complete successfully before the member is allowed to rebalance, or the
`EndTxn` will hang long enough for the member to be booted. In either scenario,
we avoid our problem. Again though, more remains.
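
If you attach your own revoke callback, it is subject to the same ordering. A
small sketch of observing revokes, assuming a plain group consumer with
placeholder names:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// During a transactional commit, the hook below cannot fire until
	// the EndTxn request has finished (or the member has been booted).
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("input-topic"),
		kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, revoked map[string][]int32) {
			for topic, partitions := range revoked {
				log.Printf("revoked %s: %v", topic, partitions)
			}
		}),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```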

After `EndTxn`, it is possible that a rebalance could immediately happen.
Within Kafka, when a transaction ends, Kafka propagates a commit marker to all
partitions that were a part of the transaction. If a rebalance finishes and the
new consumer fetches offsets _before_ the commit marker is propagated, then the
new consumer will fetch the previously committed offsets, not the newly
committed offsets. There is nothing a client can do to reliably prevent this
scenario.

Here, franz-go takes a heuristic approach: the assumption is that inter-broker
communication is always inevitably faster than broker `<=>` client
communication. On successful commit, if the client is not speaking to a 2.5+
cluster (a KIP-447 cluster) _or_ the client does not have
`RequireStableFetchOffsets` enabled, then the client will sleep 200ms before
releasing the lock that allows a rebalance to continue. The assumption is that
200ms is enough time for Kafka to propagate transactional markers: the
propagation should finish before a client is able to re-join, have a new leader
assign partitions, sync the assignment, and issue the offset fetch request. In
effect, the 200ms here is an attempt to provide KIP-447 semantics (waiting for
stable fetch offsets) in the place it matters most, even though the cluster
does not support the wait officially. Internally, the sleep is concurrent and
only blocks a rebalance from beginning; it does not block you from starting a
new transaction (but it does prevent you from _ending_ that new transaction).

One last flaw of the above approach is that a lot of it is dependent on timing.
If the servers you are running on do not have reliable clocks and may be very
out of sync, then the timing aspects above may not work. However, it is likely
your cluster will have other issues if some broker clocks are very off. It is
recommended to have alerts on NTP clock drift.

Thus, although we do support 2.5+ behavior, the client itself works around
duplicates in a pre-2.5 world with a lot of edge case handling. It is
_strongly_ recommended to use a 2.5+ cluster and to always enable
`RequireStableFetchOffsets`. The option itself has more documentation on
what other settings may need to be tweaked.
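
Concretely, against a 2.5+ cluster the recommended baseline looks something
like the following (all names are placeholders):

```go
package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	// On a 2.5+ (KIP-447) cluster, require stable fetch offsets so that
	// offset fetches wait out in-flight transactions.
	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("input-topic"),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
		kgo.RequireStableFetchOffsets(),
	)
	if err != nil {
		panic(err)
	}
	defer sess.Close()
}
```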