...

Source file src/github.com/twmb/franz-go/pkg/kadm/acls.go

Documentation: github.com/twmb/franz-go/pkg/kadm

     1  package kadm
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  	"sync"
     8  
     9  	"github.com/twmb/franz-go/pkg/kerr"
    10  	"github.com/twmb/franz-go/pkg/kmsg"
    11  )
    12  
    13  // ACLBuilder is a builder that is used for batch creating / listing / deleting
    14  // ACLS.
    15  //
    16  // An ACL consists of five components:
    17  //
    18  //   - the user (principal)
    19  //   - the host the user runs on
    20  //   - what resource to access (topic name, group id, etc.)
    21  //   - the operation (read, write)
    22  //   - whether to allow or deny the above
    23  //
    24  // This builder allows for adding the above five components in batches and then
    25  // creating, listing, or deleting a batch of ACLs in one go. This builder
    26  // merges the fifth component (allowing or denying) into allowing principals
    27  // and hosts and denying principals and hosts. The builder must always have an
    28  // Allow or Deny. For creating, the host is optional and defaults to the
    29  // wildcard * that allows or denies all hosts. For listing / deleting, the host
    30  // is also required (specifying no hosts matches all hosts, but you must
    31  // specify this).
    32  //
    33  // Building works on a multiplying factor: every user, every host, every
    34  // resource, and every operation is combined (principals * hosts * resources *
    35  // operations).
    36  //
    37  // With the Kafka simple authorizer (and most reimplementations), all
    38  // principals are required to have the "User:" prefix. The PrefixUserExcept
    39  // function can be used to easily add the "User:" prefix if missing.
    40  //
    41  // The full set of operations and which requests require what operations is
    42  // described in a large doc comment on the ACLOperation type.
    43  //
    44  // Lastly, resources to access / deny access to can be created / matched based
    45  // on literal (exact) names, or on prefix names, or more. See the ACLPattern
    46  // docs for more information.
    47  type ACLBuilder struct {
    48  	any         []string
    49  	anyResource bool
    50  	topics      []string
    51  	anyTopic    bool
    52  	groups      []string
    53  	anyGroup    bool
    54  	anyCluster  bool
    55  	txnIDs      []string
    56  	anyTxn      bool
    57  	tokens      []string
    58  	anyToken    bool
    59  
    60  	allow         []string
    61  	anyAllow      bool
    62  	allowHosts    []string
    63  	anyAllowHosts bool
    64  	deny          []string
    65  	anyDeny       bool
    66  	denyHosts     []string
    67  	anyDenyHosts  bool
    68  
    69  	ops []ACLOperation
    70  
    71  	pattern ACLPattern
    72  }
    73  
    74  // PrefixUser prefixes all allowed and denied principals with "User:".
    75  func (b *ACLBuilder) PrefixUser() {
    76  	b.PrefixUserExcept()
    77  }
    78  
    79  // PrefixUserExcept prefixes all allowed and denied principals with "User:",
    80  // unless they have any of the given except prefixes.
    81  func (b *ACLBuilder) PrefixUserExcept(except ...string) {
    82  	replace := func(u string) string {
    83  		if !strings.HasPrefix(u, "User:") {
    84  			for _, e := range except {
    85  				if strings.HasPrefix(u, e) {
    86  					return u
    87  				}
    88  			}
    89  			return "User:" + u
    90  		}
    91  		return u
    92  	}
    93  
    94  	for i, u := range b.allow {
    95  		b.allow[i] = replace(u)
    96  	}
    97  	for i, u := range b.deny {
    98  		b.deny[i] = replace(u)
    99  	}
   100  }
   101  
   102  // NewACLs returns a new ACL builder.
   103  func NewACLs() *ACLBuilder {
   104  	return new(ACLBuilder)
   105  }
   106  
   107  // AnyResource lists & deletes ACLs of any type matching the given names
   108  // (pending other filters). If no names are given, this matches all names.
   109  //
   110  // This returns the input pointer.
   111  //
   112  // This function does nothing for creating.
   113  func (b *ACLBuilder) AnyResource(name ...string) *ACLBuilder {
   114  	b.any = name
   115  	if len(name) == 0 {
   116  		b.anyResource = true
   117  	}
   118  	return b
   119  }
   120  
   121  // Topics lists/deletes/creates ACLs of resource type "topic" for the given
   122  // topics.
   123  //
   124  // This returns the input pointer.
   125  //
   126  // For listing or deleting, if this is provided no topics, all "topic" resource
   127  // type ACLs are matched. For creating, if no topics are provided, this
   128  // function does nothing.
   129  func (b *ACLBuilder) Topics(t ...string) *ACLBuilder {
   130  	b.topics = t
   131  	if len(t) == 0 {
   132  		b.anyTopic = true
   133  	}
   134  	return b
   135  }
   136  
   137  // MaybeTopics is the same as Topics, but does not match all topics if none are
   138  // provided.
   139  func (b *ACLBuilder) MaybeTopics(t ...string) *ACLBuilder { b.topics = t; return b }
   140  
   141  // Groups lists/deletes/creates ACLs of resource type "group" for the given
   142  // groups.
   143  //
   144  // This returns the input pointer.
   145  //
   146  // For listing or deleting, if this is provided no groups, all "group" resource
   147  // type ACLs are matched. For creating, if no groups are provided, this
   148  // function does nothing.
   149  func (b *ACLBuilder) Groups(g ...string) *ACLBuilder {
   150  	b.groups = g
   151  	if len(g) == 0 {
   152  		b.anyGroup = true
   153  	}
   154  	return b
   155  }
   156  
   157  // MaybeGroups is the same as Groups, but does not match all groups if none are
   158  // provided.
   159  func (b *ACLBuilder) MaybeGroups(g ...string) *ACLBuilder { b.groups = g; return b }
   160  
   161  // Clusters lists/deletes/creates ACLs of resource type "cluster".
   162  //
   163  // This returns the input pointer.
   164  //
   165  // There is only one type of cluster in Kafka, "kafka-cluster". Opting in to
   166  // listing or deleting by cluster inherently matches all ACLS of resource type
   167  // cluster. For creating, this function allows for creating cluster ACLs.
   168  func (b *ACLBuilder) Clusters() *ACLBuilder {
   169  	b.anyCluster = true
   170  	return b
   171  }
   172  
   173  // MaybeClusters is the same as Clusters, but only matches clusters if c is
   174  // true.
   175  func (b *ACLBuilder) MaybeClusters(c bool) *ACLBuilder { b.anyCluster = c; return b }
   176  
   177  // TransactionalIDs lists/deletes/creates ACLs of resource type
   178  // "transactional_id" for the given transactional IDs.
   179  //
   180  // This returns the input pointer.
   181  //
   182  // For listing or deleting, if this is provided no IDs, all "transactional_id"
   183  // resource type ACLs matched. For creating, if no IDs are provided, this
   184  // function does nothing.
   185  func (b *ACLBuilder) TransactionalIDs(x ...string) *ACLBuilder {
   186  	b.txnIDs = x
   187  	if len(x) == 0 {
   188  		b.anyTxn = true
   189  	}
   190  	return b
   191  }
   192  
   193  // MaybeTransactionalIDs is the same as TransactionalIDs, but does not match
   194  // all transactional ID's if none are provided.
   195  func (b *ACLBuilder) MaybeTransactionalIDs(x ...string) *ACLBuilder { b.txnIDs = x; return b }
   196  
   197  // DelegationTokens lists/deletes/creates ACLs of resource type
   198  // "delegation_token" for the given delegation tokens.
   199  //
   200  // This returns the input pointer.
   201  //
   202  // For listing or deleting, if this is provided no tokens, all
   203  // "delegation_token" resource type ACLs are matched. For creating, if no
   204  // tokens are provided, this function does nothing.
   205  func (b *ACLBuilder) DelegationTokens(t ...string) *ACLBuilder {
   206  	b.tokens = t
   207  	if len(t) == 0 {
   208  		b.anyToken = true
   209  	}
   210  	return b
   211  }
   212  
   213  // MaybeDelegationTokens is the same as DelegationTokens, but does not match
   214  // all tokens if none are provided.
   215  func (b *ACLBuilder) MaybeDelegationTokens(t ...string) *ACLBuilder { b.tokens = t; return b }
   216  
   217  // Allow sets the principals to add allow permissions for. For listing and
   218  // deleting, you must also use AllowHosts.
   219  //
   220  // This returns the input pointer.
   221  //
   222  // For creating, if this is not paired with AllowHosts, the user will have
   223  // access to all hosts (the wildcard *).
   224  //
   225  // For listing & deleting, if the principals are empty, this matches any user.
   226  func (b *ACLBuilder) Allow(principals ...string) *ACLBuilder {
   227  	b.allow = principals
   228  	if len(principals) == 0 {
   229  		b.anyAllow = true
   230  	}
   231  	return b
   232  }
   233  
   234  // MaybeAllow is the same as Allow, but does not match all allowed principals
   235  // if none are provided.
   236  func (b *ACLBuilder) MaybeAllow(principals ...string) *ACLBuilder { b.allow = principals; return b }
   237  
   238  // AllowHosts sets the hosts to add allow permissions for. If using this, you
   239  // must also use Allow.
   240  //
   241  // This returns the input pointer.
   242  //
   243  // For creating, if this is empty, the user will have access to all hosts (the
   244  // wildcard *) and this function is actually not necessary.
   245  //
   246  // For listing & deleting, if the hosts are empty, this matches any host.
   247  func (b *ACLBuilder) AllowHosts(hosts ...string) *ACLBuilder {
   248  	b.allowHosts = hosts
   249  	if len(hosts) == 0 {
   250  		b.anyAllowHosts = true
   251  	}
   252  	return b
   253  }
   254  
   255  // MaybeAllowHosts is the same as AllowHosts, but does not match all allowed
   256  // hosts if none are provided.
   257  func (b *ACLBuilder) MaybeAllowHosts(hosts ...string) *ACLBuilder { b.allowHosts = hosts; return b }
   258  
   259  // Deny sets the principals to add deny permissions for. For listing and
   260  // deleting, you must also use DenyHosts.
   261  //
   262  // This returns the input pointer.
   263  //
   264  // For creating, if this is not paired with DenyHosts, the user will be denied
   265  // access to all hosts (the wildcard *).
   266  //
   267  // For listing & deleting, if the principals are empty, this matches any user.
   268  func (b *ACLBuilder) Deny(principals ...string) *ACLBuilder {
   269  	b.deny = principals
   270  	if len(principals) == 0 {
   271  		b.anyDeny = true
   272  	}
   273  	return b
   274  }
   275  
   276  // MaybeDeny is the same as Deny, but does not match all denied principals if
   277  // none are provided.
   278  func (b *ACLBuilder) MaybeDeny(principals ...string) *ACLBuilder { b.deny = principals; return b }
   279  
   280  // DenyHosts sets the hosts to add deny permissions for. If using this, you
   281  // must also use Deny.
   282  //
   283  // This returns the input pointer.
   284  //
   285  // For creating, if this is empty, the user will be denied access to all hosts
   286  // (the wildcard *) and this function is actually not necessary.
   287  //
   288  // For listing & deleting, if the hosts are empty, this matches any host.
   289  func (b *ACLBuilder) DenyHosts(hosts ...string) *ACLBuilder {
   290  	b.denyHosts = hosts
   291  	if len(hosts) == 0 {
   292  		b.anyDenyHosts = true
   293  	}
   294  	return b
   295  }
   296  
   297  // MaybeDenyHosts is the same as DenyHosts, but does not match all denied
   298  // hosts if none are provided.
   299  func (b *ACLBuilder) MaybeDenyHosts(hosts ...string) *ACLBuilder { b.denyHosts = hosts; return b }
   300  
   301  // ACLOperation is a type alias for kmsg.ACLOperation, which is an enum
   302  // containing all Kafka ACL operations and has helper functions.
   303  //
   304  // Kafka requests require the following operations (broker <=> broker ACLs
   305  // elided):
   306  //
   307  //	PRODUCING/CONSUMING
   308  //	===================
   309  //	Produce      WRITE on TOPIC for topics
   310  //	             WRITE on TRANSACTIONAL_ID for txn id (if transactionally producing)
   311  //
   312  //	Fetch        READ on TOPIC for topics
   313  //
   314  //	ListOffsets  DESCRIBE on TOPIC for topics
   315  //
   316  //	Metadata     DESCRIBE on TOPIC for topics
   317  //	             CREATE on CLUSTER for kafka-cluster (if automatically creating new topics)
   318  //	             CREATE on TOPIC for topics (if automatically creating new topics)
   319  //
   320  //	OffsetForLeaderEpoch  DESCRIBE on TOPIC for topics
   321  //
   322  //	GROUPS
   323  //	======
   324  //	FindCoordinator  DESCRIBE on GROUP for group (if finding group coordinator)
   325  //	                 DESCRIBE on TRANSACTIONAL_ID for id (if finding transactiona coordinator)
   326  //
   327  //	OffsetCommit     READ on GROUP for group
   328  //	                 READ on TOPIC for topics
   329  //
   330  //	OffsetFetch      DESCRIBE on GROUP for group
   331  //	                 DESCRIBE on TOPIC for topics
   332  //
   333  //	OffsetDelete     DELETE on GROUP For group
   334  //	                 READ on TOPIC for topics
   335  //
   336  //	JoinGroup        READ on GROUP for group
   337  //	Heartbeat        READ on GROUP for group
   338  //	LeaveGroup       READ on GROUP for group
   339  //	SyncGroup        READ on GROUP for group
   340  //
   341  //	DescribeGroup    DESCRIBE on GROUP for groups
   342  //
   343  //	ListGroups       DESCRIBE on GROUP for groups
   344  //	                 or, DESCRIBE on CLUSTER for kafka-cluster
   345  //
   346  //	DeleteGroups     DELETE on GROUP for groups
   347  //
   348  //	TRANSACTIONS (including FindCoordinator above)
   349  //	============
   350  //	InitProducerID      WRITE on TRANSACTIONAL_ID for id, if using transactions
   351  //	                    or, IDEMPOTENT_WRITE on CLUSTER for kafka-cluster, if pre Kafka 3.0
   352  //	                    or, WRITE on TOPIC for any topic, if Kafka 3.0+
   353  //
   354  //	AddPartitionsToTxn  WRITE on TRANSACTIONAL_ID for id
   355  //	                    WRITE on TOPIC for topics
   356  //
   357  //	AddOffsetsToTxn     WRITE on TRANSACTIONAL_ID for id
   358  //	                    READ on GROUP for group
   359  //
   360  //	EndTxn              WRITE on TRANSACTIONAL_ID for id
   361  //
   362  //	TxnOffsetCommit     WRITE on TRANSACTIONAL_ID for id
   363  //	                    READ on GROUP for group
   364  //	                    READ on TOPIC for topics
   365  //
   366  //	TOPIC ADMIN
   367  //	===========
   368  //	CreateTopics      CREATE on CLUSTER for kafka-cluster
   369  //	                  CREATE on TOPIC for topics
   370  //	                  DESCRIBE_CONFIGS on TOPIC for topics, for returning topic configs on create
   371  //
   372  //	CreatePartitions  ALTER on TOPIC for topics
   373  //
   374  //	DeleteTopics      DELETE on TOPIC for topics
   375  //	                  DESCRIBE on TOPIC for topics, if deleting by topic id (in addition to prior ACL)
   376  //
   377  //	DeleteRecords     DELETE on TOPIC for topics
   378  //
   379  //	CONFIG ADMIN
   380  //	============
   381  //	DescribeConfigs          DESCRIBE_CONFIGS on CLUSTER for kafka-cluster, for broker or broker-logger describing
   382  //	                         DESCRIBE_CONFIGS on TOPIC for topics, for topic describing
   383  //
   384  //	AlterConfigs             ALTER_CONFIGS on CLUSTER for kafka-cluster, for broker altering
   385  //	                         ALTER_CONFIGS on TOPIC for topics, for topic altering
   386  //
   387  //	IncrementalAlterConfigs  ALTER_CONFIGS on CLUSTER for kafka-cluster, for broker or broker-logger altering
   388  //	                         ALTER_CONFIGS on TOPIC for topics, for topic altering
   389  //
   390  //
   391  //	MISC ADMIN
   392  //	==========
   393  //	AlterReplicaLogDirs  ALTER on CLUSTER for kafka-cluster
   394  //	DescribeLogDirs      DESCRIBE on CLUSTER for kafka-cluster
   395  //
   396  //	AlterPartitionAssignments   ALTER on CLUSTER for kafka-cluster
   397  //	ListPartitionReassignments  DESCRIBE on CLUSTER for kafka-cluster
   398  //
   399  //	DescribeDelegationTokens    DESCRIBE on DELEGATION_TOKEN for id
   400  //
   401  //	ElectLeaders          ALTER on CLUSTER for kafka-cluster
   402  //
   403  //	DescribeClientQuotas  DESCRIBE_CONFIGS on CLUSTER for kafka-cluster
   404  //	AlterClientQuotas     ALTER_CONFIGS on CLUSTER for kafka-cluster
   405  //
   406  //	DescribeUserScramCredentials  DESCRIBE on CLUSTER for kafka-cluster
   407  //	AlterUserScramCredentials     ALTER on CLUSTER for kafka-cluster
   408  //
   409  //	UpdateFeatures        ALTER on CLUSTER for kafka-cluster
   410  //
   411  //	DescribeCluster       DESCRIBE on CLUSTER for kafka-cluster
   412  //
   413  //	DescribeProducerIDs   READ on TOPIC for topics
   414  //	DescribeTransactions  DESCRIBE on TRANSACTIONAL_ID for ids
   415  //	                      DESCRIBE on TOPIC for topics
   416  //	ListTransactions      DESCRIBE on TRANSACTIONAL_ID for ids
   417  type ACLOperation = kmsg.ACLOperation
   418  
   419  const (
   420  	// OpUnknown is returned for unknown operations.
   421  	OpUnknown ACLOperation = kmsg.ACLOperationUnknown
   422  
   423  	// OpAny, used for listing and deleting, matches any operation.
   424  	OpAny ACLOperation = kmsg.ACLOperationAny
   425  
   426  	// OpAll is a shortcut for allowing / denying all operations.
   427  	OpAll ACLOperation = kmsg.ACLOperationAll
   428  
   429  	// OpRead is the READ operation.
   430  	OpRead ACLOperation = kmsg.ACLOperationRead
   431  
   432  	// OpWrite is the WRITE operation.
   433  	OpWrite ACLOperation = kmsg.ACLOperationWrite
   434  
   435  	// OpCreate is the CREATE operation.
   436  	OpCreate ACLOperation = kmsg.ACLOperationCreate
   437  
   438  	// OpDelete is the DELETE operation.
   439  	OpDelete ACLOperation = kmsg.ACLOperationDelete
   440  
   441  	// OpAlter is the ALTER operation.
   442  	OpAlter ACLOperation = kmsg.ACLOperationAlter
   443  
   444  	// OpDescribe is the DESCRIBE operation.
   445  	OpDescribe ACLOperation = kmsg.ACLOperationDescribe
   446  
   447  	// OpClusterAction is the CLUSTER_ACTION operation. This operation is
   448  	// used for any broker<=>broker communication and is not needed by
   449  	// clients.
   450  	OpClusterAction ACLOperation = kmsg.ACLOperationClusterAction
   451  
   452  	// OpDescribeConfigs is the DESCRIBE_CONFIGS operation.
   453  	OpDescribeConfigs ACLOperation = kmsg.ACLOperationDescribeConfigs
   454  
   455  	// OpAlterConfigs is the ALTER_CONFIGS operation.
   456  	OpAlterConfigs ACLOperation = kmsg.ACLOperationAlterConfigs
   457  
   458  	// OpIdempotentWrite is the IDEMPOTENT_WRITE operation. As of Kafka
   459  	// 3.0+, this has been deprecated and replaced by the ability to WRITE
   460  	// on any topic.
   461  	OpIdempotentWrite ACLOperation = kmsg.ACLOperationIdempotentWrite
   462  )
   463  
   464  // Operations sets operations to allow or deny. Passing no operations defaults
   465  // to OpAny.
   466  //
   467  // This returns the input pointer.
   468  //
   469  // For creating, OpAny returns an error, for it is strictly used for filters
   470  // (listing & deleting).
   471  func (b *ACLBuilder) Operations(operations ...ACLOperation) *ACLBuilder {
   472  	b.ops = operations
   473  	if len(operations) == 0 {
   474  		b.ops = []ACLOperation{OpAny}
   475  	}
   476  	return b
   477  }
   478  
   479  // MaybeOperations is the same as Operations, but does not match all operations
   480  // if none are provided.
   481  func (b *ACLBuilder) MaybeOperations(operations ...ACLOperation) *ACLBuilder {
   482  	if len(operations) > 0 {
   483  		b.Operations(operations...)
   484  	}
   485  	return b
   486  }
   487  
   488  // ACLPattern is a type alias for kmsg.ACLResourcePatternType, which is an enum
   489  // containing all Kafka ACL resource pattern options.
   490  //
   491  // Creating/listing/deleting ACLs works on a resource name basis: every ACL
   492  // created has a name, and every ACL filtered for listing / deleting matches by
   493  // name. The name by default is "literal", meaning created ACLs will have the
   494  // exact name, and matched ACLs must match completely.
   495  //
   496  // Prefixed names allow for creating an ACL that matches any prefix: principals
   497  // foo-bar and foo-baz both have the prefix "foo-", meaning a READ on TOPIC for
   498  // User:foo- with prefix pattern will allow both of those principals to read
   499  // the topic.
   500  //
   501  // Any and match are used for listing and deleting. Any will match any name, be
   502  // it literal or prefix or a wildcard name. There is no need for specifying
   503  // topics, groups, etc. when using any resource pattern.
   504  //
   505  // Alternatively, match requires a name, but it matches any literal name (exact
   506  // match), any prefix, and any wildcard.
   507  type ACLPattern = kmsg.ACLResourcePatternType
   508  
   509  const (
   510  	// ACLPatternUnknown is returned for unknown patterns.
   511  	ACLPatternUnknown ACLPattern = kmsg.ACLResourcePatternTypeUnknown
   512  
   513  	// ACLPatternAny is the ANY resource pattern.
   514  	ACLPatternAny ACLPattern = kmsg.ACLResourcePatternTypeAny
   515  
   516  	// ACLPatternMatch is the MATCH resource pattern.
   517  	ACLPatternMatch ACLPattern = kmsg.ACLResourcePatternTypeMatch
   518  
   519  	// ACLPatternLiteral is the LITERAL resource pattern, the default.
   520  	ACLPatternLiteral ACLPattern = kmsg.ACLResourcePatternTypeLiteral
   521  
   522  	// ACLPatternPrefixed is the PREFIXED resource pattern.
   523  	ACLPatternPrefixed ACLPattern = kmsg.ACLResourcePatternTypePrefixed
   524  )
   525  
   526  // ResourcePatternType sets the pattern type to use when creating or filtering
   527  // ACL resource names, overriding the default of LITERAL.
   528  //
   529  // This returns the input pointer.
   530  //
   531  // For creating, only LITERAL and PREFIXED are supported.
   532  func (b *ACLBuilder) ResourcePatternType(pattern ACLPattern) *ACLBuilder {
   533  	b.pattern = pattern
   534  	return b
   535  }
   536  
   537  // ValidateCreate returns an error if the builder is invalid for creating ACLs.
   538  func (b *ACLBuilder) ValidateCreate() error {
   539  	for _, op := range b.ops {
   540  		switch op {
   541  		case OpAny, OpUnknown:
   542  			return fmt.Errorf("invalid operation %s for creating ACLs", op)
   543  		}
   544  	}
   545  
   546  	switch b.pattern {
   547  	case ACLPatternLiteral, ACLPatternPrefixed:
   548  	default:
   549  		return fmt.Errorf("invalid acl resource pattern %s for creating ACLs", b.pattern)
   550  	}
   551  
   552  	if len(b.allowHosts) != 0 && len(b.allow) == 0 {
   553  		return fmt.Errorf("invalid allow hosts with no allow principals")
   554  	}
   555  	if len(b.denyHosts) != 0 && len(b.deny) == 0 {
   556  		return fmt.Errorf("invalid deny hosts with no deny principals")
   557  	}
   558  	return nil
   559  }
   560  
   561  // ValidateDelete is an alias for ValidateFilter.
   562  func (b *ACLBuilder) ValidateDelete() error { return b.ValidateFilter() }
   563  
   564  // ValidateDescribe is an alias for ValidateFilter.
   565  func (b *ACLBuilder) ValidateDescribe() error { return b.ValidateFilter() }
   566  
   567  // ValidateFilter returns an error if the builder is invalid for deleting or
   568  // describing ACLs (which both operate on a filter basis).
   569  func (b *ACLBuilder) ValidateFilter() error {
   570  	if len(b.allowHosts) != 0 && len(b.allow) == 0 && !b.anyAllow {
   571  		return fmt.Errorf("invalid allow hosts with no allow principals")
   572  	}
   573  	if len(b.allow) != 0 && len(b.allowHosts) == 0 && !b.anyAllowHosts {
   574  		return fmt.Errorf("invalid allow principals with no allow hosts")
   575  	}
   576  	if len(b.denyHosts) != 0 && len(b.deny) == 0 && !b.anyDeny {
   577  		return fmt.Errorf("invalid deny hosts with no deny principals")
   578  	}
   579  	if len(b.deny) != 0 && len(b.denyHosts) == 0 && !b.anyDenyHosts {
   580  		return fmt.Errorf("invalid deny principals with no deny hosts")
   581  	}
   582  	return nil
   583  }
   584  
   585  // HasAnyFilter returns whether any field in this builder is opted into "any",
   586  // meaning a wide glob. This would be if you used Topics with no topics, and so
   587  // on. This function can be used to detect if you accidentally opted into a
   588  // non-specific ACL.
   589  //
   590  // The evaluated fields are: resources, principals/hosts, a single OpAny
   591  // operation, and an Any pattern.
   592  func (b *ACLBuilder) HasAnyFilter() bool {
   593  	return b.anyResource ||
   594  		b.anyTopic ||
   595  		b.anyGroup ||
   596  		b.anyTxn ||
   597  		b.anyToken ||
   598  		b.anyAllow ||
   599  		b.anyAllowHosts ||
   600  		b.anyDeny ||
   601  		b.anyDenyHosts ||
   602  		b.hasOpAny() ||
   603  		b.pattern == ACLPatternAny
   604  }
   605  
   606  func (b *ACLBuilder) hasOpAny() bool {
   607  	for _, op := range b.ops {
   608  		if op == OpAny {
   609  			return true
   610  		}
   611  	}
   612  	return false
   613  }
   614  
   615  // HasResource returns true if the builder has a non-empty resource (topic,
   616  // group, ...), or if any resource has "any" set to true.
   617  func (b *ACLBuilder) HasResource() bool {
   618  	l := len(b.any) +
   619  		len(b.topics) +
   620  		len(b.groups) +
   621  		len(b.txnIDs) +
   622  		len(b.tokens)
   623  	return l > 0 ||
   624  		b.anyResource ||
   625  		b.anyTopic ||
   626  		b.anyGroup ||
   627  		b.anyCluster ||
   628  		b.anyTxn ||
   629  		b.anyToken
   630  }
   631  
   632  // HasPrincipals returns if any allow or deny principals have been set, or if
   633  // their "any" field is true.
   634  func (b *ACLBuilder) HasPrincipals() bool {
   635  	return len(b.allow) > 0 ||
   636  		b.anyAllow ||
   637  		len(b.deny) > 0 ||
   638  		b.anyDeny
   639  }
   640  
   641  // HasHosts returns if any allow or deny hosts have been set, or if their "any"
   642  // field is true.
   643  func (b *ACLBuilder) HasHosts() bool {
   644  	return len(b.allowHosts) > 0 ||
   645  		b.anyAllowHosts ||
   646  		len(b.denyHosts) > 0 ||
   647  		b.anyDenyHosts
   648  }
   649  
   650  func (b *ACLBuilder) dup() *ACLBuilder { // shallow copy
   651  	d := *b
   652  	return &d
   653  }
   654  
   655  // CreateACLsResult is a result for an individual ACL creation.
   656  type CreateACLsResult struct {
   657  	Principal string
   658  	Host      string
   659  
   660  	Type       kmsg.ACLResourceType   // Type is the type of resource this is.
   661  	Name       string                 // Name is the name of the resource allowed / denied.
   662  	Pattern    ACLPattern             // Pattern is the name pattern.
   663  	Operation  ACLOperation           // Operation is the operation allowed / denied.
   664  	Permission kmsg.ACLPermissionType // Permission is whether this is allowed / denied.
   665  
   666  	Err error // Err is the error for this ACL creation.
   667  }
   668  
   669  // CreateACLsResults contains all results to created ACLs.
   670  type CreateACLsResults []CreateACLsResult
   671  
   672  // CreateACLs creates a batch of ACLs using the ACL builder, validating the
   673  // input before issuing the CreateACLs request.
   674  //
   675  // If the input is invalid, or if the response fails, or if the response does
   676  // not contain as many ACLs as we issued in our create request, this returns an
   677  // error.
   678  func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResults, error) {
   679  	if err := b.ValidateCreate(); err != nil {
   680  		return nil, err
   681  	}
   682  	if len(b.allow) != 0 && len(b.allowHosts) == 0 {
   683  		b.allowHosts = []string{"*"}
   684  	}
   685  	if len(b.deny) != 0 && len(b.denyHosts) == 0 {
   686  		b.denyHosts = []string{"*"}
   687  	}
   688  
   689  	var clusters []string
   690  	if b.anyCluster {
   691  		clusters = []string{"kafka-cluster"}
   692  	}
   693  
   694  	req := kmsg.NewPtrCreateACLsRequest()
   695  	for _, typeNames := range []struct {
   696  		t     kmsg.ACLResourceType
   697  		names []string
   698  	}{
   699  		{kmsg.ACLResourceTypeTopic, b.topics},
   700  		{kmsg.ACLResourceTypeGroup, b.groups},
   701  		{kmsg.ACLResourceTypeCluster, clusters},
   702  		{kmsg.ACLResourceTypeTransactionalId, b.txnIDs},
   703  		{kmsg.ACLResourceTypeDelegationToken, b.tokens},
   704  	} {
   705  		for _, name := range typeNames.names {
   706  			for _, op := range b.ops {
   707  				for _, perm := range []struct {
   708  					principals []string
   709  					hosts      []string
   710  					permType   kmsg.ACLPermissionType
   711  				}{
   712  					{b.allow, b.allowHosts, kmsg.ACLPermissionTypeAllow},
   713  					{b.deny, b.denyHosts, kmsg.ACLPermissionTypeDeny},
   714  				} {
   715  					for _, principal := range perm.principals {
   716  						for _, host := range perm.hosts {
   717  							c := kmsg.NewCreateACLsRequestCreation()
   718  							c.ResourceType = typeNames.t
   719  							c.ResourceName = name
   720  							c.ResourcePatternType = b.pattern
   721  							c.Operation = op
   722  							c.Principal = principal
   723  							c.Host = host
   724  							c.PermissionType = perm.permType
   725  							req.Creations = append(req.Creations, c)
   726  						}
   727  					}
   728  				}
   729  			}
   730  		}
   731  	}
   732  
   733  	resp, err := req.RequestWith(ctx, cl.cl)
   734  	if err != nil {
   735  		return nil, err
   736  	}
   737  
   738  	if len(resp.Results) != len(req.Creations) {
   739  		return nil, fmt.Errorf("received %d results to %d creations", len(resp.Results), len(req.Creations))
   740  	}
   741  
   742  	var rs CreateACLsResults
   743  	for i, r := range resp.Results {
   744  		c := &req.Creations[i]
   745  		rs = append(rs, CreateACLsResult{
   746  			Principal: c.Principal,
   747  			Host:      c.Host,
   748  
   749  			Type:       c.ResourceType,
   750  			Name:       c.ResourceName,
   751  			Pattern:    c.ResourcePatternType,
   752  			Operation:  c.Operation,
   753  			Permission: c.PermissionType,
   754  
   755  			Err: kerr.ErrorForCode(r.ErrorCode),
   756  		})
   757  	}
   758  
   759  	return rs, nil
   760  }
   761  
   762  // DeletedACL an ACL that was deleted.
   763  type DeletedACL struct {
   764  	Principal string // Principal is this deleted ACL's principal.
   765  	Host      string // Host is this deleted ACL's host.
   766  
   767  	Type       kmsg.ACLResourceType   // Type is this deleted ACL's resource type.
   768  	Name       string                 // Name is this deleted ACL's resource name.
   769  	Pattern    ACLPattern             // Pattern is this deleted ACL's resource name pattern.
   770  	Operation  ACLOperation           // Operation is this deleted ACL's operation.
   771  	Permission kmsg.ACLPermissionType // Permission this deleted ACLs permission.
   772  
   773  	Err error // Err is non-nil if this match has an error.
   774  }
   775  
   776  // DeletedACLs contains ACLs that were deleted from a single delete filter.
   777  type DeletedACLs []DeletedACL
   778  
   779  // DeleteACLsResult contains the input used for a delete ACL filter, and the
   780  // deletes that the filter matched or the error for this filter.
   781  //
   782  // All fields but Deleted and Err are set from the request input. The response
   783  // sets either Deleted (potentially to nothing if the filter matched nothing)
   784  // or Err.
   785  type DeleteACLsResult struct {
   786  	Principal *string // Principal is the optional user that was used in this filter.
   787  	Host      *string // Host is the optional host that was used in this filter.
   788  
   789  	Type       kmsg.ACLResourceType   // Type is the type of resource used for this filter.
   790  	Name       *string                // Name is the name of the resource used for this filter.
   791  	Pattern    ACLPattern             // Pattern is the name pattern used for this filter.
   792  	Operation  ACLOperation           // Operation is the operation used for this filter.
   793  	Permission kmsg.ACLPermissionType // Permission is permission used for this filter.
   794  
   795  	Deleted DeletedACLs // Deleted contains all ACLs this delete filter matched.
   796  
   797  	Err error // Err is non-nil if this filter has an error.
   798  }
   799  
   800  // DeleteACLsResults contains all results to deleted ACLs.
   801  type DeleteACLsResults []DeleteACLsResult
   802  
   803  // DeleteACLs deletes a batch of ACLs using the ACL builder, validating the
   804  // input before issuing the DeleteACLs request.
   805  //
   806  // If the input is invalid, or if the response fails, or if the response does
   807  // not contain as many ACL results as we issued in our delete request, this
   808  // returns an error.
   809  //
   810  // Deleting ACLs works on a filter basis: a single filter can match many ACLs.
   811  // For example, deleting with operation ANY matches any operation. For safety /
   812  // verification purposes, you an DescribeACLs with the same builder first to
   813  // see what would be deleted.
   814  func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResults, error) {
   815  	dels, _, err := createDelDescACL(b)
   816  	if err != nil {
   817  		return nil, err
   818  	}
   819  
   820  	req := kmsg.NewPtrDeleteACLsRequest()
   821  	req.Filters = dels
   822  	resp, err := req.RequestWith(ctx, cl.cl)
   823  	if err != nil {
   824  		return nil, err
   825  	}
   826  	if len(resp.Results) != len(req.Filters) {
   827  		return nil, fmt.Errorf("received %d results to %d filters", len(resp.Results), len(req.Filters))
   828  	}
   829  
   830  	var rs DeleteACLsResults
   831  	for i, r := range resp.Results {
   832  		f := &req.Filters[i]
   833  		var ms DeletedACLs
   834  		for _, m := range r.MatchingACLs {
   835  			ms = append(ms, DeletedACL{
   836  				Principal:  m.Principal,
   837  				Host:       m.Host,
   838  				Type:       m.ResourceType,
   839  				Name:       m.ResourceName,
   840  				Pattern:    m.ResourcePatternType,
   841  				Operation:  m.Operation,
   842  				Permission: m.PermissionType,
   843  				Err:        kerr.ErrorForCode(m.ErrorCode),
   844  			})
   845  		}
   846  		rs = append(rs, DeleteACLsResult{
   847  			Principal:  f.Principal,
   848  			Host:       f.Host,
   849  			Type:       f.ResourceType,
   850  			Name:       f.ResourceName,
   851  			Pattern:    f.ResourcePatternType,
   852  			Operation:  f.Operation,
   853  			Permission: f.PermissionType,
   854  			Deleted:    ms,
   855  			Err:        kerr.ErrorForCode(r.ErrorCode),
   856  		})
   857  	}
   858  	return rs, nil
   859  }
   860  
   861  // DescribedACL is an ACL that was described.
   862  type DescribedACL struct {
   863  	Principal string // Principal is this described ACL's principal.
   864  	Host      string // Host is this described ACL's host.
   865  
   866  	Type       kmsg.ACLResourceType   // Type is this described ACL's resource type.
   867  	Name       string                 // Name is this described ACL's resource name.
   868  	Pattern    ACLPattern             // Pattern is this described ACL's resource name pattern.
   869  	Operation  ACLOperation           // Operation is this described ACL's operation.
   870  	Permission kmsg.ACLPermissionType // Permission this described ACLs permission.
   871  }
   872  
   873  // DescribedACLs contains ACLs that were described from a single describe
   874  // filter.
   875  type DescribedACLs []DescribedACL
   876  
   877  // DescribeACLsResults contains the input used for a describe ACL filter, and
   878  // the describes that the filter matched or the error for this filter.
   879  //
   880  // All fields but Described and Err are set from the request input. The
   881  // response sets either Described (potentially to nothing if the filter matched
   882  // nothing) or Err.
   883  type DescribeACLsResult struct {
   884  	Principal *string // Principal is the optional user that was used in this filter.
   885  	Host      *string // Host is the optional host that was used in this filter.
   886  
   887  	Type       kmsg.ACLResourceType   // Type is the type of resource used for this filter.
   888  	Name       *string                // Name is the name of the resource used for this filter.
   889  	Pattern    ACLPattern             // Pattern is the name pattern used for this filter.
   890  	Operation  ACLOperation           // Operation is the operation used for this filter.
   891  	Permission kmsg.ACLPermissionType // Permission is permission used for this filter.
   892  
   893  	Described DescribedACLs // Described contains all ACLs this describe filter matched.
   894  
   895  	Err error // Err is non-nil if this filter has an error.
   896  }
   897  
   898  // DescribeACLsResults contains all results to described ACLs.
   899  type DescribeACLsResults []DescribeACLsResult
   900  
   901  // DescribeACLs describes a batch of ACLs using the ACL builder, validating the
   902  // input before issuing DescribeACLs requests.
   903  //
   904  // If the input is invalid, or if any response fails, this returns an error.
   905  //
   906  // Listing ACLs works on a filter basis: a single filter can match many ACLs.
   907  // For example, describing with operation ANY matches any operation. Under the
   908  // hood, this method issues one describe request per filter, because describing
   909  // ACLs does not work on a batch basis (unlike creating & deleting). The return
   910  // of this function can be used to see what would be deleted given the same
   911  // builder input.
   912  func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLsResults, error) {
   913  	_, descs, err := createDelDescACL(b)
   914  	if err != nil {
   915  		return nil, err
   916  	}
   917  
   918  	var (
   919  		ictx, cancel = context.WithCancel(ctx)
   920  		mu           sync.Mutex
   921  		wg           sync.WaitGroup
   922  		firstErr     error
   923  		resps        = make([]*kmsg.DescribeACLsResponse, len(descs))
   924  	)
   925  	defer cancel()
   926  	for i := range descs {
   927  		req := descs[i] // each req is unique per loop, we are not reusing req, this is safe
   928  		myIdx := i
   929  		wg.Add(1)
   930  		go func() {
   931  			defer wg.Done()
   932  			resp, err := req.RequestWith(ictx, cl.cl)
   933  			resps[myIdx] = resp
   934  			if err == nil {
   935  				return
   936  			}
   937  			cancel()
   938  			mu.Lock()
   939  			defer mu.Unlock()
   940  			if firstErr == nil { // keep the first err
   941  				firstErr = err
   942  			}
   943  		}()
   944  	}
   945  	wg.Wait()
   946  	if firstErr != nil {
   947  		return nil, firstErr
   948  	}
   949  
   950  	var rs DescribeACLsResults
   951  	for i, r := range resps {
   952  		f := descs[i]
   953  		var ds DescribedACLs
   954  		for _, resource := range r.Resources {
   955  			for _, acl := range resource.ACLs {
   956  				ds = append(ds, DescribedACL{
   957  					Principal:  acl.Principal,
   958  					Host:       acl.Host,
   959  					Type:       resource.ResourceType,
   960  					Name:       resource.ResourceName,
   961  					Pattern:    resource.ResourcePatternType,
   962  					Operation:  acl.Operation,
   963  					Permission: acl.PermissionType,
   964  				})
   965  			}
   966  		}
   967  		rs = append(rs, DescribeACLsResult{
   968  			Principal:  f.Principal,
   969  			Host:       f.Host,
   970  			Type:       f.ResourceType,
   971  			Name:       f.ResourceName,
   972  			Pattern:    f.ResourcePatternType,
   973  			Operation:  f.Operation,
   974  			Permission: f.PermissionType,
   975  			Described:  ds,
   976  			Err:        kerr.ErrorForCode(r.ErrorCode),
   977  		})
   978  	}
   979  	return rs, nil
   980  }
   981  
   982  var sliceAny = []string{"any"}
   983  
   984  func createDelDescACL(b *ACLBuilder) ([]kmsg.DeleteACLsRequestFilter, []*kmsg.DescribeACLsRequest, error) {
   985  	if err := b.ValidateFilter(); err != nil {
   986  		return nil, nil, err
   987  	}
   988  
   989  	// As a special shortcut, if we have any allow and deny principals and
   990  	// hosts, we collapse these into one "any" group. The anyAny and
   991  	// anyAnyHosts vars are used in our looping below, and if we do this,
   992  	// we dup and set all the relevant fields to false to not expand them
   993  	// in our loops.
   994  	var anyAny, anyAnyHosts bool
   995  	if b.anyAllow && b.anyDeny && b.anyAllowHosts && b.anyDenyHosts {
   996  		anyAny = true
   997  		anyAnyHosts = true
   998  
   999  		b = b.dup()
  1000  		b.allow = nil
  1001  		b.allowHosts = nil
  1002  		b.deny = nil
  1003  		b.denyHosts = nil
  1004  		b.anyAllow = false
  1005  		b.anyAllowHosts = false
  1006  		b.anyDeny = false
  1007  		b.anyDenyHosts = false
  1008  	}
  1009  
  1010  	var clusters []string
  1011  	if b.anyCluster {
  1012  		clusters = []string{"kafka-cluster"}
  1013  	}
  1014  	var deletions []kmsg.DeleteACLsRequestFilter
  1015  	var describes []*kmsg.DescribeACLsRequest
  1016  	for _, typeNames := range []struct {
  1017  		t     kmsg.ACLResourceType
  1018  		names []string
  1019  		any   bool
  1020  	}{
  1021  		{kmsg.ACLResourceTypeAny, b.any, b.anyResource},
  1022  		{kmsg.ACLResourceTypeTopic, b.topics, b.anyTopic},
  1023  		{kmsg.ACLResourceTypeGroup, b.groups, b.anyGroup},
  1024  		{kmsg.ACLResourceTypeCluster, clusters, b.anyCluster},
  1025  		{kmsg.ACLResourceTypeTransactionalId, b.txnIDs, b.anyTxn},
  1026  		{kmsg.ACLResourceTypeDelegationToken, b.tokens, b.anyToken},
  1027  	} {
  1028  		if typeNames.any {
  1029  			typeNames.names = sliceAny
  1030  		}
  1031  		for _, name := range typeNames.names {
  1032  			for _, op := range b.ops {
  1033  				for _, perm := range []struct {
  1034  					principals   []string
  1035  					anyPrincipal bool
  1036  					hosts        []string
  1037  					anyHost      bool
  1038  					permType     kmsg.ACLPermissionType
  1039  				}{
  1040  					{
  1041  						b.allow,
  1042  						b.anyAllow,
  1043  						b.allowHosts,
  1044  						b.anyAllowHosts,
  1045  						kmsg.ACLPermissionTypeAllow,
  1046  					},
  1047  					{
  1048  						b.deny,
  1049  						b.anyDeny,
  1050  						b.denyHosts,
  1051  						b.anyDenyHosts,
  1052  						kmsg.ACLPermissionTypeDeny,
  1053  					},
  1054  					{
  1055  						nil,
  1056  						anyAny,
  1057  						nil,
  1058  						anyAnyHosts,
  1059  						kmsg.ACLPermissionTypeAny,
  1060  					},
  1061  				} {
  1062  					if perm.anyPrincipal {
  1063  						perm.principals = sliceAny
  1064  					}
  1065  					if perm.anyHost {
  1066  						perm.hosts = sliceAny
  1067  					}
  1068  					for _, principal := range perm.principals {
  1069  						for _, host := range perm.hosts {
  1070  							deletion := kmsg.NewDeleteACLsRequestFilter()
  1071  							describe := kmsg.NewPtrDescribeACLsRequest()
  1072  
  1073  							deletion.ResourceType = typeNames.t
  1074  							describe.ResourceType = typeNames.t
  1075  
  1076  							if !typeNames.any {
  1077  								deletion.ResourceName = kmsg.StringPtr(name)
  1078  								describe.ResourceName = kmsg.StringPtr(name)
  1079  							}
  1080  
  1081  							deletion.ResourcePatternType = b.pattern
  1082  							describe.ResourcePatternType = b.pattern
  1083  
  1084  							deletion.Operation = op
  1085  							describe.Operation = op
  1086  
  1087  							if !perm.anyPrincipal {
  1088  								deletion.Principal = kmsg.StringPtr(principal)
  1089  								describe.Principal = kmsg.StringPtr(principal)
  1090  							}
  1091  
  1092  							if !perm.anyHost {
  1093  								deletion.Host = kmsg.StringPtr(host)
  1094  								describe.Host = kmsg.StringPtr(host)
  1095  							}
  1096  
  1097  							deletion.PermissionType = perm.permType
  1098  							describe.PermissionType = perm.permType
  1099  
  1100  							deletions = append(deletions, deletion)
  1101  							describes = append(describes, describe)
  1102  						}
  1103  					}
  1104  				}
  1105  			}
  1106  		}
  1107  	}
  1108  	return deletions, describes, nil
  1109  }
  1110  

View as plain text