1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package gcp
17
18 import (
19 "context"
20 "encoding/base64"
21 "fmt"
22 "os"
23 "regexp"
24 "sync"
25 "time"
26
27 "github.com/sigstore/rekor/pkg/events"
28 sigpubsub "github.com/sigstore/rekor/pkg/pubsub"
29
30 "cloud.google.com/go/pubsub"
31 "google.golang.org/api/option"
32 )
33
34 func init() {
35 sigpubsub.AddProvider(URIIdentifier, func(ctx context.Context, topicResourceID string) (sigpubsub.Publisher, error) {
36 return New(ctx, topicResourceID)
37 })
38 }
39
40 const URIIdentifier = "gcppubsub://"
41
42 var (
43
44 re = regexp.MustCompile(`^gcppubsub://projects/([^/]+)/topics/([^/]+)$`)
45
46
47 requiredIAMPermissions = []string{
48 "pubsub.topics.publish",
49 }
50 )
51
52 type Publisher struct {
53 client *pubsub.Client
54 topic string
55 wg *sync.WaitGroup
56 }
57
58 func New(ctx context.Context, topicResourceID string, opts ...option.ClientOption) (*Publisher, error) {
59 projectID, topic, err := parseRef(topicResourceID)
60 if err != nil {
61 return nil, fmt.Errorf("parse ref: %w", err)
62 }
63 client, err := pubsub.NewClient(ctx, projectID, opts...)
64 if err != nil {
65 return nil, fmt.Errorf("create pubsub client for project %q: %w", projectID, err)
66 }
67
68
69
70
71 if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
72 if _, err := client.Topic(topic).IAM().TestPermissions(ctx, requiredIAMPermissions); err != nil {
73 return nil, fmt.Errorf("insufficient permissions for topic %q: %w", topic, err)
74 }
75 }
76
77 return &Publisher{
78 client: client,
79 topic: topic,
80 wg: new(sync.WaitGroup),
81 }, nil
82 }
83
84 func (p *Publisher) Publish(ctx context.Context, event *events.Event, encoding events.EventContentType) error {
85 p.wg.Add(1)
86 defer p.wg.Done()
87
88 var data []byte
89 var err error
90 switch encoding {
91 case events.ContentTypeProtobuf:
92 data, err = event.MarshalProto()
93 case events.ContentTypeJSON:
94 data, err = event.MarshalJSON()
95 default:
96 err = fmt.Errorf("unsupported encoding: %s", encoding)
97 }
98 if err != nil {
99 return fmt.Errorf("marshal event: %w", err)
100 }
101
102 msg := &pubsub.Message{
103 Data: data,
104 Attributes: gcpAttrs(event, encoding),
105 }
106
107
108 res := p.client.Topic(p.topic).Publish(ctx, msg)
109
110
111 cctx, cancel := context.WithTimeout(ctx, pubsub.DefaultPublishSettings.Timeout)
112 defer cancel()
113
114
115 if _, err := res.Get(cctx); err != nil {
116 return fmt.Errorf("publish event %s to topic %q: %w", event.ID(), p.topic, err)
117 }
118 return nil
119 }
120
121 func (p *Publisher) Close() error {
122 p.wg.Wait()
123 return p.client.Close()
124 }
125
126 func parseRef(ref string) (projectID, topic string, err error) {
127 v := re.FindStringSubmatch(ref)
128 if len(v) != 3 {
129 err = fmt.Errorf("invalid gcppubsub format %q", ref)
130 return
131 }
132 projectID, topic = v[1], v[2]
133 return
134 }
135
136
137
138 func gcpAttrs(event *events.Event, dataType events.EventContentType) map[string]string {
139 attrs := map[string]string{
140 "source": event.Type().Source(),
141 "type": event.Type().Name(),
142 "datacontenttype": string(dataType),
143 }
144 for name, value := range event.Attributes() {
145 switch v := value.(type) {
146 case string:
147 attrs[name] = v
148 case time.Time:
149 attrs[name] = v.Format(time.RFC3339)
150 case []byte:
151 attrs[name] = base64.StdEncoding.EncodeToString(v)
152 default:
153 attrs[name] = fmt.Sprint(v)
154 }
155 }
156
157 return attrs
158 }
159
View as plain text