
Source file src/github.com/twmb/franz-go/pkg/sasl/aws/aws.go

Documentation: github.com/twmb/franz-go/pkg/sasl/aws

     1  // Package aws provides AWS_MSK_IAM sasl authentication as specified in the
     2  // Java source.
     3  //
     4  // The Java source can be found at https://github.com/aws/aws-msk-iam-auth.
     5  package aws
     7  import (
     8  	"context"
     9  	"crypto/hmac"
    10  	"crypto/sha256"
    11  	"encoding/hex"
    12  	"encoding/json"
    13  	"errors"
    14  	"fmt"
    15  	"net"
    16  	"net/url"
    17  	"os"
    18  	"runtime"
    19  	"strings"
    20  	"time"
    22  	"github.com/twmb/franz-go/pkg/sasl"
    23  )
    25  // Auth contains an AWS AccessKey and SecretKey for authentication.
    26  //
    27  // This client may add fields to this struct in the future if Kafka adds more
    28  // capabilities to MSK IAM.
    29  type Auth struct {
    30  	// AccessKey is an AWS AccessKey.
    31  	AccessKey string
    33  	// AccessKey is an AWS SecretKey.
    34  	SecretKey string
    36  	// SessionToken, if non-empty, is a session / security token to use for
    37  	// authentication.
    38  	//
    39  	// See the following link for more details:
    40  	//
    41  	//     https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
    42  	//
    43  	SessionToken string
    45  	// UserAgent is the user agent to for the client to use when connecting
    46  	// to Kafka, overriding the default "franz-go/<runtime.Version()>/<hostname>".
    47  	//
    48  	// Setting a UserAgent allows authorizing based on the aws:UserAgent
    49  	// condition key; see the following link for more details:
    50  	//
    51  	//     https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent
    52  	//
    53  	UserAgent string
    55  	_ struct{} // require explicit field initialization
    56  }
    58  var hostname, _ = os.Hostname()
    60  func init() {
    61  	if hostname == "" {
    62  		hostname = "unknown"
    63  	}
    64  }
    66  // AsManagedStreamingIAMMechanism returns a sasl mechanism that will use 'a' as
    67  // credentials for all sasl sessions.
    68  //
    69  // This is a shortcut for using the ManagedStreamingIAM function and is useful
    70  // when you do not need to live-rotate credentials.
    71  func (a Auth) AsManagedStreamingIAMMechanism() sasl.Mechanism {
    72  	return ManagedStreamingIAM(func(context.Context) (Auth, error) {
    73  		return a, nil
    74  	})
    75  }
    77  type mskiam func(context.Context) (Auth, error)
    79  // ManagedStreamingIAM returns a sasl mechanism that will call authFn whenever
    80  // sasl authentication is needed. The returned Auth is used for a single
    81  // session.
    82  func ManagedStreamingIAM(authFn func(context.Context) (Auth, error)) sasl.Mechanism {
    83  	return mskiam(authFn)
    84  }
    86  func (mskiam) Name() string { return "AWS_MSK_IAM" }
    88  func (fn mskiam) Authenticate(ctx context.Context, host string) (sasl.Session, []byte, error) {
    89  	auth, err := fn(ctx)
    90  	if err != nil {
    91  		return nil, nil, err
    92  	}
    94  	challenge, err := challenge(auth, host)
    95  	if err != nil {
    96  		return nil, nil, err
    97  	}
    99  	return new(session), challenge, nil
   100  }
   102  type session struct{}
   104  func (session) Challenge(resp []byte) (bool, []byte, error) {
   105  	if len(resp) == 0 {
   106  		return false, nil, errors.New("empty challenge response: failed")
   107  	}
   108  	return true, nil, nil
   109  }
   111  const service = "kafka-cluster"
   113  func challenge(auth Auth, host string) ([]byte, error) {
   114  	host, _, err := net.SplitHostPort(host) // we do not need the port
   115  	if err != nil {
   116  		return nil, err
   117  	}
   118  	region, err := identifyRegion(host)
   119  	if err != nil {
   120  		return nil, err
   121  	}
   123  	var (
   124  		timestamp = time.Now().UTC().Format("20060102T150405Z")
   125  		date      = timestamp[:8] // 20060102
   126  		scope     = scope(date, region)
   127  		v         = make(url.Values)
   128  	)
   130  	v.Set("Action", service+":Connect")
   131  	v.Set("X-Amz-Algorithm", "AWS4-HMAC-SHA256")
   132  	v.Set("X-Amz-Credential", auth.AccessKey+"/"+scope)
   133  	v.Set("X-Amz-Date", timestamp)
   134  	v.Set("X-Amz-Expires", "300") // 5 min
   135  	v.Set("X-Amz-SignedHeaders", "host")
   136  	if auth.SessionToken != "" {
   137  		v.Set("X-Amz-Security-Token", auth.SessionToken)
   138  	}
   140  	qps := strings.ReplaceAll(v.Encode(), "+", "%20")
   142  	canonicalRequest := task1(host, qps)
   143  	sts := task2(timestamp, scope, canonicalRequest)
   144  	signature := task3(auth.SecretKey, region, date, sts)
   146  	v.Set("X-Amz-Signature", signature) // task4
   148  	// According to the Java source and manual testing, all values in our
   149  	// challenge map must be lowercased, and we MUST have host, and we MUST
   150  	// have version, and version MUST be 2020_10_22.
   151  	keyvals := make(map[string]string)
   152  	for key, values := range v {
   153  		keyvals[strings.ToLower(key)] = values[0]
   154  	}
   155  	keyvals["host"] = host
   156  	keyvals["version"] = "2020_10_22"
   157  	ua := auth.UserAgent
   158  	if ua == "" {
   159  		ua = strings.Join([]string{"franz-go", runtime.Version(), hostname}, "/")
   160  	}
   161  	keyvals["user-agent"] = ua
   163  	marshaled, err := json.Marshal(keyvals)
   164  	if err != nil {
   165  		return nil, err
   166  	}
   167  	return marshaled, nil
   168  }
   170  // https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
   171  // "CredentialScope", Part 3
   172  func scope(date, region string) string {
   173  	return strings.Join([]string{date, region, service, "aws4_request"}, "/")
   174  }
   176  // https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
   177  func task1(host, qps string) []byte {
   178  	// We start with our defined method, "GET", and the defined empty path,
   179  	// "/". For query parameters, we have to escape +'s with %20, but we did
   180  	// that above when building our URL.
   181  	//
   182  	//   HTTPRequestMethod + '\n' +
   183  	//   CanonicalURI + '\n' +
   184  	//   CanonicalQueryString + '\n' +
   185  	canon := make([]byte, 0, 200)
   186  	canon = append(canon, "GET\n"...)
   187  	canon = append(canon, "/\n"...)
   188  	canon = append(canon, qps...)
   189  	canon = append(canon, '\n')
   191  	// We only sign one header, the host. Each signed header is followed by
   192  	// a newline, and then the canonical header block is followed itself by
   193  	// a newline.
   194  	//
   195  	//   CanonicalHeaders + '\n' +
   196  	//   SignedHeaders + '\n' +
   197  	canon = append(canon, "host:"...)
   198  	canon = append(canon, host...)
   199  	canon = append(canon, "\n\nhost\n"...)
   201  	// Finally, we add our empty body.
   202  	//
   203  	//   HexEncode(Hash(RequestPayload))
   204  	const emptyBody = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
   205  	return append(canon, emptyBody...)
   206  }
   208  // https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
   209  func task2(timestamp, scope string, canonicalRequest []byte) []byte {
   210  	toSign := make([]byte, 0, 512)
   211  	toSign = append(toSign, "AWS4-HMAC-SHA256\n"...)
   212  	toSign = append(toSign, timestamp...)
   213  	toSign = append(toSign, '\n')
   214  	toSign = append(toSign, scope...)
   215  	toSign = append(toSign, '\n')
   216  	canonHash := sha256.Sum256(canonicalRequest)
   217  	hexBuf := make([]byte, 64) // 32 bytes to 64
   218  	hex.Encode(hexBuf, canonHash[:])
   219  	toSign = append(toSign, hexBuf...)
   220  	return toSign
   221  }
   223  var aws4requestBytes = []byte("aws4_request")
   225  // https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
   226  func task3(secretKey, region, date string, sts []byte) string {
   227  	key := make([]byte, 0, 100)
   228  	key = append(key, "AWS4"...)
   229  	key = append(key, secretKey...)
   231  	h := hmac.New(sha256.New, key)
   232  	h.Write([]byte(date)) // kDate
   234  	key = h.Sum(key[:0])
   235  	h = hmac.New(sha256.New, key)
   236  	h.Write([]byte(region)) // kRegion
   238  	key = h.Sum(key[:0])
   239  	h = hmac.New(sha256.New, key)
   240  	h.Write([]byte(service)) // kService
   242  	key = h.Sum(key[:0])
   243  	h = hmac.New(sha256.New, key)
   244  	h.Write(aws4requestBytes) // kSigning
   246  	key = h.Sum(key[:0])
   247  	h = hmac.New(sha256.New, key)
   248  	h.Write(sts)
   250  	return hex.EncodeToString(h.Sum(key[:0]))
   251  }
   253  // aws-java-sdk-core/src/main/resources/com/amazonaws/partitions/endpoints.json
   254  var suffixes = []string{
   255  	".amazonaws.com",
   256  	".amazonaws.com.cn",
   257  	".c2s.ic.gov",
   258  	".sc2s.sgov.gov",
   259  }
   261  // aws-java-sdk-core/src/main/java/com/amazonaws/partitions/PartitionMetadataProvider.java
   262  // tryGetRegionByEndpointDnsSuffix
   263  func identifyRegion(host string) (string, error) {
   264  	for _, suffix := range suffixes {
   265  		if strings.HasSuffix(host, suffix) {
   266  			serviceRegion := strings.TrimSuffix(host, suffix)
   267  			regionDot := strings.LastIndexByte(serviceRegion, '.')
   268  			if regionDot == -1 {
   269  				break
   270  			}
   271  			return serviceRegion[regionDot+1:], nil
   272  		}
   273  	}
   274  	return "", fmt.Errorf("cannot determine the region in %+q", host)
   275  }

View as plain text