1 package kadm
2
3 import (
4 "context"
5 "strconv"
6
7 "github.com/twmb/franz-go/pkg/kerr"
8 "github.com/twmb/franz-go/pkg/kmsg"
9 )
10
11
// ConfigSynonym is one synonym entry attached to a described config key.
// describeConfigs fills it from each entry of a response config's
// ConfigSynonyms list.
type ConfigSynonym struct {
	Key    string            // Key is the synonym's name as reported in the response.
	Value  *string           // Value is the synonym's value; it may be nil.
	Source kmsg.ConfigSource // Source is the config source reported for this synonym.
}
17
18
// Config is a single key/value configuration entry for a resource, as
// returned by a describe configs request.
type Config struct {
	Key       string            // Key is the config name.
	Value     *string           // Value is the config value; it may be nil (see MaybeValue).
	Sensitive bool              // Sensitive mirrors the response's IsSensitive flag for this key.
	Source    kmsg.ConfigSource // Source is the config source reported for this value.

	// Synonyms holds the synonyms reported for this key. describeConfigs
	// always requests synonyms (IncludeSynonyms is set to true), and the
	// slice stays nil when the response lists none.
	// NOTE(review): presumably sorted by the broker in order of precedence —
	// confirm against the Kafka DescribeConfigs documentation.
	Synonyms []ConfigSynonym
}
32
33
34
35 func (c *Config) MaybeValue() string {
36 if c.Value != nil {
37 return *c.Value
38 }
39 return ""
40 }
41
42
43
// ResourceConfig contains the configuration values for a single resource
// (a topic or a broker, depending on which describe function was called).
type ResourceConfig struct {
	Name    string   // Name is the resource name echoed in the response.
	Configs []Config // Configs are the config entries returned for the resource.
	Err     error    // Err is the error derived from the resource's response error code, or nil.
}

// ResourceConfigs contains the configuration values for many resources.
type ResourceConfigs []ResourceConfig
52
53
54
55
56
57
58
59
60 func (rs ResourceConfigs) On(name string, fn func(*ResourceConfig) error) (ResourceConfig, error) {
61 for _, r := range rs {
62 if r.Name == name {
63 if fn == nil {
64 return r, nil
65 }
66 return r, fn(&r)
67 }
68 }
69 return ResourceConfig{}, kerr.UnknownTopicOrPartition
70 }
71
72
73
74
75 func (cl *Client) DescribeTopicConfigs(
76 ctx context.Context,
77 topics ...string,
78 ) (ResourceConfigs, error) {
79 if len(topics) == 0 {
80 return nil, nil
81 }
82 return cl.describeConfigs(ctx, kmsg.ConfigResourceTypeTopic, topics)
83 }
84
85
86
87
88
89
90 func (cl *Client) DescribeBrokerConfigs(
91 ctx context.Context,
92 brokers ...int32,
93 ) (ResourceConfigs, error) {
94 var names []string
95 if len(brokers) == 0 {
96 names = append(names, "")
97 }
98 for _, b := range brokers {
99 names = append(names, strconv.Itoa(int(b)))
100 }
101 return cl.describeConfigs(ctx, kmsg.ConfigResourceTypeBroker, names)
102 }
103
104 func (cl *Client) describeConfigs(
105 ctx context.Context,
106 kind kmsg.ConfigResourceType,
107 names []string,
108 ) (ResourceConfigs, error) {
109 req := kmsg.NewPtrDescribeConfigsRequest()
110 req.IncludeSynonyms = true
111 for _, name := range names {
112 rr := kmsg.NewDescribeConfigsRequestResource()
113 rr.ResourceName = name
114 rr.ResourceType = kind
115 req.Resources = append(req.Resources, rr)
116 }
117 shards := cl.cl.RequestSharded(ctx, req)
118
119 var configs []ResourceConfig
120 return configs, shardErrEach(req, shards, func(kr kmsg.Response) error {
121 resp := kr.(*kmsg.DescribeConfigsResponse)
122 for _, r := range resp.Resources {
123 if err := maybeAuthErr(r.ErrorCode); err != nil {
124 return err
125 }
126 rc := ResourceConfig{
127 Name: r.ResourceName,
128 Err: kerr.ErrorForCode(r.ErrorCode),
129 }
130 for _, c := range r.Configs {
131 rcv := Config{
132 Key: c.Name,
133 Value: c.Value,
134 Sensitive: c.IsSensitive,
135 Source: c.Source,
136 }
137 for _, syn := range c.ConfigSynonyms {
138 rcv.Synonyms = append(rcv.Synonyms, ConfigSynonym{
139 Key: syn.Name,
140 Value: syn.Value,
141 Source: syn.Source,
142 })
143 }
144 rc.Configs = append(rc.Configs, rcv)
145 }
146 configs = append(configs, rc)
147 }
148 return nil
149 })
150 }
151
152
153
// IncrementalOp is a typed int8 selecting the operation applied to a single
// config key in an incremental alter configs request. alterConfigs maps each
// value below onto the corresponding kmsg.IncrementalAlterConfigOp constant.
type IncrementalOp int8

