...

Source file src/github.com/twmb/franz-go/pkg/kadm/configs.go

Documentation: github.com/twmb/franz-go/pkg/kadm

     1  package kadm
     2  
     3  import (
     4  	"context"
     5  	"strconv"
     6  
     7  	"github.com/twmb/franz-go/pkg/kerr"
     8  	"github.com/twmb/franz-go/pkg/kmsg"
     9  )
    10  
    11  // ConfigSynonym is a fallback value for a config.
    12  type ConfigSynonym struct {
    13  	Key    string            // Key is the fallback config name.
    14  	Value  *string           // Value is the fallback config value, if any (sensitive is elided).
    15  	Source kmsg.ConfigSource // Source is where this config synonym is defined from.
    16  }
    17  
    18  // Config is a configuration for a resource (topic, broker)
    19  type Config struct {
    20  	Key       string            // Key is the config name.
    21  	Value     *string           // Value is the config value, if any.
    22  	Sensitive bool              // Sensitive is if this config is sensitive (if so, Value is nil).
    23  	Source    kmsg.ConfigSource // Source is where this config is defined from.
    24  
    25  	// Synonyms contains fallback key/value pairs for this same
    26  	// configuration key in order or preference. That is, if a config entry
    27  	// is both dynamically defined and has a default value as well, the top
    28  	// level config will be the dynamic value, while the synonym will be
    29  	// the default.
    30  	Synonyms []ConfigSynonym
    31  }
    32  
    33  // MaybeValue returns the config's value if it is non-nil, otherwise an empty
    34  // string.
    35  func (c *Config) MaybeValue() string {
    36  	if c.Value != nil {
    37  		return *c.Value
    38  	}
    39  	return ""
    40  }
    41  
    42  // ResourceConfig contains the configuration values for a resource (topic,
    43  // broker, broker logger).
    44  type ResourceConfig struct {
    45  	Name    string   // Name is the name of this resource.
    46  	Configs []Config // Configs are the configs for this topic.
    47  	Err     error    // Err is any error preventing configs from loading (likely, an unknown topic).
    48  }
    49  
    50  // ResourceConfigs contains the configuration values for many resources.
    51  type ResourceConfigs []ResourceConfig
    52  
    53  // On calls fn for the response config if it exists, returning the config and
    54  // the error returned from fn. If fn is nil, this simply returns the config.
    55  //
    56  // The fn is given a copy of the config. This function returns the copy as
    57  // well; any modifications within fn are modifications on the returned copy.
    58  //
    59  // If the resource does not exist, this returns kerr.UnknownTopicOrPartition.
    60  func (rs ResourceConfigs) On(name string, fn func(*ResourceConfig) error) (ResourceConfig, error) {
    61  	for _, r := range rs {
    62  		if r.Name == name {
    63  			if fn == nil {
    64  				return r, nil
    65  			}
    66  			return r, fn(&r)
    67  		}
    68  	}
    69  	return ResourceConfig{}, kerr.UnknownTopicOrPartition
    70  }
    71  
    72  // DescribeTopicConfigs returns the configuration for the requested topics.
    73  //
    74  // This may return *ShardErrors.
    75  func (cl *Client) DescribeTopicConfigs(
    76  	ctx context.Context,
    77  	topics ...string,
    78  ) (ResourceConfigs, error) {
    79  	if len(topics) == 0 {
    80  		return nil, nil
    81  	}
    82  	return cl.describeConfigs(ctx, kmsg.ConfigResourceTypeTopic, topics)
    83  }
    84  
    85  // DescribeBrokerConfigs returns configuration for the requested brokers. If no
    86  // brokers are requested, a single request is issued and any broker in the
    87  // cluster replies with the cluster-level dynamic config values.
    88  //
    89  // This may return *ShardErrors.
    90  func (cl *Client) DescribeBrokerConfigs(
    91  	ctx context.Context,
    92  	brokers ...int32,
    93  ) (ResourceConfigs, error) {
    94  	var names []string
    95  	if len(brokers) == 0 {
    96  		names = append(names, "")
    97  	}
    98  	for _, b := range brokers {
    99  		names = append(names, strconv.Itoa(int(b)))
   100  	}
   101  	return cl.describeConfigs(ctx, kmsg.ConfigResourceTypeBroker, names)
   102  }
   103  
   104  func (cl *Client) describeConfigs(
   105  	ctx context.Context,
   106  	kind kmsg.ConfigResourceType,
   107  	names []string,
   108  ) (ResourceConfigs, error) {
   109  	req := kmsg.NewPtrDescribeConfigsRequest()
   110  	req.IncludeSynonyms = true
   111  	for _, name := range names {
   112  		rr := kmsg.NewDescribeConfigsRequestResource()
   113  		rr.ResourceName = name
   114  		rr.ResourceType = kind
   115  		req.Resources = append(req.Resources, rr)
   116  	}
   117  	shards := cl.cl.RequestSharded(ctx, req)
   118  
   119  	var configs []ResourceConfig
   120  	return configs, shardErrEach(req, shards, func(kr kmsg.Response) error {
   121  		resp := kr.(*kmsg.DescribeConfigsResponse)
   122  		for _, r := range resp.Resources {
   123  			if err := maybeAuthErr(r.ErrorCode); err != nil {
   124  				return err
   125  			}
   126  			rc := ResourceConfig{
   127  				Name: r.ResourceName,
   128  				Err:  kerr.ErrorForCode(r.ErrorCode),
   129  			}
   130  			for _, c := range r.Configs {
   131  				rcv := Config{
   132  					Key:       c.Name,
   133  					Value:     c.Value,
   134  					Sensitive: c.IsSensitive,
   135  					Source:    c.Source,
   136  				}
   137  				for _, syn := range c.ConfigSynonyms {
   138  					rcv.Synonyms = append(rcv.Synonyms, ConfigSynonym{
   139  						Key:    syn.Name,
   140  						Value:  syn.Value,
   141  						Source: syn.Source,
   142  					})
   143  				}
   144  				rc.Configs = append(rc.Configs, rcv)
   145  			}
   146  			configs = append(configs, rc) // we are not storing in a map, no existence-check possible
   147  		}
   148  		return nil
   149  	})
   150  }
   151  
   152  // IncrementalOp is a typed int8 that is used for incrementally updating
   153  // configuration keys for topics and brokers.
   154  type IncrementalOp int8
   155  
   156  const (
   157  	// SetConfig is an incremental operation to set an individual config
   158  	// key.
   159  	SetConfig IncrementalOp = iota
   160  
   161  	// DeleteConfig is an incremental operation to delete an individual
   162  	// config key.
   163  	DeleteConfig
   164  
   165  	// AppendConfig is an incremental operation to append a value to a
   166  	// config key that is a list type.
   167  	AppendConfig
   168  
   169  	// SubtractConfig is an incremental operation to remove a value from a
   170  	// config key that is a list type.
   171  	SubtractConfig
   172  )
   173  
   174  // AlterConfig is an individual key/value operation to perform when altering
   175  // configs.
   176  //
   177  // This package includes a StringPtr function to aid in building config values.
   178  type AlterConfig struct {
   179  	Op    IncrementalOp // Op is the incremental alter operation to perform. This is ignored for State alter functions.
   180  	Name  string        // Name is the name of the config to alter.
   181  	Value *string       // Value is the value to use when altering, if any.
   182  }
   183  
   184  // AlteredConfigsResponse contains the response for an individual alteration.
   185  type AlterConfigsResponse struct {
   186  	Name string // Name is the name of this resource (topic name or broker number).
   187  	Err  error  // Err is non-nil if the config could not be altered.
   188  }
   189  
   190  // AlterConfigsResponses contains responses for many alterations.
   191  type AlterConfigsResponses []AlterConfigsResponse
   192  
   193  // On calls fn for the response name if it exists, returning the response and
   194  // the error returned from fn. If fn is nil, this simply returns the response.
   195  //
   196  // The fn is given a copy of the response. This function returns the copy as
   197  // well; any modifications within fn are modifications on the returned copy.
   198  //
   199  // If the resource does not exist, this returns kerr.UnknownTopicOrPartition.
   200  func (rs AlterConfigsResponses) On(name string, fn func(*AlterConfigsResponse) error) (AlterConfigsResponse, error) {
   201  	for _, r := range rs {
   202  		if r.Name == name {
   203  			if fn == nil {
   204  				return r, nil
   205  			}
   206  			return r, fn(&r)
   207  		}
   208  	}
   209  	return AlterConfigsResponse{}, kerr.UnknownTopicOrPartition
   210  }
   211  
   212  // AlterTopicConfigs incrementally alters topic configuration values.
   213  //
   214  // This method requires talking to a cluster that supports
   215  // IncrementalAlterConfigs (officially introduced in Kafka v2.3, but many
   216  // broker reimplementations support this request even if they do not support
   217  // all other requests from Kafka v2.3).
   218  //
   219  // If you want to alter the entire configs state using the older AlterConfigs
   220  // request, use AlterTopicConfigsState.
   221  //
   222  // This may return *ShardErrors. You may consider checking
   223  // ValidateAlterTopicConfigs before using this method.
   224  func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
   225  	return cl.alterConfigs(ctx, false, configs, kmsg.ConfigResourceTypeTopic, topics)
   226  }
   227  
   228  // ValidateAlterTopicConfigs validates an incremental alter config for the given
   229  // topics.
   230  //
   231  // This returns exactly what AlterTopicConfigs returns, but does not actually
   232  // alter configurations.
   233  func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
   234  	return cl.alterConfigs(ctx, true, configs, kmsg.ConfigResourceTypeTopic, topics)
   235  }
   236  
   237  // AlterBrokerConfigs incrementally alters broker configuration values. If
   238  // brokers are specified, this updates each specific broker. If no brokers are
   239  // specified, this updates whole-cluster broker configuration values.
   240  //
   241  // This method requires talking to a cluster that supports
   242  // IncrementalAlterConfigs (officially introduced in Kafka v2.3, but many
   243  // broker reimplementations support this request even if they do not support
   244  // all other requests from Kafka v2.3).
   245  //
   246  // If you want to alter the entire configs state using the older AlterConfigs
   247  // request, use AlterBrokerConfigsState.
   248  //
   249  // This may return *ShardErrors. You may consider checking
   250  // ValidateAlterBrokerConfigs before using this method.
   251  func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
   252  	var names []string
   253  	if len(brokers) == 0 {
   254  		names = append(names, "")
   255  	}
   256  	for _, broker := range brokers {
   257  		names = append(names, strconv.Itoa(int(broker)))
   258  	}
   259  	return cl.alterConfigs(ctx, false, configs, kmsg.ConfigResourceTypeBroker, names)
   260  }
   261  
   262  // ValidateAlterBrokerConfigs validates an incremental alter config for the given
   263  // brokers.
   264  //
   265  // This returns exactly what AlterBrokerConfigs returns, but does not actually
   266  // alter configurations.
   267  func (cl *Client) ValidateAlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
   268  	var names []string
   269  	if len(brokers) == 0 {
   270  		names = append(names, "")
   271  	}
   272  	for _, broker := range brokers {
   273  		names = append(names, strconv.Itoa(int(broker)))
   274  	}
   275  	return cl.alterConfigs(ctx, true, configs, kmsg.ConfigResourceTypeBroker, names)
   276  }
   277  
   278  func (cl *Client) alterConfigs(
   279  	ctx context.Context,
   280  	dry bool,
   281  	configs []AlterConfig,
   282  	kind kmsg.ConfigResourceType,
   283  	names []string,
   284  ) (AlterConfigsResponses, error) {
   285  	req := kmsg.NewPtrIncrementalAlterConfigsRequest()
   286  	req.ValidateOnly = dry
   287  	for _, name := range names {
   288  		rr := kmsg.NewIncrementalAlterConfigsRequestResource()
   289  		rr.ResourceType = kind
   290  		rr.ResourceName = name
   291  		for _, config := range configs {
   292  			rc := kmsg.NewIncrementalAlterConfigsRequestResourceConfig()
   293  			rc.Name = config.Name
   294  			rc.Value = config.Value
   295  			switch config.Op {
   296  			case SetConfig:
   297  				rc.Op = kmsg.IncrementalAlterConfigOpSet
   298  			case DeleteConfig:
   299  				rc.Op = kmsg.IncrementalAlterConfigOpDelete
   300  			case AppendConfig:
   301  				rc.Op = kmsg.IncrementalAlterConfigOpAppend
   302  			case SubtractConfig:
   303  				rc.Op = kmsg.IncrementalAlterConfigOpSubtract
   304  			}
   305  			rr.Configs = append(rr.Configs, rc)
   306  		}
   307  		req.Resources = append(req.Resources, rr)
   308  	}
   309  
   310  	shards := cl.cl.RequestSharded(ctx, req)
   311  
   312  	var rs []AlterConfigsResponse
   313  	return rs, shardErrEach(req, shards, func(kr kmsg.Response) error {
   314  		resp := kr.(*kmsg.IncrementalAlterConfigsResponse)
   315  		for _, r := range resp.Resources {
   316  			rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
   317  				Name: r.ResourceName,
   318  				Err:  kerr.ErrorForCode(r.ErrorCode),
   319  			})
   320  		}
   321  		return nil
   322  	})
   323  }
   324  
   325  // AlterTopicConfigsState alters the full state of topic configurations.
   326  // All prior configuration is lost.
   327  //
   328  // This may return *ShardErrors. You may consider checking
   329  // ValidateAlterTopicConfigs before using this method.
   330  func (cl *Client) AlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
   331  	return cl.alterConfigsState(ctx, false, configs, kmsg.ConfigResourceTypeTopic, topics)
   332  }
   333  
   334  // ValidateAlterTopicConfigs validates an AlterTopicConfigsState for the given
   335  // topics.
   336  //
   337  // This returns exactly what AlterTopicConfigsState returns, but does not
   338  // actually alter configurations.
   339  func (cl *Client) ValidateAlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
   340  	return cl.alterConfigsState(ctx, true, configs, kmsg.ConfigResourceTypeTopic, topics)
   341  }
   342  
   343  // AlterBrokerConfigs alters the full state of broker configurations. If
   344  // broker are specified, this updates each specific broker. If no brokers are
   345  // specified, this updates whole-cluster broker configuration values.
   346  // All prior configuration is lost.
   347  //
   348  // This may return *ShardErrors. You may consider checking
   349  // ValidateAlterBrokerConfigs before using this method.
   350  func (cl *Client) AlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
   351  	var names []string
   352  	if len(brokers) == 0 {
   353  		names = append(names, "")
   354  	}
   355  	for _, broker := range brokers {
   356  		names = append(names, strconv.Itoa(int(broker)))
   357  	}
   358  	return cl.alterConfigsState(ctx, false, configs, kmsg.ConfigResourceTypeBroker, names)
   359  }
   360  
   361  // ValidateAlterBrokerConfigs validates an AlterBrokerconfigsState for the
   362  // given brokers.
   363  //
   364  // This returns exactly what AlterBrokerConfigs returns, but does not actually
   365  // alter configurations.
   366  func (cl *Client) ValidateAlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
   367  	var names []string
   368  	if len(brokers) == 0 {
   369  		names = append(names, "")
   370  	}
   371  	for _, broker := range brokers {
   372  		names = append(names, strconv.Itoa(int(broker)))
   373  	}
   374  	return cl.alterConfigsState(ctx, true, configs, kmsg.ConfigResourceTypeBroker, names)
   375  }
   376  
   377  func (cl *Client) alterConfigsState(
   378  	ctx context.Context,
   379  	dry bool,
   380  	configs []AlterConfig,
   381  	kind kmsg.ConfigResourceType,
   382  	names []string,
   383  ) (AlterConfigsResponses, error) {
   384  	req := kmsg.NewPtrAlterConfigsRequest()
   385  	req.ValidateOnly = dry
   386  	for _, name := range names {
   387  		rr := kmsg.NewAlterConfigsRequestResource()
   388  		rr.ResourceType = kind
   389  		rr.ResourceName = name
   390  		for _, config := range configs {
   391  			rc := kmsg.NewAlterConfigsRequestResourceConfig()
   392  			rc.Name = config.Name
   393  			rc.Value = config.Value
   394  			rr.Configs = append(rr.Configs, rc)
   395  		}
   396  		req.Resources = append(req.Resources, rr)
   397  	}
   398  
   399  	shards := cl.cl.RequestSharded(ctx, req)
   400  
   401  	var rs []AlterConfigsResponse
   402  	return rs, shardErrEach(req, shards, func(kr kmsg.Response) error {
   403  		resp := kr.(*kmsg.AlterConfigsResponse)
   404  		for _, r := range resp.Resources {
   405  			rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
   406  				Name: r.ResourceName,
   407  				Err:  kerr.ErrorForCode(r.ErrorCode),
   408  			})
   409  		}
   410  		return nil
   411  	})
   412  }
   413  

View as plain text