...

Source file src/github.com/sigstore/rekor/pkg/pubsub/gcp/publisher.go

Documentation: github.com/sigstore/rekor/pkg/pubsub/gcp

     1  // Copyright 2023 The Sigstore Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package gcp implements the pubsub.Publisher with Google Cloud Pub/Sub.
    16  package gcp
    17  
    18  import (
    19  	"context"
    20  	"encoding/base64"
    21  	"fmt"
    22  	"os"
    23  	"regexp"
    24  	"sync"
    25  	"time"
    26  
    27  	"github.com/sigstore/rekor/pkg/events"
    28  	sigpubsub "github.com/sigstore/rekor/pkg/pubsub"
    29  
    30  	"cloud.google.com/go/pubsub"
    31  	"google.golang.org/api/option"
    32  )
    33  
    34  func init() {
    35  	sigpubsub.AddProvider(URIIdentifier, func(ctx context.Context, topicResourceID string) (sigpubsub.Publisher, error) {
    36  		return New(ctx, topicResourceID)
    37  	})
    38  }
    39  
    40  const URIIdentifier = "gcppubsub://"
    41  
    42  var (
    43  	// Copied from https://github.com/google/go-cloud/blob/master/pubsub/gcppubsub/gcppubsub.go
    44  	re = regexp.MustCompile(`^gcppubsub://projects/([^/]+)/topics/([^/]+)$`)
    45  	// Minimal set of permissions needed to check if the server can publish to the configured topic.
    46  	// https://cloud.google.com/pubsub/docs/access-control#required_permissions
    47  	requiredIAMPermissions = []string{
    48  		"pubsub.topics.publish",
    49  	}
    50  )
    51  
    52  type Publisher struct {
    53  	client *pubsub.Client
    54  	topic  string
    55  	wg     *sync.WaitGroup
    56  }
    57  
    58  func New(ctx context.Context, topicResourceID string, opts ...option.ClientOption) (*Publisher, error) {
    59  	projectID, topic, err := parseRef(topicResourceID)
    60  	if err != nil {
    61  		return nil, fmt.Errorf("parse ref: %w", err)
    62  	}
    63  	client, err := pubsub.NewClient(ctx, projectID, opts...)
    64  	if err != nil {
    65  		return nil, fmt.Errorf("create pubsub client for project %q: %w", projectID, err)
    66  	}
    67  
    68  	// The PubSub emulator does not support IAM methods, and will block the
    69  	// server start up if they are called. If the environment variable is set,
    70  	// skip this check.
    71  	if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
    72  		if _, err := client.Topic(topic).IAM().TestPermissions(ctx, requiredIAMPermissions); err != nil {
    73  			return nil, fmt.Errorf("insufficient permissions for topic %q: %w", topic, err)
    74  		}
    75  	}
    76  
    77  	return &Publisher{
    78  		client: client,
    79  		topic:  topic,
    80  		wg:     new(sync.WaitGroup),
    81  	}, nil
    82  }
    83  
    84  func (p *Publisher) Publish(ctx context.Context, event *events.Event, encoding events.EventContentType) error {
    85  	p.wg.Add(1)
    86  	defer p.wg.Done()
    87  
    88  	var data []byte
    89  	var err error
    90  	switch encoding {
    91  	case events.ContentTypeProtobuf:
    92  		data, err = event.MarshalProto()
    93  	case events.ContentTypeJSON:
    94  		data, err = event.MarshalJSON()
    95  	default:
    96  		err = fmt.Errorf("unsupported encoding: %s", encoding)
    97  	}
    98  	if err != nil {
    99  		return fmt.Errorf("marshal event: %w", err)
   100  	}
   101  
   102  	msg := &pubsub.Message{
   103  		Data:       data,
   104  		Attributes: gcpAttrs(event, encoding),
   105  	}
   106  
   107  	// The Publish call does not block.
   108  	res := p.client.Topic(p.topic).Publish(ctx, msg)
   109  
   110  	// TODO: Consider making the timeout configurable.
   111  	cctx, cancel := context.WithTimeout(ctx, pubsub.DefaultPublishSettings.Timeout)
   112  	defer cancel()
   113  
   114  	// This Get call blocks until a response occurs, or the deadline is reached.
   115  	if _, err := res.Get(cctx); err != nil {
   116  		return fmt.Errorf("publish event %s to topic %q: %w", event.ID(), p.topic, err)
   117  	}
   118  	return nil
   119  }
   120  
   121  func (p *Publisher) Close() error {
   122  	p.wg.Wait()
   123  	return p.client.Close()
   124  }
   125  
   126  func parseRef(ref string) (projectID, topic string, err error) {
   127  	v := re.FindStringSubmatch(ref)
   128  	if len(v) != 3 {
   129  		err = fmt.Errorf("invalid gcppubsub format %q", ref)
   130  		return
   131  	}
   132  	projectID, topic = v[1], v[2]
   133  	return
   134  }
   135  
   136  // GCP Pub/Sub attributes can be used to filter events server-side, reducing
   137  // the processing for the client and reducing GCP costs for egress fees.
   138  func gcpAttrs(event *events.Event, dataType events.EventContentType) map[string]string {
   139  	attrs := map[string]string{
   140  		"source":          event.Type().Source(),
   141  		"type":            event.Type().Name(),
   142  		"datacontenttype": string(dataType),
   143  	}
   144  	for name, value := range event.Attributes() {
   145  		switch v := value.(type) {
   146  		case string:
   147  			attrs[name] = v
   148  		case time.Time:
   149  			attrs[name] = v.Format(time.RFC3339)
   150  		case []byte:
   151  			attrs[name] = base64.StdEncoding.EncodeToString(v)
   152  		default:
   153  			attrs[name] = fmt.Sprint(v)
   154  		}
   155  	}
   156  
   157  	return attrs
   158  }
   159  

View as plain text