const (
	// SetConfig is sent as kmsg.IncrementalAlterConfigOpSet. It is the zero
	// value of IncrementalOp.
	SetConfig IncrementalOp = iota

	// DeleteConfig is sent as kmsg.IncrementalAlterConfigOpDelete.
	DeleteConfig

	// AppendConfig is sent as kmsg.IncrementalAlterConfigOpAppend.
	// NOTE(review): presumably only valid for list-typed config keys —
	// confirm against the Kafka IncrementalAlterConfigs documentation.
	AppendConfig

	// SubtractConfig is sent as kmsg.IncrementalAlterConfigOpSubtract.
	// NOTE(review): presumably only valid for list-typed config keys —
	// confirm against the Kafka IncrementalAlterConfigs documentation.
	SubtractConfig
)
173
174
175
176
177
// AlterConfig is an individual key/value operation to perform when altering
// configs.
//
// The incremental path (alterConfigs) sends Op, Name and Value. The
// non-incremental path (alterConfigsState) sends only Name and Value and
// never reads Op.
type AlterConfig struct {
	Op    IncrementalOp // Op is the incremental operation; only used by the incremental alter functions.
	Name  string        // Name is the config key to alter.
	Value *string       // Value is the value sent for the key; it may be nil.
}
183
184
// AlterConfigsResponse contains the response for an individual resource whose
// configs were altered (or validated).
type AlterConfigsResponse struct {
	Name string // Name is the resource name echoed in the response.
	Err  error  // Err is the error derived from the resource's response error code, or nil.
}

