...

Source file src/github.com/twmb/franz-go/pkg/kadm/txn.go

Documentation: github.com/twmb/franz-go/pkg/kadm

     1  package kadm
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"sort"
     7  
     8  	"github.com/twmb/franz-go/pkg/kerr"
     9  	"github.com/twmb/franz-go/pkg/kmsg"
    10  )
    11  
    12  // DescribedProducer contains the state of a transactional producer's last
    13  // produce.
    14  type DescribedProducer struct {
    15  	Leader                int32  // Leader is the leader broker for this topic / partition.
    16  	Topic                 string // Topic is the topic being produced to.
    17  	Partition             int32  // Partition is the partition being produced to.
    18  	ProducerID            int64  // ProducerID is the producer ID that produced.
    19  	ProducerEpoch         int16  // ProducerEpoch is the epoch that produced.
    20  	LastSequence          int32  // LastSequence is the last sequence number the producer produced.
    21  	LastTimestamp         int64  // LastTimestamp is the last time this producer produced.
    22  	CoordinatorEpoch      int32  // CoordinatorEpoch is the epoch of the transactional coordinator for the last produce.
    23  	CurrentTxnStartOffset int64  // CurrentTxnStartOffset is the first offset in the transaction.
    24  }
    25  
    26  // Less returns whether the left described producer is less than the right,
    27  // in order of:
    28  //
    29  //   - Topic
    30  //   - Partition
    31  //   - ProducerID
    32  //   - ProducerEpoch
    33  //   - LastTimestamp
    34  //   - LastSequence
    35  func (l *DescribedProducer) Less(r *DescribedProducer) bool {
    36  	if l.Topic < r.Topic {
    37  		return true
    38  	}
    39  	if l.Topic > r.Topic {
    40  		return false
    41  	}
    42  	if l.Partition < r.Partition {
    43  		return true
    44  	}
    45  	if l.Partition > r.Partition {
    46  		return false
    47  	}
    48  	if l.ProducerID < r.ProducerID {
    49  		return true
    50  	}
    51  	if l.ProducerID > r.ProducerID {
    52  		return false
    53  	}
    54  	if l.ProducerEpoch < r.ProducerEpoch {
    55  		return true
    56  	}
    57  	if l.ProducerEpoch > r.ProducerEpoch {
    58  		return false
    59  	}
    60  	if l.LastTimestamp < r.LastTimestamp {
    61  		return true
    62  	}
    63  	if l.LastTimestamp > r.LastTimestamp {
    64  		return false
    65  	}
    66  	return l.LastSequence < r.LastSequence
    67  }
    68  
    69  // DescribedProducers maps producer IDs to the full described producer.
    70  type DescribedProducers map[int64]DescribedProducer
    71  
    72  // Sorted returns the described producers sorted by topic, partition, and
    73  // producer ID.
    74  func (ds DescribedProducers) Sorted() []DescribedProducer {
    75  	var all []DescribedProducer
    76  	for _, d := range ds {
    77  		all = append(all, d)
    78  	}
    79  	sort.Slice(all, func(i, j int) bool {
    80  		l, r := all[i], all[j]
    81  		return l.Topic < r.Topic || l.Topic == r.Topic && (l.Partition < r.Partition || l.Partition == r.Partition && l.ProducerID < r.ProducerID)
    82  	})
    83  	return all
    84  }
    85  
    86  // Each calls fn for each described producer.
    87  func (ds DescribedProducers) Each(fn func(DescribedProducer)) {
    88  	for _, d := range ds {
    89  		fn(d)
    90  	}
    91  }
    92  
    93  // DescribedProducersPartition is a partition whose producer's were described.
    94  type DescribedProducersPartition struct {
    95  	Leader          int32              // Leader is the leader broker for this topic / partition.
    96  	Topic           string             // Topic is the topic whose producer's were described.
    97  	Partition       int32              // Partition is the partition whose producer's were described.
    98  	ActiveProducers DescribedProducers // ActiveProducers are producer's actively transactionally producing to this partition.
    99  	Err             error              // Err is non-nil if describing this partition failed.
   100  }
   101  
   102  // DescribedProducersPartitions contains partitions whose producer's were described.
   103  type DescribedProducersPartitions map[int32]DescribedProducersPartition
   104  
   105  // Sorted returns the described partitions sorted by topic and partition.
   106  func (ds DescribedProducersPartitions) Sorted() []DescribedProducersPartition {
   107  	var all []DescribedProducersPartition
   108  	for _, d := range ds {
   109  		all = append(all, d)
   110  	}
   111  	sort.Slice(all, func(i, j int) bool {
   112  		l, r := all[i], all[j]
   113  		return l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
   114  	})
   115  	return all
   116  }
   117  
   118  // SortedProducer returns all producers sorted first by partition, then by producer ID.
   119  func (ds DescribedProducersPartitions) SortedProducers() []DescribedProducer {
   120  	var all []DescribedProducer
   121  	ds.EachProducer(func(d DescribedProducer) {
   122  		all = append(all, d)
   123  	})
   124  	sort.Slice(all, func(i, j int) bool {
   125  		l, r := all[i], all[j]
   126  		return l.Topic < r.Topic || l.Topic == r.Topic && (l.Partition < r.Partition || l.Partition == r.Partition && l.ProducerID < r.ProducerID)
   127  	})
   128  	return all
   129  }
   130  
   131  // Each calls fn for each partition.
   132  func (ds DescribedProducersPartitions) Each(fn func(DescribedProducersPartition)) {
   133  	for _, d := range ds {
   134  		fn(d)
   135  	}
   136  }
   137  
   138  // EachProducer calls fn for each producer in all partitions.
   139  func (ds DescribedProducersPartitions) EachProducer(fn func(DescribedProducer)) {
   140  	for _, d := range ds {
   141  		for _, p := range d.ActiveProducers {
   142  			fn(p)
   143  		}
   144  	}
   145  }
   146  
   147  // DescribedProducersTopic contains topic partitions whose producer's were described.
   148  type DescribedProducersTopic struct {
   149  	Topic      string                       // Topic is the topic whose producer's were described.
   150  	Partitions DescribedProducersPartitions // Partitions are partitions whose producer's were described.
   151  }
   152  
   153  // DescribedProducersTopics contains topics whose producer's were described.
   154  type DescribedProducersTopics map[string]DescribedProducersTopic
   155  
   156  // Sorted returns the described topics sorted by topic.
   157  func (ds DescribedProducersTopics) Sorted() []DescribedProducersTopic {
   158  	var all []DescribedProducersTopic
   159  	ds.Each(func(d DescribedProducersTopic) {
   160  		all = append(all, d)
   161  	})
   162  	sort.Slice(all, func(i, j int) bool {
   163  		l, r := all[i], all[j]
   164  		return l.Topic < r.Topic
   165  	})
   166  	return all
   167  }
   168  
   169  // Sorted returns the described partitions sorted by topic and partition.
   170  func (ds DescribedProducersTopics) SortedPartitions() []DescribedProducersPartition {
   171  	var all []DescribedProducersPartition
   172  	ds.EachPartition(func(d DescribedProducersPartition) {
   173  		all = append(all, d)
   174  	})
   175  	sort.Slice(all, func(i, j int) bool {
   176  		l, r := all[i], all[j]
   177  		return l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
   178  	})
   179  	return all
   180  }
   181  
   182  // SortedProducer returns all producers sorted first by partition, then by producer ID.
   183  func (ds DescribedProducersTopics) SortedProducers() []DescribedProducer {
   184  	var all []DescribedProducer
   185  	ds.EachProducer(func(d DescribedProducer) {
   186  		all = append(all, d)
   187  	})
   188  	sort.Slice(all, func(i, j int) bool {
   189  		l, r := all[i], all[j]
   190  		return l.Topic < r.Topic || l.Topic == r.Topic && (l.Partition < r.Partition || l.Partition == r.Partition && l.ProducerID < r.ProducerID)
   191  	})
   192  	return all
   193  }
   194  
   195  // Each calls fn for every topic.
   196  func (ds DescribedProducersTopics) Each(fn func(DescribedProducersTopic)) {
   197  	for _, d := range ds {
   198  		fn(d)
   199  	}
   200  }
   201  
   202  // EachPartitions calls fn for all topic partitions.
   203  func (ds DescribedProducersTopics) EachPartition(fn func(DescribedProducersPartition)) {
   204  	for _, d := range ds {
   205  		for _, p := range d.Partitions {
   206  			fn(p)
   207  		}
   208  	}
   209  }
   210  
   211  // EachProducer calls fn for each producer in all topics and partitions.
   212  func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer)) {
   213  	for _, d := range ds {
   214  		for _, p := range d.Partitions {
   215  			for _, b := range p.ActiveProducers {
   216  				fn(b)
   217  			}
   218  		}
   219  	}
   220  }
   221  
   222  // DescribeProducers describes all producers that are transactionally producing
   223  // to the requested topic set. This request can be used to detect hanging
   224  // transactions or other transaction related problems. If the input set is
   225  // empty, this requests data for all partitions.
   226  //
   227  // This may return *ShardErrors or *AuthError.
   228  func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error) {
   229  	if len(s) == 0 {
   230  		m, err := cl.Metadata(ctx)
   231  		if err != nil {
   232  			return nil, err
   233  		}
   234  		s = m.Topics.TopicsSet()
   235  	} else if e := s.EmptyTopics(); len(e) > 0 {
   236  		m, err := cl.Metadata(ctx, e...)
   237  		if err != nil {
   238  			return nil, err
   239  		}
   240  		for t, ps := range m.Topics.TopicsSet() {
   241  			s[t] = ps
   242  		}
   243  	}
   244  
   245  	req := kmsg.NewPtrDescribeProducersRequest()
   246  	for _, t := range s.IntoList() {
   247  		rt := kmsg.NewDescribeProducersRequestTopic()
   248  		rt.Topic = t.Topic
   249  		rt.Partitions = t.Partitions
   250  		req.Topics = append(req.Topics, rt)
   251  	}
   252  	shards := cl.cl.RequestSharded(ctx, req)
   253  	dts := make(DescribedProducersTopics)
   254  	return dts, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   255  		resp := kr.(*kmsg.DescribeProducersResponse)
   256  		for _, rt := range resp.Topics {
   257  			dt, exists := dts[rt.Topic]
   258  			if !exists { // topic could be spread around brokers, we need to check existence
   259  				dt = DescribedProducersTopic{
   260  					Topic:      rt.Topic,
   261  					Partitions: make(DescribedProducersPartitions),
   262  				}
   263  				dts[rt.Topic] = dt
   264  			}
   265  			dps := dt.Partitions
   266  			for _, rp := range rt.Partitions {
   267  				if err := maybeAuthErr(rp.ErrorCode); err != nil {
   268  					return err
   269  				}
   270  				drs := make(DescribedProducers)
   271  				dp := DescribedProducersPartition{
   272  					Leader:          b.NodeID,
   273  					Topic:           rt.Topic,
   274  					Partition:       rp.Partition,
   275  					ActiveProducers: drs,
   276  					Err:             kerr.ErrorForCode(rp.ErrorCode),
   277  				}
   278  				dps[rp.Partition] = dp // one partition globally, no need to exist-check
   279  				for _, rr := range rp.ActiveProducers {
   280  					dr := DescribedProducer{
   281  						Leader:                b.NodeID,
   282  						Topic:                 rt.Topic,
   283  						Partition:             rp.Partition,
   284  						ProducerID:            rr.ProducerID,
   285  						ProducerEpoch:         int16(rr.ProducerEpoch),
   286  						LastSequence:          rr.LastSequence,
   287  						LastTimestamp:         rr.LastTimestamp,
   288  						CoordinatorEpoch:      rr.CoordinatorEpoch,
   289  						CurrentTxnStartOffset: rr.CurrentTxnStartOffset,
   290  					}
   291  					drs[dr.ProducerID] = dr
   292  				}
   293  			}
   294  		}
   295  		return nil
   296  	})
   297  }
   298  
   299  // DescribedTransaction contains data from a describe transactions response for
   300  // a single transactional ID.
   301  type DescribedTransaction struct {
   302  	Coordinator    int32  // Coordinator is the coordinator broker for this transactional ID.
   303  	TxnID          string // TxnID is the name of this transactional ID.
   304  	State          string // State is the state this transaction is in (Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence).
   305  	TimeoutMillis  int32  // TimeoutMillis is the timeout of this transaction in milliseconds.
   306  	StartTimestamp int64  // StartTimestamp is millisecond when this transaction started.
   307  	ProducerID     int64  // ProducerID is the ID in use by the transactional ID.
   308  	ProducerEpoch  int16  // ProducerEpoch is the epoch associated with the produce rID.
   309  
   310  	// Topics is the set of partitions in the transaction, if active. When
   311  	// preparing to commit or abort, this includes only partitions which do
   312  	// not have markers. This does not include topics the user is not
   313  	// authorized to describe.
   314  	Topics TopicsSet
   315  
   316  	Err error // Err is non-nil if the transaction could not be described.
   317  }
   318  
   319  // DescribedTransactions contains information from a describe transactions
   320  // response.
   321  type DescribedTransactions map[string]DescribedTransaction
   322  
   323  // Sorted returns all described transactions sorted by transactional ID.
   324  func (ds DescribedTransactions) Sorted() []DescribedTransaction {
   325  	s := make([]DescribedTransaction, 0, len(ds))
   326  	for _, d := range ds {
   327  		s = append(s, d)
   328  	}
   329  	sort.Slice(s, func(i, j int) bool { return s[i].TxnID < s[j].TxnID })
   330  	return s
   331  }
   332  
   333  // Each calls fn for each described transaction.
   334  func (ds DescribedTransactions) Each(fn func(DescribedTransaction)) {
   335  	for _, d := range ds {
   336  		fn(d)
   337  	}
   338  }
   339  
   340  // On calls fn for the transactional ID if it exists, returning the transaction
   341  // and the error returned from fn. If fn is nil, this simply returns the
   342  // transaction.
   343  //
   344  // The fn is given a shallow copy of the transaction. This function returns the
   345  // copy as well; any modifications within fn are modifications on the returned
   346  // copy.  Modifications on a described transaction's inner fields are persisted
   347  // to the original map (because slices are pointers).
   348  //
   349  // If the transaction does not exist, this returns
   350  // kerr.TransactionalIDNotFound.
   351  func (rs DescribedTransactions) On(txnID string, fn func(*DescribedTransaction) error) (DescribedTransaction, error) {
   352  	if len(rs) > 0 {
   353  		r, ok := rs[txnID]
   354  		if ok {
   355  			if fn == nil {
   356  				return r, nil
   357  			}
   358  			return r, fn(&r)
   359  		}
   360  	}
   361  	return DescribedTransaction{}, kerr.TransactionalIDNotFound
   362  }
   363  
   364  // TransactionalIDs returns a sorted list of all transactional IDs.
   365  func (ds DescribedTransactions) TransactionalIDs() []string {
   366  	all := make([]string, 0, len(ds))
   367  	for t := range ds {
   368  		all = append(all, t)
   369  	}
   370  	sort.Strings(all)
   371  	return all
   372  }
   373  
   374  // DescribeTransactions describes either all transactional IDs specified, or
   375  // all transactional IDs in the cluster if none are specified.
   376  //
   377  // This may return *ShardErrors or *AuthError.
   378  //
   379  // If no transactional IDs are specified and this method first lists
   380  // transactional IDs, and listing IDs returns a *ShardErrors, this function
   381  // describes all successfully listed IDs and appends the list shard errors to
   382  // any describe shard errors.
   383  //
   384  // If only one ID is described, there will be at most one request issued and
   385  // there is no need to deeply inspect the error.
   386  func (cl *Client) DescribeTransactions(ctx context.Context, txnIDs ...string) (DescribedTransactions, error) {
   387  	var seList *ShardErrors
   388  	if len(txnIDs) == 0 {
   389  		listed, err := cl.ListTransactions(ctx, nil, nil)
   390  		switch {
   391  		case err == nil:
   392  		case errors.As(err, &seList):
   393  		default:
   394  			return nil, err
   395  		}
   396  		txnIDs = listed.TransactionalIDs()
   397  		if len(txnIDs) == 0 {
   398  			return nil, err
   399  		}
   400  	}
   401  
   402  	req := kmsg.NewPtrDescribeTransactionsRequest()
   403  	req.TransactionalIDs = txnIDs
   404  
   405  	shards := cl.cl.RequestSharded(ctx, req)
   406  	described := make(DescribedTransactions)
   407  	err := shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   408  		resp := kr.(*kmsg.DescribeTransactionsResponse)
   409  		for _, rt := range resp.TransactionStates {
   410  			if err := maybeAuthErr(rt.ErrorCode); err != nil {
   411  				return err
   412  			}
   413  			t := DescribedTransaction{
   414  				Coordinator:    b.NodeID,
   415  				TxnID:          rt.TransactionalID,
   416  				State:          rt.State,
   417  				TimeoutMillis:  rt.TimeoutMillis,
   418  				StartTimestamp: rt.StartTimestamp,
   419  				ProducerID:     rt.ProducerID,
   420  				ProducerEpoch:  rt.ProducerEpoch,
   421  				Err:            kerr.ErrorForCode(rt.ErrorCode),
   422  			}
   423  			for _, rtt := range rt.Topics {
   424  				t.Topics.Add(rtt.Topic, rtt.Partitions...)
   425  			}
   426  			described[t.TxnID] = t // txnID lives on one coordinator, no need to exist-check
   427  		}
   428  		return nil
   429  	})
   430  
   431  	var seDesc *ShardErrors
   432  	switch {
   433  	case err == nil:
   434  		return described, seList.into()
   435  	case errors.As(err, &seDesc):
   436  		if seList != nil {
   437  			seDesc.Errs = append(seList.Errs, seDesc.Errs...)
   438  		}
   439  		return described, seDesc.into()
   440  	default:
   441  		return nil, err
   442  	}
   443  }
   444  
   445  // ListedTransaction contains data from a list transactions response for a
   446  // single transactional ID.
   447  type ListedTransaction struct {
   448  	Coordinator int32  // Coordinator the coordinator broker for this transactional ID.
   449  	TxnID       string // TxnID is the name of this transactional ID.
   450  	ProducerID  int64  // ProducerID is the producer ID for this transaction.
   451  	State       string // State is the state this transaction is in (Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence).
   452  }
   453  
   454  // ListedTransactions contains information from a list transactions response.
   455  type ListedTransactions map[string]ListedTransaction
   456  
   457  // Sorted returns all transactions sorted by transactional ID.
   458  func (ls ListedTransactions) Sorted() []ListedTransaction {
   459  	s := make([]ListedTransaction, 0, len(ls))
   460  	for _, l := range ls {
   461  		s = append(s, l)
   462  	}
   463  	sort.Slice(s, func(i, j int) bool { return s[i].TxnID < s[j].TxnID })
   464  	return s
   465  }
   466  
   467  // Each calls fn for each listed transaction.
   468  func (ls ListedTransactions) Each(fn func(ListedTransaction)) {
   469  	for _, l := range ls {
   470  		fn(l)
   471  	}
   472  }
   473  
   474  // TransactionalIDs returns a sorted list of all transactional IDs.
   475  func (ls ListedTransactions) TransactionalIDs() []string {
   476  	all := make([]string, 0, len(ls))
   477  	for t := range ls {
   478  		all = append(all, t)
   479  	}
   480  	sort.Strings(all)
   481  	return all
   482  }
   483  
   484  // ListTransactions returns all transactions and their states in the cluster.
   485  // Filter states can be used to return transactions only in the requested
   486  // states. By default, this returns all transactions you have DESCRIBE access
   487  // to. Producer IDs can be specified to filter for transactions from the given
   488  // producer.
   489  //
   490  // This may return *ShardErrors or *AuthError.
   491  func (cl *Client) ListTransactions(ctx context.Context, producerIDs []int64, filterStates []string) (ListedTransactions, error) {
   492  	req := kmsg.NewPtrListTransactionsRequest()
   493  	req.ProducerIDFilters = producerIDs
   494  	req.StateFilters = filterStates
   495  	shards := cl.cl.RequestSharded(ctx, req)
   496  	list := make(ListedTransactions)
   497  	return list, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   498  		resp := kr.(*kmsg.ListTransactionsResponse)
   499  		if err := maybeAuthErr(resp.ErrorCode); err != nil {
   500  			return err
   501  		}
   502  		if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
   503  			return err
   504  		}
   505  		for _, t := range resp.TransactionStates {
   506  			list[t.TransactionalID] = ListedTransaction{ // txnID lives on one coordinator, no need to exist-check
   507  				Coordinator: b.NodeID,
   508  				TxnID:       t.TransactionalID,
   509  				ProducerID:  t.ProducerID,
   510  				State:       t.TransactionState,
   511  			}
   512  		}
   513  		return nil
   514  	})
   515  }
   516  
   517  // TxnMarkers marks the end of a partition: the producer ID / epoch doing the
   518  // writing, whether this is a commit, the coordinator epoch of the broker we
   519  // are writing to (for fencing), and the topics and partitions that we are
   520  // writing this abort or commit for.
   521  //
   522  // This is a very low level admin request and should likely be built from data
   523  // in a DescribeProducers response. See KIP-664 if you are trying to use this.
   524  type TxnMarkers struct {
   525  	ProducerID       int64     // ProducerID is the ID to write markers for.
   526  	ProducerEpoch    int16     // ProducerEpoch is the epoch to write markers for.
   527  	Commit           bool      // Commit is true if we are committing, false if we are aborting.
   528  	CoordinatorEpoch int32     // CoordinatorEpoch is the epoch of the transactional coordinator we are writing to; this is used for fencing.
   529  	Topics           TopicsSet // Topics are topics and partitions to write markers for.
   530  }
   531  
   532  // TxnMarkersPartitionResponse is a response to a topic's partition within a
   533  // single marker written.
   534  type TxnMarkersPartitionResponse struct {
   535  	NodeID     int32  // NodeID is the node that this marker was written to.
   536  	ProducerID int64  // ProducerID corresponds to the PID in the write marker request.
   537  	Topic      string // Topic is the topic being responded to.
   538  	Partition  int32  // Partition is the partition being responded to.
   539  	Err        error  // Err is non-nil if the WriteTxnMarkers request for this pid/topic/partition failed.
   540  }
   541  
   542  // TxnMarkersPartitionResponses contains per-partition responses to a
   543  // WriteTxnMarkers request.
   544  type TxnMarkersPartitionResponses map[int32]TxnMarkersPartitionResponse
   545  
   546  // Sorted returns all partitions sorted by partition.
   547  func (ps TxnMarkersPartitionResponses) Sorted() []TxnMarkersPartitionResponse {
   548  	var all []TxnMarkersPartitionResponse
   549  	ps.Each(func(p TxnMarkersPartitionResponse) {
   550  		all = append(all, p)
   551  	})
   552  	sort.Slice(all, func(i, j int) bool {
   553  		l, r := all[i], all[j]
   554  		return l.Partition < r.Partition
   555  	})
   556  	return all
   557  }
   558  
   559  // Each calls fn for each partition.
   560  func (ps TxnMarkersPartitionResponses) Each(fn func(TxnMarkersPartitionResponse)) {
   561  	for _, p := range ps {
   562  		fn(p)
   563  	}
   564  }
   565  
   566  // TxnMarkersTopicResponse is a response to a topic within a single marker
   567  // written.
   568  type TxnMarkersTopicResponse struct {
   569  	ProducerID int64                        // ProducerID corresponds to the PID in the write marker request.
   570  	Topic      string                       // Topic is the topic being responded to.
   571  	Partitions TxnMarkersPartitionResponses // Partitions are the responses for partitions in this marker.
   572  }
   573  
   574  // TxnMarkersTopicResponses contains per-topic responses to a WriteTxnMarkers
   575  // request.
   576  type TxnMarkersTopicResponses map[string]TxnMarkersTopicResponse
   577  
   578  // Sorted returns all topics sorted by topic.
   579  func (ts TxnMarkersTopicResponses) Sorted() []TxnMarkersTopicResponse {
   580  	var all []TxnMarkersTopicResponse
   581  	ts.Each(func(t TxnMarkersTopicResponse) {
   582  		all = append(all, t)
   583  	})
   584  	sort.Slice(all, func(i, j int) bool {
   585  		l, r := all[i], all[j]
   586  		return l.Topic < r.Topic
   587  	})
   588  	return all
   589  }
   590  
   591  // SortedPartitions returns all topics sorted by topic then partition.
   592  func (ts TxnMarkersTopicResponses) SortedPartitions() []TxnMarkersPartitionResponse {
   593  	var all []TxnMarkersPartitionResponse
   594  	ts.EachPartition(func(p TxnMarkersPartitionResponse) {
   595  		all = append(all, p)
   596  	})
   597  	sort.Slice(all, func(i, j int) bool {
   598  		l, r := all[i], all[j]
   599  		return l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
   600  	})
   601  	return all
   602  }
   603  
   604  // Each calls fn for each topic.
   605  func (ts TxnMarkersTopicResponses) Each(fn func(TxnMarkersTopicResponse)) {
   606  	for _, t := range ts {
   607  		fn(t)
   608  	}
   609  }
   610  
   611  // EachPartition calls fn for every partition in all topics.
   612  func (ts TxnMarkersTopicResponses) EachPartition(fn func(TxnMarkersPartitionResponse)) {
   613  	for _, t := range ts {
   614  		for _, p := range t.Partitions {
   615  			fn(p)
   616  		}
   617  	}
   618  }
   619  
   620  // TxnMarkersResponse is a response for a single marker written.
   621  type TxnMarkersResponse struct {
   622  	ProducerID int64                    // ProducerID corresponds to the PID in the write marker request.
   623  	Topics     TxnMarkersTopicResponses // Topics contains the topics that markers were written for, for this ProducerID.
   624  }
   625  
   626  // TxnMarkersResponse contains per-partition-ID responses to a WriteTxnMarkers
   627  // request.
   628  type TxnMarkersResponses map[int64]TxnMarkersResponse
   629  
   630  // Sorted returns all markers sorted by producer ID.
   631  func (ms TxnMarkersResponses) Sorted() []TxnMarkersResponse {
   632  	var all []TxnMarkersResponse
   633  	ms.Each(func(m TxnMarkersResponse) {
   634  		all = append(all, m)
   635  	})
   636  	sort.Slice(all, func(i, j int) bool {
   637  		l, r := all[i], all[j]
   638  		return l.ProducerID < r.ProducerID
   639  	})
   640  	return all
   641  }
   642  
   643  // SortedTopics returns all marker topics sorted by producer ID then topic.
   644  func (ms TxnMarkersResponses) SortedTopics() []TxnMarkersTopicResponse {
   645  	var all []TxnMarkersTopicResponse
   646  	ms.EachTopic(func(t TxnMarkersTopicResponse) {
   647  		all = append(all, t)
   648  	})
   649  	sort.Slice(all, func(i, j int) bool {
   650  		l, r := all[i], all[j]
   651  		return l.ProducerID < r.ProducerID || l.ProducerID == r.ProducerID && l.Topic < r.Topic
   652  	})
   653  	return all
   654  }
   655  
   656  // SortedPartitions returns all marker topic partitions sorted by producer ID
   657  // then topic then partition.
   658  func (ms TxnMarkersResponses) SortedPartitions() []TxnMarkersPartitionResponse {
   659  	var all []TxnMarkersPartitionResponse
   660  	ms.EachPartition(func(p TxnMarkersPartitionResponse) {
   661  		all = append(all, p)
   662  	})
   663  	sort.Slice(all, func(i, j int) bool {
   664  		l, r := all[i], all[j]
   665  		return l.ProducerID < r.ProducerID || l.ProducerID == r.ProducerID && l.Topic < r.Topic || l.Topic == r.Topic && l.Partition < r.Partition
   666  	})
   667  	return all
   668  }
   669  
   670  // Each calls fn for each marker response.
   671  func (ms TxnMarkersResponses) Each(fn func(TxnMarkersResponse)) {
   672  	for _, m := range ms {
   673  		fn(m)
   674  	}
   675  }
   676  
   677  // EachTopic calls fn for every topic in all marker responses.
   678  func (ms TxnMarkersResponses) EachTopic(fn func(TxnMarkersTopicResponse)) {
   679  	for _, m := range ms {
   680  		for _, t := range m.Topics {
   681  			fn(t)
   682  		}
   683  	}
   684  }
   685  
   686  // EachPartition calls fn for every partition in all topics in all marker
   687  // responses.
   688  func (ms TxnMarkersResponses) EachPartition(fn func(TxnMarkersPartitionResponse)) {
   689  	for _, m := range ms {
   690  		for _, t := range m.Topics {
   691  			for _, p := range t.Partitions {
   692  				fn(p)
   693  			}
   694  		}
   695  	}
   696  }
   697  
   698  // WriteTxnMarkers writes transaction markers to brokers. This is an advanced
   699  // admin way to close out open transactions. See KIP-664 for more details.
   700  //
   701  // This may return *ShardErrors or *AuthError.
   702  func (cl *Client) WriteTxnMarkers(ctx context.Context, markers ...TxnMarkers) (TxnMarkersResponses, error) {
   703  	req := kmsg.NewPtrWriteTxnMarkersRequest()
   704  	for _, m := range markers {
   705  		rm := kmsg.NewWriteTxnMarkersRequestMarker()
   706  		rm.ProducerID = m.ProducerID
   707  		rm.ProducerEpoch = m.ProducerEpoch
   708  		rm.Committed = m.Commit
   709  		rm.CoordinatorEpoch = m.CoordinatorEpoch
   710  		for t, ps := range m.Topics {
   711  			rt := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
   712  			rt.Topic = t
   713  			for p := range ps {
   714  				rt.Partitions = append(rt.Partitions, p)
   715  			}
   716  			rm.Topics = append(rm.Topics, rt)
   717  		}
   718  		req.Markers = append(req.Markers, rm)
   719  	}
   720  	shards := cl.cl.RequestSharded(ctx, req)
   721  	rs := make(TxnMarkersResponses)
   722  	return rs, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   723  		resp := kr.(*kmsg.WriteTxnMarkersResponse)
   724  		for _, rm := range resp.Markers {
   725  			m, exists := rs[rm.ProducerID] // partitions are spread around, our marker could be split: we need to check existence
   726  			if !exists {
   727  				m = TxnMarkersResponse{
   728  					ProducerID: rm.ProducerID,
   729  					Topics:     make(TxnMarkersTopicResponses),
   730  				}
   731  				rs[rm.ProducerID] = m
   732  			}
   733  			for _, rt := range rm.Topics {
   734  				t, exists := m.Topics[rt.Topic]
   735  				if !exists { // same thought
   736  					t = TxnMarkersTopicResponse{
   737  						ProducerID: rm.ProducerID,
   738  						Topic:      rt.Topic,
   739  						Partitions: make(TxnMarkersPartitionResponses),
   740  					}
   741  					m.Topics[rt.Topic] = t
   742  				}
   743  				for _, rp := range rt.Partitions {
   744  					if err := maybeAuthErr(rp.ErrorCode); err != nil {
   745  						return err
   746  					}
   747  					t.Partitions[rp.Partition] = TxnMarkersPartitionResponse{ // one partition globally, no need to exist-check
   748  						NodeID:     b.NodeID,
   749  						ProducerID: rm.ProducerID,
   750  						Topic:      rt.Topic,
   751  						Partition:  rp.Partition,
   752  						Err:        kerr.ErrorForCode(rp.ErrorCode),
   753  					}
   754  				}
   755  			}
   756  		}
   757  		return nil
   758  	})
   759  }
   760  

View as plain text