1 package kadm
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8
9 "github.com/twmb/franz-go/pkg/kerr"
10 "github.com/twmb/franz-go/pkg/kmsg"
11 )
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// ACLBuilder collects the resources, principals, hosts, operations, and
// resource pattern type that are used to create, delete, or describe ACLs.
//
// Name slices hold explicitly requested values. Each "any" flag records that
// the matching builder method was called with no values; the delete and
// describe filters then leave that field unset instead of naming a value.
type ACLBuilder struct {
	any         []string // names used with the "any" resource type (AnyResource)
	anyResource bool     // AnyResource was called with no names
	topics      []string // topic names (Topics / MaybeTopics)
	anyTopic    bool     // Topics was called with no names
	groups      []string // group names (Groups / MaybeGroups)
	anyGroup    bool     // Groups was called with no names
	anyCluster  bool     // the cluster resource is included (Clusters / MaybeClusters)
	txnIDs      []string // transactional IDs (TransactionalIDs / MaybeTransactionalIDs)
	anyTxn      bool     // TransactionalIDs was called with no IDs
	tokens      []string // delegation tokens (DelegationTokens / MaybeDelegationTokens)
	anyToken    bool     // DelegationTokens was called with no tokens

	allow         []string // principals for allow permissions
	anyAllow      bool     // Allow was called with no principals
	allowHosts    []string // hosts paired with the allow principals
	anyAllowHosts bool     // AllowHosts was called with no hosts
	deny          []string // principals for deny permissions
	anyDeny       bool     // Deny was called with no principals
	denyHosts     []string // hosts paired with the deny principals
	anyDenyHosts  bool     // DenyHosts was called with no hosts

	ops []ACLOperation // operations; Operations with no arguments stores OpAny

	pattern ACLPattern // resource pattern type, set by ResourcePatternType
}
73
74
75 func (b *ACLBuilder) PrefixUser() {
76 b.PrefixUserExcept()
77 }
78
79
80
81 func (b *ACLBuilder) PrefixUserExcept(except ...string) {
82 replace := func(u string) string {
83 if !strings.HasPrefix(u, "User:") {
84 for _, e := range except {
85 if strings.HasPrefix(u, e) {
86 return u
87 }
88 }
89 return "User:" + u
90 }
91 return u
92 }
93
94 for i, u := range b.allow {
95 b.allow[i] = replace(u)
96 }
97 for i, u := range b.deny {
98 b.deny[i] = replace(u)
99 }
100 }
101
102
103 func NewACLs() *ACLBuilder {
104 return new(ACLBuilder)
105 }
106
107
108
109
110
111
112
113 func (b *ACLBuilder) AnyResource(name ...string) *ACLBuilder {
114 b.any = name
115 if len(name) == 0 {
116 b.anyResource = true
117 }
118 return b
119 }
120
121
122
123
124
125
126
127
128
129 func (b *ACLBuilder) Topics(t ...string) *ACLBuilder {
130 b.topics = t
131 if len(t) == 0 {
132 b.anyTopic = true
133 }
134 return b
135 }
136
137
138
139 func (b *ACLBuilder) MaybeTopics(t ...string) *ACLBuilder { b.topics = t; return b }
140
141
142
143
144
145
146
147
148
149 func (b *ACLBuilder) Groups(g ...string) *ACLBuilder {
150 b.groups = g
151 if len(g) == 0 {
152 b.anyGroup = true
153 }
154 return b
155 }
156
157
158
159 func (b *ACLBuilder) MaybeGroups(g ...string) *ACLBuilder { b.groups = g; return b }
160
161
162
163
164
165
166
167
168 func (b *ACLBuilder) Clusters() *ACLBuilder {
169 b.anyCluster = true
170 return b
171 }
172
173
174
175 func (b *ACLBuilder) MaybeClusters(c bool) *ACLBuilder { b.anyCluster = c; return b }
176
177
178
179
180
181
182
183
184
185 func (b *ACLBuilder) TransactionalIDs(x ...string) *ACLBuilder {
186 b.txnIDs = x
187 if len(x) == 0 {
188 b.anyTxn = true
189 }
190 return b
191 }
192
193
194
195 func (b *ACLBuilder) MaybeTransactionalIDs(x ...string) *ACLBuilder { b.txnIDs = x; return b }
196
197
198
199
200
201
202
203
204
205 func (b *ACLBuilder) DelegationTokens(t ...string) *ACLBuilder {
206 b.tokens = t
207 if len(t) == 0 {
208 b.anyToken = true
209 }
210 return b
211 }
212
213
214
215 func (b *ACLBuilder) MaybeDelegationTokens(t ...string) *ACLBuilder { b.tokens = t; return b }
216
217
218
219
220
221
222
223
224
225
226 func (b *ACLBuilder) Allow(principals ...string) *ACLBuilder {
227 b.allow = principals
228 if len(principals) == 0 {
229 b.anyAllow = true
230 }
231 return b
232 }
233
234
235
236 func (b *ACLBuilder) MaybeAllow(principals ...string) *ACLBuilder { b.allow = principals; return b }
237
238
239
240
241
242
243
244
245
246
247 func (b *ACLBuilder) AllowHosts(hosts ...string) *ACLBuilder {
248 b.allowHosts = hosts
249 if len(hosts) == 0 {
250 b.anyAllowHosts = true
251 }
252 return b
253 }
254
255
256
257 func (b *ACLBuilder) MaybeAllowHosts(hosts ...string) *ACLBuilder { b.allowHosts = hosts; return b }
258
259
260
261
262
263
264
265
266
267
268 func (b *ACLBuilder) Deny(principals ...string) *ACLBuilder {
269 b.deny = principals
270 if len(principals) == 0 {
271 b.anyDeny = true
272 }
273 return b
274 }
275
276
277
278 func (b *ACLBuilder) MaybeDeny(principals ...string) *ACLBuilder { b.deny = principals; return b }
279
280
281
282
283
284
285
286
287
288
289 func (b *ACLBuilder) DenyHosts(hosts ...string) *ACLBuilder {
290 b.denyHosts = hosts
291 if len(hosts) == 0 {
292 b.anyDenyHosts = true
293 }
294 return b
295 }
296
297
298
299 func (b *ACLBuilder) MaybeDenyHosts(hosts ...string) *ACLBuilder { b.denyHosts = hosts; return b }
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// ACLOperation is a type alias for kmsg.ACLOperation, so the operations below
// can be used without importing kmsg directly.
type ACLOperation = kmsg.ACLOperation

