...

Source file src/github.com/twmb/franz-go/pkg/kadm/logdirs.go

Documentation: github.com/twmb/franz-go/pkg/kadm

     1  package kadm
     2  
     3  import (
     4  	"context"
     5  	"sort"
     6  
     7  	"github.com/twmb/franz-go/pkg/kerr"
     8  	"github.com/twmb/franz-go/pkg/kmsg"
     9  )
    10  
    11  // AlterReplicaLogDirsReq is the input for a request to alter replica log
    12  // directories. The key is the directory that all topics and partitions in
    13  // the topic set will move to.
    14  type AlterReplicaLogDirsReq map[string]TopicsSet
    15  
    16  // Add merges the input topic set into the given directory.
    17  func (r *AlterReplicaLogDirsReq) Add(d string, s TopicsSet) {
    18  	if *r == nil {
    19  		*r = make(map[string]TopicsSet)
    20  	}
    21  	existing := (*r)[d]
    22  	if existing == nil {
    23  		existing = make(TopicsSet)
    24  		(*r)[d] = existing
    25  	}
    26  	existing.Merge(s)
    27  }
    28  
    29  func (r AlterReplicaLogDirsReq) req() *kmsg.AlterReplicaLogDirsRequest {
    30  	req := kmsg.NewPtrAlterReplicaLogDirsRequest()
    31  	for dir, ts := range r {
    32  		rd := kmsg.NewAlterReplicaLogDirsRequestDir()
    33  		rd.Dir = dir
    34  		for t, ps := range ts {
    35  			rt := kmsg.NewAlterReplicaLogDirsRequestDirTopic()
    36  			rt.Topic = t
    37  			for p := range ps {
    38  				rt.Partitions = append(rt.Partitions, p)
    39  			}
    40  			rd.Topics = append(rd.Topics, rt)
    41  		}
    42  		req.Dirs = append(req.Dirs, rd)
    43  	}
    44  	return req
    45  }
    46  
    47  func (r AlterReplicaLogDirsReq) dirfor(t string, p int32) string {
    48  	for d, dts := range r {
    49  		if dts == nil {
    50  			continue
    51  		}
    52  		dtps, ok := dts[t] // does this dir contain this topic?
    53  		if !ok {
    54  			continue
    55  		}
    56  		if _, ok = dtps[p]; !ok { // does this topic in this dir contain this partition?
    57  			continue
    58  		}
    59  		return d // yes
    60  	}
    61  	return ""
    62  }
    63  
    64  // AlterAllReplicaLogDirsResponses contains per-broker responses to altered
    65  // partition directories.
    66  type AlterAllReplicaLogDirsResponses map[int32]AlterReplicaLogDirsResponses
    67  
    68  // Sorted returns the responses sorted by broker, topic, and partition.
    69  func (rs AlterAllReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse {
    70  	var all []AlterReplicaLogDirsResponse
    71  	rs.Each(func(r AlterReplicaLogDirsResponse) {
    72  		all = append(all, r)
    73  	})
    74  	sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
    75  	return all
    76  }
    77  
    78  // Each calls fn for every response.
    79  func (rs AlterAllReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse)) {
    80  	for _, ts := range rs {
    81  		ts.Each(fn)
    82  	}
    83  }
    84  
    85  // AlterReplicaLogDirsResponses contains responses to altered partition
    86  // directories for a single broker.
    87  type AlterReplicaLogDirsResponses map[string]map[int32]AlterReplicaLogDirsResponse
    88  
    89  // Sorted returns the responses sorted by topic and partition.
    90  func (rs AlterReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse {
    91  	var all []AlterReplicaLogDirsResponse
    92  	rs.Each(func(r AlterReplicaLogDirsResponse) {
    93  		all = append(all, r)
    94  	})
    95  	sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
    96  	return all
    97  }
    98  
    99  // Each calls fn for every response.
   100  func (rs AlterReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse)) {
   101  	for _, ps := range rs {
   102  		for _, r := range ps {
   103  			fn(r)
   104  		}
   105  	}
   106  }
   107  
   108  // AlterReplicaLogDirsResponse contains a the response for an individual
   109  // altered partition directory.
   110  type AlterReplicaLogDirsResponse struct {
   111  	Broker    int32  // Broker is the broker this response came from.
   112  	Dir       string // Dir is the directory this partition was requested to be moved to.
   113  	Topic     string // Topic is the topic for this partition.
   114  	Partition int32  // Partition is the partition that was moved.
   115  	Err       error  // Err is non-nil if this move had an error.
   116  }
   117  
   118  // Less returns if the response is less than the other by broker, dir, topic,
   119  // and partition.
   120  func (a AlterReplicaLogDirsResponse) Less(other AlterReplicaLogDirsResponse) bool {
   121  	if a.Broker < other.Broker {
   122  		return true
   123  	}
   124  	if a.Broker > other.Broker {
   125  		return false
   126  	}
   127  	if a.Dir < other.Dir {
   128  		return true
   129  	}
   130  	if a.Dir > other.Dir {
   131  		return false
   132  	}
   133  	if a.Topic < other.Topic {
   134  		return true
   135  	}
   136  	if a.Topic > other.Topic {
   137  		return false
   138  	}
   139  	return a.Partition < other.Partition
   140  }
   141  
   142  func newAlterLogDirsResp(node int32, req AlterReplicaLogDirsReq, resp *kmsg.AlterReplicaLogDirsResponse) AlterReplicaLogDirsResponses {
   143  	a := make(AlterReplicaLogDirsResponses)
   144  	for _, kt := range resp.Topics {
   145  		ps := make(map[int32]AlterReplicaLogDirsResponse)
   146  		a[kt.Topic] = ps
   147  		for _, kp := range kt.Partitions {
   148  			ps[kp.Partition] = AlterReplicaLogDirsResponse{
   149  				Broker:    node,
   150  				Dir:       req.dirfor(kt.Topic, kp.Partition),
   151  				Topic:     kt.Topic,
   152  				Partition: kp.Partition,
   153  				Err:       kerr.ErrorForCode(kp.ErrorCode),
   154  			}
   155  		}
   156  	}
   157  	return a
   158  }
   159  
   160  // AlterAllReplicaLogDirs alters the log directories for the input topic
   161  // partitions, moving each partition to the requested directory. This function
   162  // moves all replicas on any broker.
   163  //
   164  // This may return *ShardErrors.
   165  func (cl *Client) AlterAllReplicaLogDirs(ctx context.Context, alter AlterReplicaLogDirsReq) (AlterAllReplicaLogDirsResponses, error) {
   166  	if len(alter) == 0 {
   167  		return make(AlterAllReplicaLogDirsResponses), nil
   168  	}
   169  	req := alter.req()
   170  	shards := cl.cl.RequestSharded(ctx, req)
   171  	resps := make(AlterAllReplicaLogDirsResponses)
   172  	return resps, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   173  		resp := kr.(*kmsg.AlterReplicaLogDirsResponse)
   174  		resps[b.NodeID] = newAlterLogDirsResp(b.NodeID, alter, resp) // one node ID, no need to unique-check
   175  		return nil
   176  	})
   177  }
   178  
   179  // AlterBrokerReplicaLogDirs alters the log directories for the input topic on the
   180  // given broker, moving each partition to the requested directory.
   181  func (cl *Client) AlterBrokerReplicaLogDirs(ctx context.Context, broker int32, alter AlterReplicaLogDirsReq) (AlterReplicaLogDirsResponses, error) {
   182  	if len(alter) == 0 {
   183  		return make(AlterReplicaLogDirsResponses), nil
   184  	}
   185  	b := cl.cl.Broker(int(broker))
   186  	kresp, err := b.RetriableRequest(ctx, alter.req())
   187  	if err != nil {
   188  		return nil, err
   189  	}
   190  	resp := kresp.(*kmsg.AlterReplicaLogDirsResponse)
   191  	return newAlterLogDirsResp(broker, alter, resp), nil
   192  }
   193  
   194  func describeLogDirsReq(s TopicsSet) *kmsg.DescribeLogDirsRequest {
   195  	req := kmsg.NewPtrDescribeLogDirsRequest()
   196  	for t, ps := range s {
   197  		rt := kmsg.NewDescribeLogDirsRequestTopic()
   198  		rt.Topic = t
   199  		for p := range ps {
   200  			rt.Partitions = append(rt.Partitions, p)
   201  		}
   202  		req.Topics = append(req.Topics, rt)
   203  	}
   204  	return req
   205  }
   206  
   207  // DescribedAllLogDirs contains per-broker responses to described log
   208  // directories.
   209  type DescribedAllLogDirs map[int32]DescribedLogDirs
   210  
   211  // Sorted returns each log directory sorted by broker, then by directory.
   212  func (ds DescribedAllLogDirs) Sorted() []DescribedLogDir {
   213  	var all []DescribedLogDir
   214  	ds.Each(func(d DescribedLogDir) {
   215  		all = append(all, d)
   216  	})
   217  	sort.Slice(all, func(i, j int) bool {
   218  		l, r := all[i], all[j]
   219  		return l.Broker < r.Broker || l.Broker == r.Broker && l.Dir < r.Dir
   220  	})
   221  	return all
   222  }
   223  
   224  // Each calls fn for every described log dir in all responses.
   225  func (ds DescribedAllLogDirs) Each(fn func(DescribedLogDir)) {
   226  	for _, bds := range ds {
   227  		bds.Each(fn)
   228  	}
   229  }
   230  
   231  // DescribedLogDirs contains per-directory responses to described log
   232  // directories for a single broker.
   233  type DescribedLogDirs map[string]DescribedLogDir
   234  
   235  // Lookup returns the described partition if it exists.
   236  func (ds DescribedLogDirs) Lookup(d, t string, p int32) (DescribedLogDirPartition, bool) {
   237  	dir, exists := ds[d]
   238  	if !exists {
   239  		return DescribedLogDirPartition{}, false
   240  	}
   241  	ps, exists := dir.Topics[t]
   242  	if !exists {
   243  		return DescribedLogDirPartition{}, false
   244  	}
   245  	dp, exists := ps[p]
   246  	if !exists {
   247  		return DescribedLogDirPartition{}, false
   248  	}
   249  	return dp, true
   250  }
   251  
   252  // LookupPartition returns the described partition if it exists in any
   253  // directory. Brokers should only have one replica of a partition, so this
   254  // should always find at most one partition.
   255  func (ds DescribedLogDirs) LookupPartition(t string, p int32) (DescribedLogDirPartition, bool) {
   256  	for _, dir := range ds {
   257  		ps, exists := dir.Topics[t]
   258  		if !exists {
   259  			continue
   260  		}
   261  		dp, exists := ps[p]
   262  		if !exists {
   263  			continue
   264  		}
   265  		return dp, true
   266  	}
   267  	return DescribedLogDirPartition{}, false
   268  }
   269  
   270  // Size returns the total size of all directories.
   271  func (ds DescribedLogDirs) Size() int64 {
   272  	var tot int64
   273  	ds.EachPartition(func(d DescribedLogDirPartition) {
   274  		tot += d.Size
   275  	})
   276  	return tot
   277  }
   278  
   279  // Error iterates over all directories and returns the first error encounted,
   280  // if any. This can be used to check if describing was entirely successful or
   281  // not.
   282  func (ds DescribedLogDirs) Error() error {
   283  	for _, d := range ds {
   284  		if d.Err != nil {
   285  			return d.Err
   286  		}
   287  	}
   288  	return nil
   289  }
   290  
   291  // Ok returns true if there are no errors. This is a shortcut for ds.Error() ==
   292  // nil.
   293  func (ds DescribedLogDirs) Ok() bool {
   294  	return ds.Error() == nil
   295  }
   296  
   297  // Sorted returns all directories sorted by dir.
   298  func (ds DescribedLogDirs) Sorted() []DescribedLogDir {
   299  	var all []DescribedLogDir
   300  	ds.Each(func(d DescribedLogDir) {
   301  		all = append(all, d)
   302  	})
   303  	sort.Slice(all, func(i, j int) bool {
   304  		l, r := all[i], all[j]
   305  		return l.Broker < r.Broker || l.Broker == r.Broker && l.Dir < r.Dir
   306  	})
   307  	return all
   308  }
   309  
   310  // SortedPartitions returns all partitions sorted by dir, then topic, then
   311  // partition.
   312  func (ds DescribedLogDirs) SortedPartitions() []DescribedLogDirPartition {
   313  	var all []DescribedLogDirPartition
   314  	ds.EachPartition(func(d DescribedLogDirPartition) {
   315  		all = append(all, d)
   316  	})
   317  	sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
   318  	return all
   319  }
   320  
   321  // SortedBySize returns all directories sorted from smallest total directory
   322  // size to largest.
   323  func (ds DescribedLogDirs) SortedBySize() []DescribedLogDir {
   324  	var all []DescribedLogDir
   325  	ds.Each(func(d DescribedLogDir) {
   326  		all = append(all, d)
   327  	})
   328  	sort.Slice(all, func(i, j int) bool {
   329  		l, r := all[i], all[j]
   330  		ls, rs := l.Size(), r.Size()
   331  		return ls < rs || ls == rs &&
   332  			(l.Broker < r.Broker || l.Broker == r.Broker &&
   333  				l.Dir < r.Dir)
   334  	})
   335  	return all
   336  }
   337  
   338  // SortedPartitionsBySize returns all partitions across all directories sorted
   339  // by smallest to largest, falling back to by broker, dir, topic, and
   340  // partition.
   341  func (ds DescribedLogDirs) SortedPartitionsBySize() []DescribedLogDirPartition {
   342  	var all []DescribedLogDirPartition
   343  	ds.EachPartition(func(d DescribedLogDirPartition) {
   344  		all = append(all, d)
   345  	})
   346  	sort.Slice(all, func(i, j int) bool { return all[i].LessBySize(all[j]) })
   347  	return all
   348  }
   349  
   350  // SmallestPartitionBySize returns the smallest partition by directory size, or
   351  // no partition if there are no partitions.
   352  func (ds DescribedLogDirs) SmallestPartitionBySize() (DescribedLogDirPartition, bool) {
   353  	sorted := ds.SortedPartitionsBySize()
   354  	if len(sorted) == 0 {
   355  		return DescribedLogDirPartition{}, false
   356  	}
   357  	return sorted[0], true
   358  }
   359  
   360  // LargestPartitionBySize returns the largest partition by directory size, or
   361  // no partition if there are no partitions.
   362  func (ds DescribedLogDirs) LargestPartitionBySize() (DescribedLogDirPartition, bool) {
   363  	sorted := ds.SortedPartitionsBySize()
   364  	if len(sorted) == 0 {
   365  		return DescribedLogDirPartition{}, false
   366  	}
   367  	return sorted[len(sorted)-1], true
   368  }
   369  
   370  // Each calls fn for each log directory.
   371  func (ds DescribedLogDirs) Each(fn func(DescribedLogDir)) {
   372  	for _, d := range ds {
   373  		fn(d)
   374  	}
   375  }
   376  
   377  // Each calls fn for each partition in any directory.
   378  func (ds DescribedLogDirs) EachPartition(fn func(d DescribedLogDirPartition)) {
   379  	for _, d := range ds {
   380  		d.Topics.Each(fn)
   381  	}
   382  }
   383  
   384  // EachError calls fn for every directory that has a non-nil error.
   385  func (ds DescribedLogDirs) EachError(fn func(DescribedLogDir)) {
   386  	for _, d := range ds {
   387  		if d.Err != nil {
   388  			fn(d)
   389  		}
   390  	}
   391  }
   392  
   393  // DescribedLogDir is a described log directory.
   394  type DescribedLogDir struct {
   395  	Broker int32                 // Broker is the broker being described.
   396  	Dir    string                // Dir is the described directory.
   397  	Topics DescribedLogDirTopics // Partitions are the partitions in this directory.
   398  	Err    error                 // Err is non-nil if this directory could not be described.
   399  }
   400  
   401  // Size returns the total size of all partitions in this directory. This is
   402  // a shortcut for .Topics.Size().
   403  func (ds DescribedLogDir) Size() int64 {
   404  	return ds.Topics.Size()
   405  }
   406  
   407  // DescribedLogDirTopics contains per-partition described log directories.
   408  type DescribedLogDirTopics map[string]map[int32]DescribedLogDirPartition
   409  
   410  // Lookup returns the described partition if it exists.
   411  func (ds DescribedLogDirTopics) Lookup(t string, p int32) (DescribedLogDirPartition, bool) {
   412  	ps, exists := ds[t]
   413  	if !exists {
   414  		return DescribedLogDirPartition{}, false
   415  	}
   416  	d, exists := ps[p]
   417  	return d, exists
   418  }
   419  
   420  // Size returns the total size of all partitions in this directory.
   421  func (ds DescribedLogDirTopics) Size() int64 {
   422  	var tot int64
   423  	ds.Each(func(d DescribedLogDirPartition) {
   424  		tot += d.Size
   425  	})
   426  	return tot
   427  }
   428  
   429  // Sorted returns all partitions sorted by topic then partition.
   430  func (ds DescribedLogDirTopics) Sorted() []DescribedLogDirPartition {
   431  	var all []DescribedLogDirPartition
   432  	ds.Each(func(d DescribedLogDirPartition) {
   433  		all = append(all, d)
   434  	})
   435  	sort.Slice(all, func(i, j int) bool { return all[i].Less(all[j]) })
   436  	return all
   437  }
   438  
   439  // SortedBySize returns all partitions sorted by smallest size to largest. If
   440  // partitions are of equal size, the sorting is topic then partition.
   441  func (ds DescribedLogDirTopics) SortedBySize() []DescribedLogDirPartition {
   442  	var all []DescribedLogDirPartition
   443  	ds.Each(func(d DescribedLogDirPartition) {
   444  		all = append(all, d)
   445  	})
   446  	sort.Slice(all, func(i, j int) bool { return all[i].LessBySize(all[j]) })
   447  	return all
   448  }
   449  
   450  // Each calls fn for every partition.
   451  func (ds DescribedLogDirTopics) Each(fn func(p DescribedLogDirPartition)) {
   452  	for _, ps := range ds {
   453  		for _, d := range ps {
   454  			fn(d)
   455  		}
   456  	}
   457  }
   458  
   459  // DescribedLogDirPartition is the information for a single partitions described
   460  // log directory.
   461  type DescribedLogDirPartition struct {
   462  	Broker    int32  // Broker is the broker this partition is on.
   463  	Dir       string // Dir is the directory this partition lives in.
   464  	Topic     string // Topic is the topic for this partition.
   465  	Partition int32  // Partition is this partition.
   466  	Size      int64  // Size is the total size of the log segments of this partition, in bytes.
   467  
   468  	// OffsetLag is how far behind the log end offset this partition is.
   469  	// The math is:
   470  	//
   471  	//     if IsFuture {
   472  	//         logEndOffset - futureLogEndOffset
   473  	//     } else {
   474  	//         max(highWaterMark - logEndOffset)
   475  	//     }
   476  	//
   477  	OffsetLag int64
   478  	// IsFuture is true if this replica was created by an
   479  	// AlterReplicaLogDirsRequest and will replace the current log of the
   480  	// replica in the future.
   481  	IsFuture bool
   482  }
   483  
   484  // Less returns if one dir partition is less than the other, by dir, topic,
   485  // partition, and size.
   486  func (p DescribedLogDirPartition) Less(other DescribedLogDirPartition) bool {
   487  	if p.Broker < other.Broker {
   488  		return true
   489  	}
   490  	if p.Broker > other.Broker {
   491  		return false
   492  	}
   493  	if p.Dir < other.Dir {
   494  		return true
   495  	}
   496  	if p.Dir > other.Dir {
   497  		return false
   498  	}
   499  	if p.Topic < other.Topic {
   500  		return true
   501  	}
   502  	if p.Topic > other.Topic {
   503  		return false
   504  	}
   505  	if p.Partition < other.Partition {
   506  		return true
   507  	}
   508  	if p.Partition > other.Partition {
   509  		return false
   510  	}
   511  	return p.Size < other.Size
   512  }
   513  
   514  // LessBySize returns if one dir partition is less than the other by size,
   515  // otherwise by normal Less semantics.
   516  func (p DescribedLogDirPartition) LessBySize(other DescribedLogDirPartition) bool {
   517  	if p.Size < other.Size {
   518  		return true
   519  	}
   520  	return p.Less(other)
   521  }
   522  
   523  func newDescribeLogDirsResp(node int32, resp *kmsg.DescribeLogDirsResponse) DescribedLogDirs {
   524  	ds := make(DescribedLogDirs)
   525  	for _, rd := range resp.Dirs {
   526  		d := DescribedLogDir{
   527  			Broker: node,
   528  			Dir:    rd.Dir,
   529  			Topics: make(DescribedLogDirTopics),
   530  			Err:    kerr.ErrorForCode(rd.ErrorCode),
   531  		}
   532  		for _, rt := range rd.Topics {
   533  			t := make(map[int32]DescribedLogDirPartition)
   534  			d.Topics[rt.Topic] = t
   535  			for _, rp := range rt.Partitions {
   536  				t[rp.Partition] = DescribedLogDirPartition{
   537  					Broker:    node,
   538  					Dir:       rd.Dir,
   539  					Topic:     rt.Topic,
   540  					Partition: rp.Partition,
   541  					Size:      rp.Size,
   542  					OffsetLag: rp.OffsetLag,
   543  					IsFuture:  rp.IsFuture,
   544  				}
   545  			}
   546  		}
   547  		ds[rd.Dir] = d
   548  	}
   549  	return ds
   550  }
   551  
   552  // DescribeAllLogDirs describes the log directores for every input topic
   553  // partition on every broker. If the input set is nil, this describes all log
   554  // directories.
   555  //
   556  // This may return *ShardErrors.
   557  func (cl *Client) DescribeAllLogDirs(ctx context.Context, s TopicsSet) (DescribedAllLogDirs, error) {
   558  	req := describeLogDirsReq(s)
   559  	shards := cl.cl.RequestSharded(ctx, req)
   560  	resps := make(DescribedAllLogDirs)
   561  	return resps, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   562  		resp := kr.(*kmsg.DescribeLogDirsResponse)
   563  		if err := maybeAuthErr(resp.ErrorCode); err != nil {
   564  			return err
   565  		}
   566  		if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
   567  			return err
   568  		}
   569  		resps[b.NodeID] = newDescribeLogDirsResp(b.NodeID, resp) // one node ID, no need to unique-check
   570  		return nil
   571  	})
   572  }
   573  
   574  // DescribeBrokerLogDirs describes the log directories for the input topic
   575  // partitions on the given broker. If the input set is nil, this describes all
   576  // log directories.
   577  func (cl *Client) DescribeBrokerLogDirs(ctx context.Context, broker int32, s TopicsSet) (DescribedLogDirs, error) {
   578  	req := describeLogDirsReq(s)
   579  	b := cl.cl.Broker(int(broker))
   580  	kresp, err := b.RetriableRequest(ctx, req)
   581  	if err != nil {
   582  		return nil, err
   583  	}
   584  	resp := kresp.(*kmsg.DescribeLogDirsResponse)
   585  	if err := maybeAuthErr(resp.ErrorCode); err != nil {
   586  		return nil, err
   587  	}
   588  	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
   589  		return nil, err
   590  	}
   591  	return newDescribeLogDirsResp(broker, resp), nil
   592  }
   593  

View as plain text