// AlterConfigsResponses contains responses for many altered resources.
type AlterConfigsResponses []AlterConfigsResponse
192
193
194
195
196
197
198
199
200 func (rs AlterConfigsResponses) On(name string, fn func(*AlterConfigsResponse) error) (AlterConfigsResponse, error) {
201 for _, r := range rs {
202 if r.Name == name {
203 if fn == nil {
204 return r, nil
205 }
206 return r, fn(&r)
207 }
208 }
209 return AlterConfigsResponse{}, kerr.UnknownTopicOrPartition
210 }
211
212
213
214
215
216
217
218
219
220
221
222
223
224 func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
225 return cl.alterConfigs(ctx, false, configs, kmsg.ConfigResourceTypeTopic, topics)
226 }
227
228
229
230
231
232
233 func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
234 return cl.alterConfigs(ctx, true, configs, kmsg.ConfigResourceTypeTopic, topics)
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
252 var names []string
253 if len(brokers) == 0 {
254 names = append(names, "")
255 }
256 for _, broker := range brokers {
257 names = append(names, strconv.Itoa(int(broker)))
258 }
259 return cl.alterConfigs(ctx, false, configs, kmsg.ConfigResourceTypeBroker, names)
260 }
261
262
263
264
265
266
267 func (cl *Client) ValidateAlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
268 var names []string
269 if len(brokers) == 0 {
270 names = append(names, "")
271 }
272 for _, broker := range brokers {
273 names = append(names, strconv.Itoa(int(broker)))
274 }
275 return cl.alterConfigs(ctx, true, configs, kmsg.ConfigResourceTypeBroker, names)
276 }
277
278 func (cl *Client) alterConfigs(
279 ctx context.Context,
280 dry bool,
281 configs []AlterConfig,
282 kind kmsg.ConfigResourceType,
283 names []string,
284 ) (AlterConfigsResponses, error) {
285 req := kmsg.NewPtrIncrementalAlterConfigsRequest()
286 req.ValidateOnly = dry
287 for _, name := range names {
288 rr := kmsg.NewIncrementalAlterConfigsRequestResource()
289 rr.ResourceType = kind
290 rr.ResourceName = name
291 for _, config := range configs {
292 rc := kmsg.NewIncrementalAlterConfigsRequestResourceConfig()
293 rc.Name = config.Name
294 rc.Value = config.Value
295 switch config.Op {
296 case SetConfig:
297 rc.Op = kmsg.IncrementalAlterConfigOpSet
298 case DeleteConfig:
299 rc.Op = kmsg.IncrementalAlterConfigOpDelete
300 case AppendConfig:
301 rc.Op = kmsg.IncrementalAlterConfigOpAppend
302 case SubtractConfig:
303 rc.Op = kmsg.IncrementalAlterConfigOpSubtract
304 }
305 rr.Configs = append(rr.Configs, rc)
306 }
307 req.Resources = append(req.Resources, rr)
308 }
309
310 shards := cl.cl.RequestSharded(ctx, req)
311
312 var rs []AlterConfigsResponse
313 return rs, shardErrEach(req, shards, func(kr kmsg.Response) error {
314 resp := kr.(*kmsg.IncrementalAlterConfigsResponse)
315 for _, r := range resp.Resources {
316 rs = append(rs, AlterConfigsResponse{
317 Name: r.ResourceName,
318 Err: kerr.ErrorForCode(r.ErrorCode),
319 })
320 }
321 return nil
322 })
323 }
324
325
326
327
328
329
330 func (cl *Client) AlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
331 return cl.alterConfigsState(ctx, false, configs, kmsg.ConfigResourceTypeTopic, topics)
332 }
333
334
335
336
337
338
339 func (cl *Client) ValidateAlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
340 return cl.alterConfigsState(ctx, true, configs, kmsg.ConfigResourceTypeTopic, topics)
341 }
342
343
344
345
346
347
348
349
350 func (cl *Client) AlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
351 var names []string
352 if len(brokers) == 0 {
353 names = append(names, "")
354 }
355 for _, broker := range brokers {
356 names = append(names, strconv.Itoa(int(broker)))
357 }
358 return cl.alterConfigsState(ctx, false, configs, kmsg.ConfigResourceTypeBroker, names)
359 }
360
361
362
363
364
365
366 func (cl *Client) ValidateAlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
367 var names []string
368 if len(brokers) == 0 {
369 names = append(names, "")
370 }
371 for _, broker := range brokers {
372 names = append(names, strconv.Itoa(int(broker)))
373 }
374 return cl.alterConfigsState(ctx, true, configs, kmsg.ConfigResourceTypeBroker, names)
375 }
376
377 func (cl *Client) alterConfigsState(
378 ctx context.Context,
379 dry bool,
380 configs []AlterConfig,
381 kind kmsg.ConfigResourceType,
382 names []string,
383 ) (AlterConfigsResponses, error) {
384 req := kmsg.NewPtrAlterConfigsRequest()
385 req.ValidateOnly = dry
386 for _, name := range names {
387 rr := kmsg.NewAlterConfigsRequestResource()
388 rr.ResourceType = kind
389 rr.ResourceName = name
390 for _, config := range configs {
391 rc := kmsg.NewAlterConfigsRequestResourceConfig()
392 rc.Name = config.Name
393 rc.Value = config.Value
394 rr.Configs = append(rr.Configs, rc)
395 }
396 req.Resources = append(req.Resources, rr)
397 }
398
399 shards := cl.cl.RequestSharded(ctx, req)
400
401 var rs []AlterConfigsResponse
402 return rs, shardErrEach(req, shards, func(kr kmsg.Response) error {
403 resp := kr.(*kmsg.AlterConfigsResponse)
404 for _, r := range resp.Resources {
405 rs = append(rs, AlterConfigsResponse{
406 Name: r.ResourceName,
407 Err: kerr.ErrorForCode(r.ErrorCode),
408 })
409 }
410 return nil
411 })
412 }
413
View as plain text