...

Source file src/github.com/twmb/franz-go/pkg/kadm/misc.go

Documentation: github.com/twmb/franz-go/pkg/kadm

     1  package kadm
     2  
     3  import (
     4  	"context"
     5  	"crypto/rand"
     6  	"crypto/sha256"
     7  	"crypto/sha512"
     8  	"fmt"
     9  	"sort"
    10  	"strings"
    11  	"sync"
    12  
    13  	"golang.org/x/crypto/pbkdf2"
    14  
    15  	"github.com/twmb/franz-go/pkg/kerr"
    16  	"github.com/twmb/franz-go/pkg/kmsg"
    17  	"github.com/twmb/franz-go/pkg/kversion"
    18  )
    19  
    20  // FindCoordinatorResponse contains information for the coordinator for a group
    21  // or transactional ID.
    22  type FindCoordinatorResponse struct {
    23  	Name       string // Name is the coordinator key this response is for.
    24  	NodeID     int32  // NodeID is the node ID of the coordinator for this key.
    25  	Host       string // Host is the host of the coordinator for this key.
    26  	Port       int32  // Port is the port of the coordinator for this key.
    27  	Err        error  // Err is any error encountered when requesting the coordinator.
    28  	ErrMessage string // ErrMessage a potential extra message describing any error.
    29  }
    30  
    31  // FindCoordinatorResponses contains responses to finding coordinators for
    32  // groups or transactions.
    33  type FindCoordinatorResponses map[string]FindCoordinatorResponse
    34  
    35  // AllFailed returns whether all responses are errored.
    36  func (rs FindCoordinatorResponses) AllFailed() bool {
    37  	var n int
    38  	rs.EachError(func(FindCoordinatorResponse) { n++ })
    39  	return len(rs) > 0 && n == len(rs)
    40  }
    41  
    42  // Sorted returns all coordinator responses sorted by name.
    43  func (rs FindCoordinatorResponses) Sorted() []FindCoordinatorResponse {
    44  	s := make([]FindCoordinatorResponse, 0, len(rs))
    45  	for _, r := range rs {
    46  		s = append(s, r)
    47  	}
    48  	sort.Slice(s, func(i, j int) bool { return s[i].Name < s[j].Name })
    49  	return s
    50  }
    51  
    52  // EachError calls fn for every response that has a non-nil error.
    53  func (rs FindCoordinatorResponses) EachError(fn func(FindCoordinatorResponse)) {
    54  	for _, r := range rs {
    55  		if r.Err != nil {
    56  			fn(r)
    57  		}
    58  	}
    59  }
    60  
    61  // Each calls fn for every response.
    62  func (rs FindCoordinatorResponses) Each(fn func(FindCoordinatorResponse)) {
    63  	for _, r := range rs {
    64  		fn(r)
    65  	}
    66  }
    67  
    68  // Error iterates over all responses and returns the first error encountered,
    69  // if any.
    70  func (rs FindCoordinatorResponses) Error() error {
    71  	for _, r := range rs {
    72  		if r.Err != nil {
    73  			return r.Err
    74  		}
    75  	}
    76  	return nil
    77  }
    78  
    79  // Ok returns true if there are no errors. This is a shortcut for rs.Error() ==
    80  // nil.
    81  func (rs FindCoordinatorResponses) Ok() bool {
    82  	return rs.Error() == nil
    83  }
    84  
    85  // FindGroupCoordinators returns the coordinator for all requested group names.
    86  //
    87  // This may return *ShardErrors or *AuthError.
    88  func (cl *Client) FindGroupCoordinators(ctx context.Context, groups ...string) FindCoordinatorResponses {
    89  	return cl.findCoordinators(ctx, 0, groups...)
    90  }
    91  
    92  // FindTxnCoordinators returns the coordinator for all requested transactional
    93  // IDs.
    94  //
    95  // This may return *ShardErrors or *AuthError.
    96  func (cl *Client) FindTxnCoordinators(ctx context.Context, txnIDs ...string) FindCoordinatorResponses {
    97  	return cl.findCoordinators(ctx, 1, txnIDs...)
    98  }
    99  
   100  func (cl *Client) findCoordinators(ctx context.Context, kind int8, names ...string) FindCoordinatorResponses {
   101  	resps := make(FindCoordinatorResponses)
   102  	if len(names) == 0 {
   103  		return resps
   104  	}
   105  
   106  	req := kmsg.NewPtrFindCoordinatorRequest()
   107  	req.CoordinatorType = kind
   108  	req.CoordinatorKeys = names
   109  
   110  	keyErr := func(k string, err error) {
   111  		resps[k] = FindCoordinatorResponse{
   112  			Name: k,
   113  			Err:  err,
   114  		}
   115  	}
   116  	allKeysErr := func(req *kmsg.FindCoordinatorRequest, err error) {
   117  		for _, k := range req.CoordinatorKeys {
   118  			keyErr(k, err)
   119  		}
   120  	}
   121  
   122  	shards := cl.cl.RequestSharded(ctx, req)
   123  	for _, shard := range shards {
   124  		req := shard.Req.(*kmsg.FindCoordinatorRequest)
   125  		if shard.Err != nil {
   126  			allKeysErr(req, shard.Err)
   127  			continue
   128  		}
   129  		resp := shard.Resp.(*kmsg.FindCoordinatorResponse)
   130  		if err := maybeAuthErr(resp.ErrorCode); err != nil {
   131  			allKeysErr(req, err)
   132  			continue
   133  		}
   134  		for _, c := range resp.Coordinators {
   135  			if err := maybeAuthErr(c.ErrorCode); err != nil {
   136  				keyErr(c.Key, err)
   137  				continue
   138  			}
   139  			resps[c.Key] = FindCoordinatorResponse{ // key is always on one broker, no need to check existence
   140  				Name:       c.Key,
   141  				NodeID:     c.NodeID,
   142  				Host:       c.Host,
   143  				Port:       c.Port,
   144  				Err:        kerr.ErrorForCode(c.ErrorCode),
   145  				ErrMessage: unptrStr(c.ErrorMessage),
   146  			}
   147  		}
   148  	}
   149  	return resps
   150  }
   151  
   152  type minmax struct {
   153  	min, max int16
   154  }
   155  
   156  // BrokerApiVersions contains the API versions for a single broker.
   157  type BrokerApiVersions struct {
   158  	NodeID int32 // NodeID is the node this API versions response is for.
   159  
   160  	raw         *kmsg.ApiVersionsResponse
   161  	keyVersions map[int16]minmax
   162  
   163  	Err error // Err is non-nil if the API versions request failed.
   164  }
   165  
   166  // Raw returns the raw API versions response.
   167  func (v *BrokerApiVersions) Raw() *kmsg.ApiVersionsResponse {
   168  	return v.raw
   169  }
   170  
   171  // KeyVersions returns the broker's min and max version for an API key and
   172  // whether this broker supports the request.
   173  func (v *BrokerApiVersions) KeyVersions(key int16) (min, max int16, exists bool) {
   174  	vs, exists := v.keyVersions[key]
   175  	return vs.min, vs.max, exists
   176  }
   177  
   178  // KeyVersions returns the broker's min version for an API key and whether this
   179  // broker supports the request.
   180  func (v *BrokerApiVersions) KeyMinVersion(key int16) (min int16, exists bool) {
   181  	min, _, exists = v.KeyVersions(key)
   182  	return min, exists
   183  }
   184  
   185  // KeyVersions returns the broker's max version for an API key and whether this
   186  // broker supports the request.
   187  func (v *BrokerApiVersions) KeyMaxVersion(key int16) (max int16, exists bool) {
   188  	_, max, exists = v.KeyVersions(key)
   189  	return max, exists
   190  }
   191  
   192  // EachKeySorted calls fn for every API key in the broker response, from the
   193  // smallest API key to the largest.
   194  func (v *BrokerApiVersions) EachKeySorted(fn func(key, min, max int16)) {
   195  	type kmm struct {
   196  		k, min, max int16
   197  	}
   198  	kmms := make([]kmm, 0, len(v.keyVersions))
   199  	for key, minmax := range v.keyVersions {
   200  		kmms = append(kmms, kmm{key, minmax.min, minmax.max})
   201  	}
   202  	sort.Slice(kmms, func(i, j int) bool { return kmms[i].k < kmms[j].k })
   203  	for _, kmm := range kmms {
   204  		fn(kmm.k, kmm.min, kmm.max)
   205  	}
   206  }
   207  
   208  // VersionGuess returns the best guess of Kafka that this broker is. This is a
   209  // shorcut for:
   210  //
   211  //	kversion.FromApiVersionsResponse(v.Raw()).VersionGuess(opt...)
   212  //
   213  // Check the kversion.VersionGuess API docs for more details.
   214  func (v *BrokerApiVersions) VersionGuess(opt ...kversion.VersionGuessOpt) string {
   215  	return kversion.FromApiVersionsResponse(v.raw).VersionGuess(opt...)
   216  }
   217  
   218  // BrokerApiVersions contains API versions for all brokers that are reachable
   219  // from a metadata response.
   220  type BrokersApiVersions map[int32]BrokerApiVersions
   221  
   222  // Sorted returns all broker responses sorted by node ID.
   223  func (vs BrokersApiVersions) Sorted() []BrokerApiVersions {
   224  	s := make([]BrokerApiVersions, 0, len(vs))
   225  	for _, v := range vs {
   226  		s = append(s, v)
   227  	}
   228  	sort.Slice(s, func(i, j int) bool { return s[i].NodeID < s[j].NodeID })
   229  	return s
   230  }
   231  
   232  // Each calls fn for every broker response.
   233  func (vs BrokersApiVersions) Each(fn func(BrokerApiVersions)) {
   234  	for _, v := range vs {
   235  		fn(v)
   236  	}
   237  }
   238  
   239  // ApiVersions queries every broker in a metadata response for their API
   240  // versions. This returns an error only if the metadata request fails.
   241  func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) {
   242  	m, err := cl.BrokerMetadata(ctx)
   243  	if err != nil {
   244  		return nil, err
   245  	}
   246  
   247  	var mu sync.Mutex
   248  	var wg sync.WaitGroup
   249  	vs := make(BrokersApiVersions, len(m.Brokers))
   250  	for _, n := range m.Brokers.NodeIDs() {
   251  		n := n
   252  		wg.Add(1)
   253  		go func() {
   254  			defer wg.Done()
   255  			req := kmsg.NewPtrApiVersionsRequest()
   256  			req.ClientSoftwareName = "kadm"
   257  			req.ClientSoftwareVersion = softwareVersion()
   258  			v := BrokerApiVersions{NodeID: n, keyVersions: make(map[int16]minmax)}
   259  			v.raw, v.Err = req.RequestWith(ctx, cl.cl.Broker(int(n)))
   260  
   261  			mu.Lock()
   262  			defer mu.Unlock()
   263  			defer func() { vs[n] = v }()
   264  			if v.Err != nil {
   265  				return
   266  			}
   267  
   268  			v.Err = kerr.ErrorForCode(v.raw.ErrorCode)
   269  			for _, k := range v.raw.ApiKeys {
   270  				v.keyVersions[k.ApiKey] = minmax{
   271  					min: k.MinVersion,
   272  					max: k.MaxVersion,
   273  				}
   274  			}
   275  		}()
   276  	}
   277  	wg.Wait()
   278  
   279  	return vs, nil
   280  }
   281  
   282  // ClientQuotaEntityComponent is a quota entity component.
   283  type ClientQuotaEntityComponent struct {
   284  	Type string  // Type is the entity type ("user", "client-id", "ip").
   285  	Name *string // Name is the entity name, or null if the default.
   286  }
   287  
   288  // String returns key=value, or key=<default> if value is nil.
   289  func (d ClientQuotaEntityComponent) String() string {
   290  	if d.Name == nil {
   291  		return d.Type + "=<default>"
   292  	}
   293  	return fmt.Sprintf("%s=%s", d.Type, *d.Name)
   294  }
   295  
   296  // ClientQuotaEntity contains the components that make up a single entity.
   297  type ClientQuotaEntity []ClientQuotaEntityComponent
   298  
   299  // String returns {key=value, key=value}, joining all entities with a ", " and
   300  // wrapping in braces.
   301  func (ds ClientQuotaEntity) String() string {
   302  	var ss []string
   303  	for _, d := range ds {
   304  		ss = append(ss, d.String())
   305  	}
   306  	return "{" + strings.Join(ss, ", ") + "}"
   307  }
   308  
   309  // ClientQuotaValue is a quota name and value.
   310  type ClientQuotaValue struct {
   311  	Key   string  // Key is the quota configuration key.
   312  	Value float64 // Value is the quota configuration value.
   313  }
   314  
   315  // String returns key=value.
   316  func (d ClientQuotaValue) String() string {
   317  	return fmt.Sprintf("%s=%f", d.Key, d.Value)
   318  }
   319  
   320  // ClientQuotaValues contains all client quota values.
   321  type ClientQuotaValues []ClientQuotaValue
   322  
   323  // QuotasMatchType specifies how to match a described client quota entity.
   324  //
   325  // 0 means to match the name exactly: user=foo will only match components of
   326  // entity type "user" and entity name "foo".
   327  //
   328  // 1 means to match the default of the name: entity type "user" with a default
   329  // match will return the default quotas for user entities.
   330  //
   331  // 2 means to match any name: entity type "user" with any matching will return
   332  // both names and defaults.
   333  type QuotasMatchType = kmsg.QuotasMatchType
   334  
   335  // DescribeClientQuotaComponent is an input entity component to describing
   336  // client quotas: we define the type of quota ("client-id", "user"), how to
   337  // match, and the match name if needed.
   338  type DescribeClientQuotaComponent struct {
   339  	Type      string          // Type is the type of entity component to describe ("user", "client-id", "ip").
   340  	MatchName *string         // MatchName is the name to match again; this is only needed when MatchType is 0 (exact).
   341  	MatchType QuotasMatchType // MatchType is how to match an entity.
   342  }
   343  
   344  // DescribedClientQuota contains a described quota. A single quota is made up
   345  // of multiple entities and multiple values, for example, "user=foo" is one
   346  // component of the entity, and "client-id=bar" is another.
   347  type DescribedClientQuota struct {
   348  	Entity ClientQuotaEntity // Entity is the entity of this described client quota.
   349  	Values ClientQuotaValues // Values contains the quota valies for this entity.
   350  }
   351  
   352  // DescribedClientQuota contains client quotas that were described.
   353  type DescribedClientQuotas []DescribedClientQuota
   354  
   355  // DescribeClientQuotas describes client quotas. If strict is true, the
   356  // response includes only the requested components.
   357  func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityComponents []DescribeClientQuotaComponent) (DescribedClientQuotas, error) {
   358  	req := kmsg.NewPtrDescribeClientQuotasRequest()
   359  	req.Strict = strict
   360  	for _, entity := range entityComponents {
   361  		rc := kmsg.NewDescribeClientQuotasRequestComponent()
   362  		rc.EntityType = entity.Type
   363  		rc.Match = entity.MatchName
   364  		rc.MatchType = entity.MatchType
   365  		req.Components = append(req.Components, rc)
   366  	}
   367  	resp, err := req.RequestWith(ctx, cl.cl)
   368  	if err != nil {
   369  		return nil, err
   370  	}
   371  	if err := maybeAuthErr(resp.ErrorCode); err != nil {
   372  		return nil, err
   373  	}
   374  	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
   375  		return nil, err
   376  	}
   377  	var qs DescribedClientQuotas
   378  	for _, entry := range resp.Entries {
   379  		var q DescribedClientQuota
   380  		for _, e := range entry.Entity {
   381  			q.Entity = append(q.Entity, ClientQuotaEntityComponent{
   382  				Type: e.Type,
   383  				Name: e.Name,
   384  			})
   385  		}
   386  		for _, v := range entry.Values {
   387  			q.Values = append(q.Values, ClientQuotaValue{
   388  				Key:   v.Key,
   389  				Value: v.Value,
   390  			})
   391  		}
   392  		qs = append(qs, q)
   393  	}
   394  	return qs, nil
   395  }
   396  
   397  // AlterClientQuotaOp sets or remove a client quota.
   398  type AlterClientQuotaOp struct {
   399  	Key    string  // Key is the quota configuration key to set or remove.
   400  	Value  float64 // Value is the quota configuration value to set or remove.
   401  	Remove bool    // Remove, if true, removes this quota rather than sets it.
   402  }
   403  
   404  // AlterClientQuotaEntry pairs an entity with quotas to set or remove.
   405  type AlterClientQuotaEntry struct {
   406  	Entity ClientQuotaEntity    // Entity is the entity to alter quotas for.
   407  	Ops    []AlterClientQuotaOp // Ops are quotas to set or remove.
   408  }
   409  
   410  // AlteredClientQuota is the result for a single entity that was altered.
   411  type AlteredClientQuota struct {
   412  	Entity     ClientQuotaEntity // Entity is the entity this result is for.
   413  	Err        error             // Err is non-nil if the alter operation on this entity failed.
   414  	ErrMessage string            // ErrMessage is an optional additional message on error.
   415  }
   416  
   417  // AlteredClientQuotas contains results for all altered entities.
   418  type AlteredClientQuotas []AlteredClientQuota
   419  
   420  // AlterClientQuotas alters quotas for the input entries. You may consider
   421  // checking ValidateAlterClientQuotas before using this method.
   422  func (cl *Client) AlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) {
   423  	return cl.alterClientQuotas(ctx, false, entries)
   424  }
   425  
   426  // ValidateAlterClientQuotas validates an alter client quota request. This
   427  // returns exactly what AlterClientQuotas returns, but does not actually alter
   428  // quotas.
   429  func (cl *Client) ValidateAlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) {
   430  	return cl.alterClientQuotas(ctx, true, entries)
   431  }
   432  
   433  func (cl *Client) alterClientQuotas(ctx context.Context, validate bool, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) {
   434  	req := kmsg.NewPtrAlterClientQuotasRequest()
   435  	req.ValidateOnly = validate
   436  	for _, entry := range entries {
   437  		re := kmsg.NewAlterClientQuotasRequestEntry()
   438  		for _, c := range entry.Entity {
   439  			rec := kmsg.NewAlterClientQuotasRequestEntryEntity()
   440  			rec.Type = c.Type
   441  			rec.Name = c.Name
   442  			re.Entity = append(re.Entity, rec)
   443  		}
   444  		for _, op := range entry.Ops {
   445  			reo := kmsg.NewAlterClientQuotasRequestEntryOp()
   446  			reo.Key = op.Key
   447  			reo.Value = op.Value
   448  			reo.Remove = op.Remove
   449  			re.Ops = append(re.Ops, reo)
   450  		}
   451  		req.Entries = append(req.Entries, re)
   452  	}
   453  	resp, err := req.RequestWith(ctx, cl.cl)
   454  	if err != nil {
   455  		return nil, err
   456  	}
   457  	var as AlteredClientQuotas
   458  	for _, entry := range resp.Entries {
   459  		var e ClientQuotaEntity
   460  		for _, c := range entry.Entity {
   461  			e = append(e, ClientQuotaEntityComponent{
   462  				Type: c.Type,
   463  				Name: c.Name,
   464  			})
   465  		}
   466  		a := AlteredClientQuota{
   467  			Entity:     e,
   468  			Err:        kerr.ErrorForCode(entry.ErrorCode),
   469  			ErrMessage: unptrStr(entry.ErrorMessage),
   470  		}
   471  		as = append(as, a)
   472  	}
   473  	return as, nil
   474  }
   475  
   476  // ScramMechanism is a SCRAM mechanism.
   477  type ScramMechanism int8
   478  
   479  const (
   480  	// ScramSha256 represents the SCRAM-SHA-256 mechanism.
   481  	ScramSha256 ScramMechanism = 1
   482  	// ScramSha512 represents the SCRAM-SHA-512 mechanism.
   483  	ScramSha512 ScramMechanism = 2
   484  )
   485  
   486  // String returns either SCRAM-SHA-256, SCRAM-SHA-512, or UNKNOWN.
   487  func (s ScramMechanism) String() string {
   488  	switch s {
   489  	case ScramSha256:
   490  		return "SCRAM-SHA-256"
   491  	case ScramSha512:
   492  		return "SCRAM-SHA-512"
   493  	default:
   494  		return "UNKNOWN"
   495  	}
   496  }
   497  
   498  // CredInfo contains the SCRAM mechanism and iterations for a password.
   499  type CredInfo struct {
   500  	// Mechanism is the SCRAM mechanism a password exists for. This is 0
   501  	// for UNKNOWN, 1 for SCRAM-SHA-256, and 2 for SCRAM-SHA-512.
   502  	Mechanism ScramMechanism
   503  	// Iterations is the number of SCRAM iterations for this password.
   504  	Iterations int32
   505  }
   506  
   507  // String returns MECHANISM=iterations={c.Iterations}.
   508  func (c CredInfo) String() string {
   509  	return fmt.Sprintf("%s=iterations=%d", c.Mechanism, c.Iterations)
   510  }
   511  
   512  // DescribedUserSCRAM contains a user, the SCRAM mechanisms that the user has
   513  // passwords for, and if describing the user SCRAM credentials errored.
   514  type DescribedUserSCRAM struct {
   515  	User       string     // User is the user this described user credential is for.
   516  	CredInfos  []CredInfo // CredInfos contains SCRAM mechanisms the user has passwords for.
   517  	Err        error      // Err is any error encountered when describing the user.
   518  	ErrMessage string     // ErrMessage a potential extra message describing any error.
   519  }
   520  
   521  // DescribedUserSCRAMs contains described user SCRAM credentials keyed by user.
   522  type DescribedUserSCRAMs map[string]DescribedUserSCRAM
   523  
   524  // Sorted returns the described user credentials ordered by user.
   525  func (ds DescribedUserSCRAMs) Sorted() []DescribedUserSCRAM {
   526  	s := make([]DescribedUserSCRAM, 0, len(ds))
   527  	for _, d := range ds {
   528  		s = append(s, d)
   529  	}
   530  	sort.Slice(s, func(i, j int) bool { return s[i].User < s[j].User })
   531  	return s
   532  }
   533  
   534  // AllFailed returns whether all described user credentials are errored.
   535  func (ds DescribedUserSCRAMs) AllFailed() bool {
   536  	var n int
   537  	ds.EachError(func(DescribedUserSCRAM) { n++ })
   538  	return len(ds) > 0 && n == len(ds)
   539  }
   540  
   541  // EachError calls fn for every described user that has a non-nil error.
   542  func (ds DescribedUserSCRAMs) EachError(fn func(DescribedUserSCRAM)) {
   543  	for _, d := range ds {
   544  		if d.Err != nil {
   545  			fn(d)
   546  		}
   547  	}
   548  }
   549  
   550  // Each calls fn for every described user.
   551  func (ds DescribedUserSCRAMs) Each(fn func(DescribedUserSCRAM)) {
   552  	for _, d := range ds {
   553  		fn(d)
   554  	}
   555  }
   556  
   557  // Error iterates over all described users and returns the first error
   558  // encountered, if any.
   559  func (ds DescribedUserSCRAMs) Error() error {
   560  	for _, d := range ds {
   561  		if d.Err != nil {
   562  			return d.Err
   563  		}
   564  	}
   565  	return nil
   566  }
   567  
   568  // Ok returns true if there are no errors. This is a shortcut for rs.Error() ==
   569  // nil.
   570  func (ds DescribedUserSCRAMs) Ok() bool {
   571  	return ds.Error() == nil
   572  }
   573  
   574  // DescribeUserSCRAMs returns a small bit of information about all users in the
   575  // input request that have SCRAM passwords configured.  No users requests all
   576  // users.
   577  func (cl *Client) DescribeUserSCRAMs(ctx context.Context, users ...string) (DescribedUserSCRAMs, error) {
   578  	req := kmsg.NewPtrDescribeUserSCRAMCredentialsRequest()
   579  	for _, u := range users {
   580  		ru := kmsg.NewDescribeUserSCRAMCredentialsRequestUser()
   581  		ru.Name = u
   582  		req.Users = append(req.Users, ru)
   583  	}
   584  	resp, err := req.RequestWith(ctx, cl.cl)
   585  	if err != nil {
   586  		return nil, err
   587  	}
   588  	if err := maybeAuthErr(resp.ErrorCode); err != nil {
   589  		return nil, err
   590  	}
   591  	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
   592  		return nil, err
   593  	}
   594  	rs := make(DescribedUserSCRAMs)
   595  	for _, res := range resp.Results {
   596  		r := DescribedUserSCRAM{
   597  			User:       res.User,
   598  			Err:        kerr.ErrorForCode(res.ErrorCode),
   599  			ErrMessage: unptrStr(res.ErrorMessage),
   600  		}
   601  		for _, i := range res.CredentialInfos {
   602  			r.CredInfos = append(r.CredInfos, CredInfo{
   603  				Mechanism:  ScramMechanism(i.Mechanism),
   604  				Iterations: i.Iterations,
   605  			})
   606  		}
   607  		rs[r.User] = r
   608  	}
   609  	return rs, nil
   610  }
   611  
   612  // DeleteSCRAM deletes a password with the given mechanism for the user.
   613  type DeleteSCRAM struct {
   614  	User      string         // User is the username to match for deletion.
   615  	Mechanism ScramMechanism // Mechanism is the mechanism to match to delete a password for.
   616  }
   617  
   618  // UpsertSCRAM either updates or creates (inserts) a new password for a user.
   619  // There are two ways to specify a password: either with the Password field
   620  // directly, or by specifying both Salt and SaltedPassword. If you specify just
   621  // a password, this package generates a 24 byte salt and uses pbkdf2 to create
   622  // the salted password.
   623  type UpsertSCRAM struct {
   624  	User           string         // User is the username to use.
   625  	Mechanism      ScramMechanism // Mechanism is the mechanism to use.
   626  	Iterations     int32          // Iterations is the SCRAM iterations to use; must be between 4096 and 16384.
   627  	Password       string         // Password is the password to salt and convert to a salted password. Requires Salt and SaltedPassword to be empty.
   628  	Salt           []byte         // Salt must be paired with SaltedPassword and requires Password to be empty.
   629  	SaltedPassword []byte         // SaltedPassword must be paired with Salt and requires Password to be empty.
   630  }
   631  
   632  // AlteredUserSCRAM is the result of an alter operation.
   633  type AlteredUserSCRAM struct {
   634  	User       string // User is the username that was altered.
   635  	Err        error  // Err is any error encountered when altering the user.
   636  	ErrMessage string // ErrMessage a potential extra message describing any error.
   637  }
   638  
   639  // AlteredUserSCRAMs contains altered user SCRAM credentials keyed by user.
   640  type AlteredUserSCRAMs map[string]AlteredUserSCRAM
   641  
   642  // Sorted returns the altered user credentials ordered by user.
   643  func (as AlteredUserSCRAMs) Sorted() []AlteredUserSCRAM {
   644  	s := make([]AlteredUserSCRAM, 0, len(as))
   645  	for _, a := range as {
   646  		s = append(s, a)
   647  	}
   648  	sort.Slice(s, func(i, j int) bool { return s[i].User < s[j].User })
   649  	return s
   650  }
   651  
   652  // AllFailed returns whether all altered user credentials are errored.
   653  func (as AlteredUserSCRAMs) AllFailed() bool {
   654  	var n int
   655  	as.EachError(func(AlteredUserSCRAM) { n++ })
   656  	return len(as) > 0 && n == len(as)
   657  }
   658  
   659  // EachError calls fn for every altered user that has a non-nil error.
   660  func (as AlteredUserSCRAMs) EachError(fn func(AlteredUserSCRAM)) {
   661  	for _, a := range as {
   662  		if a.Err != nil {
   663  			fn(a)
   664  		}
   665  	}
   666  }
   667  
   668  // Each calls fn for every altered user.
   669  func (as AlteredUserSCRAMs) Each(fn func(AlteredUserSCRAM)) {
   670  	for _, a := range as {
   671  		fn(a)
   672  	}
   673  }
   674  
   675  // Error iterates over all altered users and returns the first error
   676  // encountered, if any.
   677  func (as AlteredUserSCRAMs) Error() error {
   678  	for _, a := range as {
   679  		if a.Err != nil {
   680  			return a.Err
   681  		}
   682  	}
   683  	return nil
   684  }
   685  
   686  // Ok returns true if there are no errors. This is a shortcut for rs.Error() ==
   687  // nil.
   688  func (as AlteredUserSCRAMs) Ok() bool {
   689  	return as.Error() == nil
   690  }
   691  
   692  // AlterUserSCRAMs deletes, updates, or creates (inserts) user SCRAM
   693  // credentials. Note that a username can only appear once across both upserts
   694  // and deletes. This modifies elements of the upsert slice that need to have a
   695  // salted password generated.
   696  func (cl *Client) AlterUserSCRAMs(ctx context.Context, del []DeleteSCRAM, upsert []UpsertSCRAM) (AlteredUserSCRAMs, error) {
   697  	for i, u := range upsert {
   698  		if u.Password != "" {
   699  			if len(u.Salt) > 0 || len(u.SaltedPassword) > 0 {
   700  				return nil, fmt.Errorf("user %s: cannot specify both a password and a salt / salted password", u.User)
   701  			}
   702  			u.Salt = make([]byte, 24)
   703  			if _, err := rand.Read(u.Salt); err != nil {
   704  				return nil, fmt.Errorf("user %s: unable to generate salt: %v", u.User, err)
   705  			}
   706  			switch u.Mechanism {
   707  			case ScramSha256:
   708  				u.SaltedPassword = pbkdf2.Key([]byte(u.Password), u.Salt, int(u.Iterations), sha256.Size, sha256.New)
   709  			case ScramSha512:
   710  				u.SaltedPassword = pbkdf2.Key([]byte(u.Password), u.Salt, int(u.Iterations), sha512.Size, sha512.New)
   711  			default:
   712  				return nil, fmt.Errorf("user %s: unknown mechanism, unable to generate password", u.User)
   713  			}
   714  			upsert[i] = u
   715  		} else {
   716  			if len(u.Salt) == 0 || len(u.SaltedPassword) == 0 {
   717  				return nil, fmt.Errorf("user %s: must specify either a password or a salt and salted password", u.User)
   718  			}
   719  		}
   720  	}
   721  
   722  	req := kmsg.NewPtrAlterUserSCRAMCredentialsRequest()
   723  	for _, d := range del {
   724  		rd := kmsg.NewAlterUserSCRAMCredentialsRequestDeletion()
   725  		rd.Name = d.User
   726  		rd.Mechanism = int8(d.Mechanism)
   727  		req.Deletions = append(req.Deletions, rd)
   728  	}
   729  	for _, u := range upsert {
   730  		ru := kmsg.NewAlterUserSCRAMCredentialsRequestUpsertion()
   731  		ru.Name = u.User
   732  		ru.Mechanism = int8(u.Mechanism)
   733  		ru.Iterations = u.Iterations
   734  		ru.Salt = u.Salt
   735  		ru.SaltedPassword = u.SaltedPassword
   736  		req.Upsertions = append(req.Upsertions, ru)
   737  	}
   738  	resp, err := req.RequestWith(ctx, cl.cl)
   739  	if err != nil {
   740  		return nil, err
   741  	}
   742  	rs := make(AlteredUserSCRAMs)
   743  	for _, res := range resp.Results {
   744  		if err := maybeAuthErr(res.ErrorCode); err != nil {
   745  			return nil, err
   746  		}
   747  		r := AlteredUserSCRAM{
   748  			User:       res.User,
   749  			Err:        kerr.ErrorForCode(res.ErrorCode),
   750  			ErrMessage: unptrStr(res.ErrorMessage),
   751  		}
   752  		rs[r.User] = r
   753  	}
   754  	return rs, nil
   755  }
   756  
   757  // ElectLeadersHow is how partition leaders should be elected.
   758  type ElectLeadersHow int8
   759  
   760  const (
   761  	// ElectPreferredReplica elects the preferred replica for a partition.
   762  	ElectPreferredReplica ElectLeadersHow = 0
   763  	// ElectLiveReplica elects the first life replica if there are no
   764  	// in-sync replicas (i.e., this is unclean leader election).
   765  	ElectLiveReplica ElectLeadersHow = 1
   766  )
   767  
   768  // ElectLeadersResult is the result for a single partition in an elect leaders
   769  // request.
   770  type ElectLeadersResult struct {
   771  	Topic      string          // Topic is the topic this result is for.
   772  	Partition  int32           // Partition is the partition this result is for.
   773  	How        ElectLeadersHow // How is the type of election that was performed.
   774  	Err        error           // Err is non-nil if electing this partition's leader failed, such as the partition not existing or the preferred leader is not available and you used ElectPreferredReplica.
   775  	ErrMessage string          // ErrMessage a potential extra message describing any error.
   776  }
   777  
   778  // ElectLeadersResults contains per-topic, per-partition results for an elect
   779  // leaders request.
   780  type ElectLeadersResults map[string]map[int32]ElectLeadersResult
   781  
   782  // ElectLeaders elects leaders for partitions. This request was added in Kafka
   783  // 2.2 to replace the previously-ZooKeeper-only option of triggering leader
   784  // elections. See KIP-183 for more details.
   785  //
   786  // Kafka 2.4 introduced the ability to use unclean leader election. If you use
   787  // unclean leader election on a Kafka 2.2 or 2.3 cluster, the client will
   788  // instead fall back to preferred replica (clean) leader election. You can
   789  // check the result's How function (or field) to see.
   790  //
   791  // If s is nil, this will elect leaders for all partitions.
   792  //
   793  // This will return *AuthError if you do not have ALTER on CLUSTER for
   794  // kafka-cluster.
   795  func (cl *Client) ElectLeaders(ctx context.Context, how ElectLeadersHow, s TopicsSet) (ElectLeadersResults, error) {
   796  	req := kmsg.NewPtrElectLeadersRequest()
   797  	req.ElectionType = int8(how)
   798  	for _, t := range s.IntoList() {
   799  		rt := kmsg.NewElectLeadersRequestTopic()
   800  		rt.Topic = t.Topic
   801  		rt.Partitions = t.Partitions
   802  		req.Topics = append(req.Topics, rt)
   803  	}
   804  	resp, err := req.RequestWith(ctx, cl.cl)
   805  	if err != nil {
   806  		return nil, err
   807  	}
   808  	if err := maybeAuthErr(resp.ErrorCode); err != nil {
   809  		return nil, err
   810  	}
   811  	if resp.Version == 0 { // v0 does not have the election type field
   812  		how = ElectPreferredReplica
   813  	}
   814  	rs := make(ElectLeadersResults)
   815  	for _, t := range resp.Topics {
   816  		rt := make(map[int32]ElectLeadersResult)
   817  		rs[t.Topic] = rt
   818  		for _, p := range t.Partitions {
   819  			if err := maybeAuthErr(p.ErrorCode); err != nil {
   820  				return nil, err // v0 has no top-level err
   821  			}
   822  			rt[p.Partition] = ElectLeadersResult{
   823  				Topic:      t.Topic,
   824  				Partition:  p.Partition,
   825  				How:        how,
   826  				Err:        kerr.ErrorForCode(p.ErrorCode),
   827  				ErrMessage: unptrStr(p.ErrorMessage),
   828  			}
   829  		}
   830  	}
   831  	return rs, nil
   832  }
   833  
   834  // OffsetForLeaderEpochRequest contains topics, partitions, and leader epochs
   835  // to request offsets for in an OffsetForLeaderEpoch.
   836  type OffsetForLeaderEpochRequest map[string]map[int32]int32
   837  
   838  // Add adds a topic, partition, and leader epoch to the request.
   839  func (l *OffsetForLeaderEpochRequest) Add(topic string, partition, leaderEpoch int32) {
   840  	if *l == nil {
   841  		*l = make(map[string]map[int32]int32)
   842  	}
   843  	t := (*l)[topic]
   844  	if t == nil {
   845  		t = make(map[int32]int32)
   846  		(*l)[topic] = t
   847  	}
   848  	t[partition] = leaderEpoch
   849  }
   850  