const (
	// OpUnknown aliases kmsg.ACLOperationUnknown. ValidateCreate rejects
	// it.
	OpUnknown ACLOperation = kmsg.ACLOperationUnknown

	// OpAny aliases kmsg.ACLOperationAny. It is what Operations stores
	// when called with no operations, HasAnyFilter reports it, and
	// ValidateCreate rejects it.
	OpAny ACLOperation = kmsg.ACLOperationAny

	// OpAll aliases kmsg.ACLOperationAll.
	OpAll ACLOperation = kmsg.ACLOperationAll

	// OpRead aliases kmsg.ACLOperationRead.
	OpRead ACLOperation = kmsg.ACLOperationRead

	// OpWrite aliases kmsg.ACLOperationWrite.
	OpWrite ACLOperation = kmsg.ACLOperationWrite

	// OpCreate aliases kmsg.ACLOperationCreate.
	OpCreate ACLOperation = kmsg.ACLOperationCreate

	// OpDelete aliases kmsg.ACLOperationDelete.
	OpDelete ACLOperation = kmsg.ACLOperationDelete

	// OpAlter aliases kmsg.ACLOperationAlter.
	OpAlter ACLOperation = kmsg.ACLOperationAlter

	// OpDescribe aliases kmsg.ACLOperationDescribe.
	OpDescribe ACLOperation = kmsg.ACLOperationDescribe

	// OpClusterAction aliases kmsg.ACLOperationClusterAction.
	// NOTE(review): presumably only meaningful on the cluster resource;
	// confirm against the Kafka ACL documentation.
	OpClusterAction ACLOperation = kmsg.ACLOperationClusterAction

	// OpDescribeConfigs aliases kmsg.ACLOperationDescribeConfigs.
	OpDescribeConfigs ACLOperation = kmsg.ACLOperationDescribeConfigs

	// OpAlterConfigs aliases kmsg.ACLOperationAlterConfigs.
	OpAlterConfigs ACLOperation = kmsg.ACLOperationAlterConfigs

	// OpIdempotentWrite aliases kmsg.ACLOperationIdempotentWrite.
	// NOTE(review): presumably only meaningful on the cluster resource;
	// confirm against the Kafka ACL documentation.
	OpIdempotentWrite ACLOperation = kmsg.ACLOperationIdempotentWrite
)
463
464
465
466
467
468
469
470
471 func (b *ACLBuilder) Operations(operations ...ACLOperation) *ACLBuilder {
472 b.ops = operations
473 if len(operations) == 0 {
474 b.ops = []ACLOperation{OpAny}
475 }
476 return b
477 }
478
479
480
481 func (b *ACLBuilder) MaybeOperations(operations ...ACLOperation) *ACLBuilder {
482 if len(operations) > 0 {
483 b.Operations(operations...)
484 }
485 return b
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
// ACLPattern is a type alias for kmsg.ACLResourcePatternType, so the patterns
// below can be used without importing kmsg directly.
type ACLPattern = kmsg.ACLResourcePatternType

const (
	// ACLPatternUnknown aliases kmsg.ACLResourcePatternTypeUnknown.
	// ValidateCreate rejects it.
	ACLPatternUnknown ACLPattern = kmsg.ACLResourcePatternTypeUnknown

	// ACLPatternAny aliases kmsg.ACLResourcePatternTypeAny. HasAnyFilter
	// reports it and ValidateCreate rejects it.
	ACLPatternAny ACLPattern = kmsg.ACLResourcePatternTypeAny

	// ACLPatternMatch aliases kmsg.ACLResourcePatternTypeMatch.
	// ValidateCreate rejects it.
	ACLPatternMatch ACLPattern = kmsg.ACLResourcePatternTypeMatch

	// ACLPatternLiteral aliases kmsg.ACLResourcePatternTypeLiteral. It is
	// one of the two patterns ValidateCreate accepts.
	ACLPatternLiteral ACLPattern = kmsg.ACLResourcePatternTypeLiteral

	// ACLPatternPrefixed aliases kmsg.ACLResourcePatternTypePrefixed. It
	// is one of the two patterns ValidateCreate accepts.
	ACLPatternPrefixed ACLPattern = kmsg.ACLResourcePatternTypePrefixed
)
525
526
527
528
529
530
531
532 func (b *ACLBuilder) ResourcePatternType(pattern ACLPattern) *ACLBuilder {
533 b.pattern = pattern
534 return b
535 }
536
537
538 func (b *ACLBuilder) ValidateCreate() error {
539 for _, op := range b.ops {
540 switch op {
541 case OpAny, OpUnknown:
542 return fmt.Errorf("invalid operation %s for creating ACLs", op)
543 }
544 }
545
546 switch b.pattern {
547 case ACLPatternLiteral, ACLPatternPrefixed:
548 default:
549 return fmt.Errorf("invalid acl resource pattern %s for creating ACLs", b.pattern)
550 }
551
552 if len(b.allowHosts) != 0 && len(b.allow) == 0 {
553 return fmt.Errorf("invalid allow hosts with no allow principals")
554 }
555 if len(b.denyHosts) != 0 && len(b.deny) == 0 {
556 return fmt.Errorf("invalid deny hosts with no deny principals")
557 }
558 return nil
559 }
560
561
562 func (b *ACLBuilder) ValidateDelete() error { return b.ValidateFilter() }
563
564
565 func (b *ACLBuilder) ValidateDescribe() error { return b.ValidateFilter() }
566
567
568
569 func (b *ACLBuilder) ValidateFilter() error {
570 if len(b.allowHosts) != 0 && len(b.allow) == 0 && !b.anyAllow {
571 return fmt.Errorf("invalid allow hosts with no allow principals")
572 }
573 if len(b.allow) != 0 && len(b.allowHosts) == 0 && !b.anyAllowHosts {
574 return fmt.Errorf("invalid allow principals with no allow hosts")
575 }
576 if len(b.denyHosts) != 0 && len(b.deny) == 0 && !b.anyDeny {
577 return fmt.Errorf("invalid deny hosts with no deny principals")
578 }
579 if len(b.deny) != 0 && len(b.denyHosts) == 0 && !b.anyDenyHosts {
580 return fmt.Errorf("invalid deny principals with no deny hosts")
581 }
582 return nil
583 }
584
585
586
587
588
589
590
591
592 func (b *ACLBuilder) HasAnyFilter() bool {
593 return b.anyResource ||
594 b.anyTopic ||
595 b.anyGroup ||
596 b.anyTxn ||
597 b.anyToken ||
598 b.anyAllow ||
599 b.anyAllowHosts ||
600 b.anyDeny ||
601 b.anyDenyHosts ||
602 b.hasOpAny() ||
603 b.pattern == ACLPatternAny
604 }
605
606 func (b *ACLBuilder) hasOpAny() bool {
607 for _, op := range b.ops {
608 if op == OpAny {
609 return true
610 }
611 }
612 return false
613 }
614
615
616
617 func (b *ACLBuilder) HasResource() bool {
618 l := len(b.any) +
619 len(b.topics) +
620 len(b.groups) +
621 len(b.txnIDs) +
622 len(b.tokens)
623 return l > 0 ||
624 b.anyResource ||
625 b.anyTopic ||
626 b.anyGroup ||
627 b.anyCluster ||
628 b.anyTxn ||
629 b.anyToken
630 }
631
632
633
634 func (b *ACLBuilder) HasPrincipals() bool {
635 return len(b.allow) > 0 ||
636 b.anyAllow ||
637 len(b.deny) > 0 ||
638 b.anyDeny
639 }
640
641
642
643 func (b *ACLBuilder) HasHosts() bool {
644 return len(b.allowHosts) > 0 ||
645 b.anyAllowHosts ||
646 len(b.denyHosts) > 0 ||
647 b.anyDenyHosts
648 }
649
650 func (b *ACLBuilder) dup() *ACLBuilder {
651 d := *b
652 return &d
653 }
654
655
// CreateACLsResult is the outcome of creating a single ACL. Every field other
// than Err echoes the creation that was sent in the request.
type CreateACLsResult struct {
	Principal string // principal the ACL applies to
	Host      string // host the ACL applies to

	Type       kmsg.ACLResourceType   // resource type of the ACL
	Name       string                 // resource name of the ACL
	Pattern    ACLPattern             // resource pattern type of the ACL
	Operation  ACLOperation           // operation of the ACL
	Permission kmsg.ACLPermissionType // allow or deny

	// Err is kerr.ErrorForCode applied to the error code returned for this
	// creation (presumably nil when the creation succeeded).
	Err error
}
668
669
// CreateACLsResults holds one CreateACLsResult per creation sent by
// CreateACLs, in request order.
type CreateACLsResults []CreateACLsResult
671
672
673
674
675
676
677
678 func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResults, error) {
679 if err := b.ValidateCreate(); err != nil {
680 return nil, err
681 }
682 if len(b.allow) != 0 && len(b.allowHosts) == 0 {
683 b.allowHosts = []string{"*"}
684 }
685 if len(b.deny) != 0 && len(b.denyHosts) == 0 {
686 b.denyHosts = []string{"*"}
687 }
688
689 var clusters []string
690 if b.anyCluster {
691 clusters = []string{"kafka-cluster"}
692 }
693
694 req := kmsg.NewPtrCreateACLsRequest()
695 for _, typeNames := range []struct {
696 t kmsg.ACLResourceType
697 names []string
698 }{
699 {kmsg.ACLResourceTypeTopic, b.topics},
700 {kmsg.ACLResourceTypeGroup, b.groups},
701 {kmsg.ACLResourceTypeCluster, clusters},
702 {kmsg.ACLResourceTypeTransactionalId, b.txnIDs},
703 {kmsg.ACLResourceTypeDelegationToken, b.tokens},
704 } {
705 for _, name := range typeNames.names {
706 for _, op := range b.ops {
707 for _, perm := range []struct {
708 principals []string
709 hosts []string
710 permType kmsg.ACLPermissionType
711 }{
712 {b.allow, b.allowHosts, kmsg.ACLPermissionTypeAllow},
713 {b.deny, b.denyHosts, kmsg.ACLPermissionTypeDeny},
714 } {
715 for _, principal := range perm.principals {
716 for _, host := range perm.hosts {
717 c := kmsg.NewCreateACLsRequestCreation()
718 c.ResourceType = typeNames.t
719 c.ResourceName = name
720 c.ResourcePatternType = b.pattern
721 c.Operation = op
722 c.Principal = principal
723 c.Host = host
724 c.PermissionType = perm.permType
725 req.Creations = append(req.Creations, c)
726 }
727 }
728 }
729 }
730 }
731 }
732
733 resp, err := req.RequestWith(ctx, cl.cl)
734 if err != nil {
735 return nil, err
736 }
737
738 if len(resp.Results) != len(req.Creations) {
739 return nil, fmt.Errorf("received %d results to %d creations", len(resp.Results), len(req.Creations))
740 }
741
742 var rs CreateACLsResults
743 for i, r := range resp.Results {
744 c := &req.Creations[i]
745 rs = append(rs, CreateACLsResult{
746 Principal: c.Principal,
747 Host: c.Host,
748
749 Type: c.ResourceType,
750 Name: c.ResourceName,
751 Pattern: c.ResourcePatternType,
752 Operation: c.Operation,
753 Permission: c.PermissionType,
754
755 Err: kerr.ErrorForCode(r.ErrorCode),
756 })
757 }
758
759 return rs, nil
760 }
761
762
// DeletedACL is a single ACL reported by the broker as matching a delete
// filter. All fields are copied from the matching ACL in the response.
type DeletedACL struct {
	Principal string // principal of the matched ACL
	Host      string // host of the matched ACL

	Type       kmsg.ACLResourceType   // resource type of the matched ACL
	Name       string                 // resource name of the matched ACL
	Pattern    ACLPattern             // resource pattern type of the matched ACL
	Operation  ACLOperation           // operation of the matched ACL
	Permission kmsg.ACLPermissionType // permission of the matched ACL

	// Err is kerr.ErrorForCode applied to the error code returned for this
	// matched ACL (presumably nil when it was deleted successfully).
	Err error
}
775
776
// DeletedACLs holds the ACLs that matched a single delete filter.
type DeletedACLs []DeletedACL
778
779
780
781
782
783
784
// DeleteACLsResult is the outcome of a single delete filter. The fields before
// Deleted echo the filter that was sent; Principal, Host, and Name are nil
// when the filter did not name a value for that field (the "any" case).
type DeleteACLsResult struct {
	Principal *string // principal filter, nil if unset
	Host      *string // host filter, nil if unset

	Type       kmsg.ACLResourceType   // resource type filter
	Name       *string                // resource name filter, nil if unset
	Pattern    ACLPattern             // resource pattern type filter
	Operation  ACLOperation           // operation filter
	Permission kmsg.ACLPermissionType // permission filter

	Deleted DeletedACLs // ACLs the broker reported as matching this filter

	// Err is kerr.ErrorForCode applied to the error code returned for this
	// filter as a whole.
	Err error
}
799
800
// DeleteACLsResults holds one DeleteACLsResult per filter sent by DeleteACLs,
// in request order.
type DeleteACLsResults []DeleteACLsResult
802
803
804
805
806
807
808
809
810
811
812
813
814 func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResults, error) {
815 dels, _, err := createDelDescACL(b)
816 if err != nil {
817 return nil, err
818 }
819
820 req := kmsg.NewPtrDeleteACLsRequest()
821 req.Filters = dels
822 resp, err := req.RequestWith(ctx, cl.cl)
823 if err != nil {
824 return nil, err
825 }
826 if len(resp.Results) != len(req.Filters) {
827 return nil, fmt.Errorf("received %d results to %d filters", len(resp.Results), len(req.Filters))
828 }
829
830 var rs DeleteACLsResults
831 for i, r := range resp.Results {
832 f := &req.Filters[i]
833 var ms DeletedACLs
834 for _, m := range r.MatchingACLs {
835 ms = append(ms, DeletedACL{
836 Principal: m.Principal,
837 Host: m.Host,
838 Type: m.ResourceType,
839 Name: m.ResourceName,
840 Pattern: m.ResourcePatternType,
841 Operation: m.Operation,
842 Permission: m.PermissionType,
843 Err: kerr.ErrorForCode(m.ErrorCode),
844 })
845 }
846 rs = append(rs, DeleteACLsResult{
847 Principal: f.Principal,
848 Host: f.Host,
849 Type: f.ResourceType,
850 Name: f.ResourceName,
851 Pattern: f.ResourcePatternType,
852 Operation: f.Operation,
853 Permission: f.PermissionType,
854 Deleted: ms,
855 Err: kerr.ErrorForCode(r.ErrorCode),
856 })
857 }
858 return rs, nil
859 }
860
861
// DescribedACL is a single ACL returned by a describe request. Type, Name,
// and Pattern come from the resource in the response; Principal, Host,
// Operation, and Permission come from the ACL listed under that resource.
type DescribedACL struct {
	Principal string // principal of the ACL
	Host      string // host of the ACL

	Type       kmsg.ACLResourceType   // resource type the ACL is attached to
	Name       string                 // resource name the ACL is attached to
	Pattern    ACLPattern             // resource pattern type of the resource
	Operation  ACLOperation           // operation of the ACL
	Permission kmsg.ACLPermissionType // permission of the ACL
}
872
873
874
// DescribedACLs holds the ACLs returned for a single describe filter.
type DescribedACLs []DescribedACL
876
877
878
879
880
881
882
// DescribeACLsResult is the outcome of a single describe filter. The fields
// before Described echo the filter that was sent; Principal, Host, and Name
// are nil when the filter did not name a value for that field (the "any"
// case).
type DescribeACLsResult struct {
	Principal *string // principal filter, nil if unset
	Host      *string // host filter, nil if unset

	Type       kmsg.ACLResourceType   // resource type filter
	Name       *string                // resource name filter, nil if unset
	Pattern    ACLPattern             // resource pattern type filter
	Operation  ACLOperation           // operation filter
	Permission kmsg.ACLPermissionType // permission filter

	Described DescribedACLs // ACLs returned for this filter

	// Err is kerr.ErrorForCode applied to the error code of the response
	// to this filter.
	Err error
}
897
898
// DescribeACLsResults holds one DescribeACLsResult per filter issued by
// DescribeACLs, in filter order.
type DescribeACLsResults []DescribeACLsResult
900
901
902
903
904
905
906
907
908
909
910
911
912 func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLsResults, error) {
913 _, descs, err := createDelDescACL(b)
914 if err != nil {
915 return nil, err
916 }
917
918 var (
919 ictx, cancel = context.WithCancel(ctx)
920 mu sync.Mutex
921 wg sync.WaitGroup
922 firstErr error
923 resps = make([]*kmsg.DescribeACLsResponse, len(descs))
924 )
925 defer cancel()
926 for i := range descs {
927 req := descs[i]
928 myIdx := i
929 wg.Add(1)
930 go func() {
931 defer wg.Done()
932 resp, err := req.RequestWith(ictx, cl.cl)
933 resps[myIdx] = resp
934 if err == nil {
935 return
936 }
937 cancel()
938 mu.Lock()
939 defer mu.Unlock()
940 if firstErr == nil {
941 firstErr = err
942 }
943 }()
944 }
945 wg.Wait()
946 if firstErr != nil {
947 return nil, firstErr
948 }
949
950 var rs DescribeACLsResults
951 for i, r := range resps {
952 f := descs[i]
953 var ds DescribedACLs
954 for _, resource := range r.Resources {
955 for _, acl := range resource.ACLs {
956 ds = append(ds, DescribedACL{
957 Principal: acl.Principal,
958 Host: acl.Host,
959 Type: resource.ResourceType,
960 Name: resource.ResourceName,
961 Pattern: resource.ResourcePatternType,
962 Operation: acl.Operation,
963 Permission: acl.PermissionType,
964 })
965 }
966 }
967 rs = append(rs, DescribeACLsResult{
968 Principal: f.Principal,
969 Host: f.Host,
970 Type: f.ResourceType,
971 Name: f.ResourceName,
972 Pattern: f.ResourcePatternType,
973 Operation: f.Operation,
974 Permission: f.PermissionType,
975 Described: ds,
976 Err: kerr.ErrorForCode(r.ErrorCode),
977 })
978 }
979 return rs, nil
980 }
981
// sliceAny is a one-element placeholder that createDelDescACL substitutes for
// a name, principal, or host list whose "any" flag is set, so that the
// corresponding loop runs exactly once. The "any" string itself is not put
// into a filter: when the flag is set, the matching filter field is left nil.
var sliceAny = []string{"any"}
983
// createDelDescACL validates the builder with ValidateFilter and expands it
// into parallel lists of delete filters and describe requests; element i of
// each list carries the same filter values.
//
// One filter is produced for every combination of resource name, operation,
// and principal/host pair, iterating resource types in the order any, topic,
// group, cluster, transactional ID, delegation token, and permissions in the
// order allow, deny, any. Where an "any" flag is set, the loop runs once and
// the corresponding name, principal, or host pointer is left nil.
//
// If the builder has no operations, or no principals and hosts, no filters
// are produced. The caller's builder is not modified.
func createDelDescACL(b *ACLBuilder) ([]kmsg.DeleteACLsRequestFilter, []*kmsg.DescribeACLsRequest, error) {
	if err := b.ValidateFilter(); err != nil {
		return nil, nil, err
	}

	// Shortcut: when both allow and deny are opted into any principal and
	// any host, collapse the two sides into a single filter with the "any"
	// permission type. The builder is duplicated and its allow/deny state
	// cleared so the allow and deny entries below expand to nothing and
	// only the third entry, driven by anyAny/anyAnyHosts, produces filters.
	var anyAny, anyAnyHosts bool
	if b.anyAllow && b.anyDeny && b.anyAllowHosts && b.anyDenyHosts {
		anyAny = true
		anyAnyHosts = true

		b = b.dup()
		b.allow = nil
		b.allowHosts = nil
		b.deny = nil
		b.denyHosts = nil
		b.anyAllow = false
		b.anyAllowHosts = false
		b.anyDeny = false
		b.anyDenyHosts = false
	}

	// The cluster entry below uses b.anyCluster as its "any" flag, so
	// whenever this name is set it is replaced by sliceAny and the filter's
	// resource name is left nil for clusters.
	var clusters []string
	if b.anyCluster {
		clusters = []string{"kafka-cluster"}
	}
	var deletions []kmsg.DeleteACLsRequestFilter
	var describes []*kmsg.DescribeACLsRequest
	for _, typeNames := range []struct {
		t     kmsg.ACLResourceType
		names []string
		any   bool
	}{
		{kmsg.ACLResourceTypeAny, b.any, b.anyResource},
		{kmsg.ACLResourceTypeTopic, b.topics, b.anyTopic},
		{kmsg.ACLResourceTypeGroup, b.groups, b.anyGroup},
		{kmsg.ACLResourceTypeCluster, clusters, b.anyCluster},
		{kmsg.ACLResourceTypeTransactionalId, b.txnIDs, b.anyTxn},
		{kmsg.ACLResourceTypeDelegationToken, b.tokens, b.anyToken},
	} {
		// typeNames is a per-iteration copy; swapping in the placeholder
		// makes the name loop run once without touching the builder.
		if typeNames.any {
			typeNames.names = sliceAny
		}
		for _, name := range typeNames.names {
			for _, op := range b.ops {
				for _, perm := range []struct {
					principals   []string
					anyPrincipal bool
					hosts        []string
					anyHost      bool
					permType     kmsg.ACLPermissionType
				}{
					{
						b.allow,
						b.anyAllow,
						b.allowHosts,
						b.anyAllowHosts,
						kmsg.ACLPermissionTypeAllow,
					},
					{
						b.deny,
						b.anyDeny,
						b.denyHosts,
						b.anyDenyHosts,
						kmsg.ACLPermissionTypeDeny,
					},
					{
						nil,
						anyAny,
						nil,
						anyAnyHosts,
						kmsg.ACLPermissionTypeAny,
					},
				} {
					// As with names, use the placeholder so each
					// "any" loop runs exactly once.
					if perm.anyPrincipal {
						perm.principals = sliceAny
					}
					if perm.anyHost {
						perm.hosts = sliceAny
					}
					for _, principal := range perm.principals {
						for _, host := range perm.hosts {
							deletion := kmsg.NewDeleteACLsRequestFilter()
							describe := kmsg.NewPtrDescribeACLsRequest()

							deletion.ResourceType = typeNames.t
							describe.ResourceType = typeNames.t

							// Leave the name nil when matching
							// any name for this resource type.
							if !typeNames.any {
								deletion.ResourceName = kmsg.StringPtr(name)
								describe.ResourceName = kmsg.StringPtr(name)
							}

							deletion.ResourcePatternType = b.pattern
							describe.ResourcePatternType = b.pattern

							deletion.Operation = op
							describe.Operation = op

							// Leave the principal nil when
							// matching any principal.
							if !perm.anyPrincipal {
								deletion.Principal = kmsg.StringPtr(principal)
								describe.Principal = kmsg.StringPtr(principal)
							}

							// Leave the host nil when matching
							// any host.
							if !perm.anyHost {
								deletion.Host = kmsg.StringPtr(host)
								describe.Host = kmsg.StringPtr(host)
							}

							deletion.PermissionType = perm.permType
							describe.PermissionType = perm.permType

							deletions = append(deletions, deletion)
							describes = append(describes, describe)
						}
					}
				}
			}
		}
	}
	return deletions, describes, nil
}
1110
View as plain text