// OffsetForLeaderEpoch contains a response for a single partition in an
// OffsetForLeaderEpoch request.
type OffsetForLeaderEpoch struct {
	NodeID    int32  // NodeID is the node that is the leader of this topic / partition.
	Topic     string // Topic is the topic this leader epoch response is for.
	Partition int32  // Partition is the partition this leader epoch response is for.

	// LeaderEpoch is either
	//
	// 1) -1, if the requested LeaderEpoch is unknown.
	//
	// 2) Less than the requested LeaderEpoch, if the requested LeaderEpoch
	// exists but has no records in it. For example, epoch 1 had end offset
	// 37, then epochs 2 and 3 had no records: if you request LeaderEpoch 3,
	// this will return LeaderEpoch 1 with EndOffset 37.
	//
	// 3) Equal to the requested LeaderEpoch, if the requested LeaderEpoch
	// is equal to or less than the current epoch for the partition.
	LeaderEpoch int32

	// EndOffset is either
	//
	// 1) The LogEndOffset, if the broker has the same LeaderEpoch as the
	// request.
	//
	// 2) The beginning offset of the next LeaderEpoch, if the broker has a
	// higher LeaderEpoch.
	//
	// The second option allows the user to detect data loss: if the
	// consumer consumed past the EndOffset that is returned, then the
	// consumer should reset to the returned offset and the consumer knows
	// that everything from the returned offset to the requested offset was
	// lost.
	EndOffset int64

	// Err is non-nil if this partition had a response error.
	Err error
}
   889  
// OffsetsForLeaderEpochs contains responses for partitions in an
// OffsetForLeaderEpochRequest. The outer map is keyed by topic and the inner
// map is keyed by partition.
type OffsetsForLeaderEpochs map[string]map[int32]OffsetForLeaderEpoch
   893  
   894  // OffsetForLeaderEpoch requests end offsets for the requested leader epoch in
   895  // partitions in the request. This is a relatively advanced and client internal
   896  // request, for more details, see the doc comments on the OffsetForLeaderEpoch
   897  // type.
   898  //
   899  // This may return *ShardErrors or *AuthError.
   900  func (cl *Client) OffetForLeaderEpoch(ctx context.Context, r OffsetForLeaderEpochRequest) (OffsetsForLeaderEpochs, error) {
   901  	req := kmsg.NewPtrOffsetForLeaderEpochRequest()
   902  	for t, ps := range r {
   903  		rt := kmsg.NewOffsetForLeaderEpochRequestTopic()
   904  		rt.Topic = t
   905  		for p, e := range ps {
   906  			rp := kmsg.NewOffsetForLeaderEpochRequestTopicPartition()
   907  			rp.Partition = p
   908  			rp.LeaderEpoch = e
   909  			rt.Partitions = append(rt.Partitions, rp)
   910  		}
   911  		req.Topics = append(req.Topics, rt)
   912  	}
   913  	shards := cl.cl.RequestSharded(ctx, req)
   914  	ls := make(OffsetsForLeaderEpochs)
   915  	return ls, shardErrEachBroker(req, shards, func(b BrokerDetail, kr kmsg.Response) error {
   916  		resp := kr.(*kmsg.OffsetForLeaderEpochResponse)
   917  		for _, rt := range resp.Topics {
   918  			lps, exists := ls[rt.Topic]
   919  			if !exists { // topic partitions could be spread around brokers, need to check existence
   920  				lps = make(map[int32]OffsetForLeaderEpoch)
   921  				ls[rt.Topic] = lps
   922  			}
   923  			for _, rp := range rt.Partitions {
   924  				if err := maybeAuthErr(rp.ErrorCode); err != nil {
   925  					return err
   926  				}
   927  				lps[rp.Partition] = OffsetForLeaderEpoch{ // one partition globally, no need to exist check
   928  					NodeID:      b.NodeID,
   929  					Topic:       rt.Topic,
   930  					Partition:   rp.Partition,
   931  					LeaderEpoch: rp.LeaderEpoch,
   932  					EndOffset:   rp.EndOffset,
   933  					Err:         kerr.ErrorForCode(rp.ErrorCode),
   934  				}
   935  			}
   936  		}
   937  		return nil
   938  	})
   939  }
   940  

View as plain text