1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "bufio"
19 "bytes"
20 "context"
21 "errors"
22 "fmt"
23 "io/ioutil"
24 "os"
25 "strings"
26 "sync"
27 "sync/atomic"
28 "testing"
29 "time"
30
31 "cloud.google.com/go/iam"
32 "cloud.google.com/go/internal"
33 "cloud.google.com/go/internal/testutil"
34 "cloud.google.com/go/internal/uid"
35 "cloud.google.com/go/internal/version"
36 kms "cloud.google.com/go/kms/apiv1"
37 "cloud.google.com/go/kms/apiv1/kmspb"
38 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
39 testutil2 "cloud.google.com/go/pubsub/internal/testutil"
40 "github.com/google/go-cmp/cmp"
41 "github.com/google/go-cmp/cmp/cmpopts"
42 gax "github.com/googleapis/gax-go/v2"
43 "golang.org/x/oauth2/google"
44 "google.golang.org/api/iterator"
45 "google.golang.org/api/option"
46 "google.golang.org/grpc"
47 "google.golang.org/grpc/codes"
48 "google.golang.org/grpc/metadata"
49 "google.golang.org/grpc/status"
50 "google.golang.org/protobuf/encoding/protowire"
51 "google.golang.org/protobuf/proto"
52 )
53
54 var (
55 topicIDs = uid.NewSpace("topic", nil)
56 subIDs = uid.NewSpace("sub", nil)
57 schemaIDs = uid.NewSpace("schema", nil)
58 )
59
60
61
62 type messageData struct {
63 ID string
64 Data string
65 Attributes map[string]string
66 }
67
68 func extractMessageData(m *Message) messageData {
69 return messageData{
70 ID: m.ID,
71 Data: string(m.Data),
72 Attributes: m.Attributes,
73 }
74 }
75
76 func withGRPCHeadersAssertion(t *testing.T, opts ...option.ClientOption) []option.ClientOption {
77 grpcHeadersEnforcer := &testutil.HeadersEnforcer{
78 OnFailure: t.Errorf,
79 Checkers: []*testutil.HeaderChecker{
80 testutil.XGoogClientHeaderChecker,
81 },
82 }
83 return append(grpcHeadersEnforcer.CallOptions(), opts...)
84 }
85
86 func integrationTestClient(ctx context.Context, t *testing.T, opts ...option.ClientOption) *Client {
87 if testing.Short() {
88 t.Skip("Integration tests skipped in short mode")
89 }
90 projID := testutil.ProjID()
91 if projID == "" {
92 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
93 }
94 ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
95 if ts == nil {
96 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
97 }
98 opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...)
99 client, err := NewClient(ctx, projID, opts...)
100 if err != nil {
101 t.Fatalf("Creating client error: %v", err)
102 }
103 return client
104 }
105
106 func integrationTestSchemaClient(ctx context.Context, t *testing.T, opts ...option.ClientOption) *SchemaClient {
107 if testing.Short() {
108 t.Skip("Integration tests skipped in short mode")
109 }
110 projID := testutil.ProjID()
111 if projID == "" {
112 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
113 }
114 ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
115 if ts == nil {
116 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
117 }
118 opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...)
119 sc, err := NewSchemaClient(ctx, projID, opts...)
120 if err != nil {
121 t.Fatalf("Creating client error: %v", err)
122 }
123 return sc
124 }
125
126 func TestIntegration_Admin(t *testing.T) {
127 t.Parallel()
128 ctx := context.Background()
129 client := integrationTestClient(ctx, t)
130 defer client.Close()
131
132 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
133 if err != nil {
134 t.Errorf("CreateTopic error: %v", err)
135 }
136 defer topic.Stop()
137 exists, err := topic.Exists(ctx)
138 if err != nil {
139 t.Fatalf("TopicExists error: %v", err)
140 }
141 if !exists {
142 t.Errorf("topic %v should exist, but it doesn't", topic)
143 }
144
145 var sub *Subscription
146 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
147 t.Errorf("CreateSub error: %v", err)
148 }
149 exists, err = sub.Exists(ctx)
150 if err != nil {
151 t.Fatalf("SubExists error: %v", err)
152 }
153 if !exists {
154 t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
155 }
156
157 if msg, ok := testIAM(ctx, topic.IAM(), "pubsub.topics.get"); !ok {
158 t.Errorf("topic IAM: %s", msg)
159 }
160 if msg, ok := testIAM(ctx, sub.IAM(), "pubsub.subscriptions.get"); !ok {
161 t.Errorf("sub IAM: %s", msg)
162 }
163
164 snap, err := sub.CreateSnapshot(ctx, "")
165 if err != nil {
166 t.Fatalf("CreateSnapshot error: %v", err)
167 }
168
169 labels := map[string]string{"foo": "bar"}
170 sc, err := snap.SetLabels(ctx, labels)
171 if err != nil {
172 t.Fatalf("Snapshot.SetLabels error: %v", err)
173 }
174 if diff := testutil.Diff(sc.Labels, labels); diff != "" {
175 t.Fatalf("\ngot: - want: +\n%s", diff)
176 }
177
178 timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
179 defer cancel()
180 err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
181 snapIt := client.Snapshots(timeoutCtx)
182 for {
183 s, err := snapIt.Next()
184 if err == nil && s.name == snap.name {
185 return true, nil
186 }
187 if err == iterator.Done {
188 return false, fmt.Errorf("cannot find snapshot: %q", snap.name)
189 }
190 if err != nil {
191 return false, err
192 }
193 }
194 })
195 if err != nil {
196 t.Error(err)
197 }
198
199 err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
200 err := sub.SeekToSnapshot(timeoutCtx, snap.Snapshot)
201 return err == nil, err
202 })
203 if err != nil {
204 t.Error(err)
205 }
206
207 err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
208 err := sub.SeekToTime(timeoutCtx, time.Now())
209 return err == nil, err
210 })
211 if err != nil {
212 t.Error(err)
213 }
214
215 err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
216 snapHandle := client.Snapshot(snap.ID())
217 err := snapHandle.Delete(timeoutCtx)
218 return err == nil, err
219 })
220 if err != nil {
221 t.Error(err)
222 }
223
224 if err := sub.Delete(ctx); err != nil {
225 t.Errorf("DeleteSub error: %v", err)
226 }
227
228 if err := topic.Delete(ctx); err != nil {
229 t.Errorf("DeleteTopic error: %v", err)
230 }
231 }
232
233 func TestIntegration_PublishReceive(t *testing.T) {
234 ctx := context.Background()
235 client := integrationTestClient(ctx, t)
236
237 for _, sync := range []bool{false, true} {
238 for _, maxMsgs := range []int{0, 3, -1} {
239 testPublishAndReceive(t, client, maxMsgs, sync, false, 10, 0)
240 }
241
242
243 testPublishAndReceive(t, client, 0, sync, false, 1, 5*1024*1024)
244 }
245 }
246
247
248
249
250 func withGoogleClientInfo(ctx context.Context) context.Context {
251 ctxMD, _ := metadata.FromOutgoingContext(ctx)
252 kv := []string{
253 "gl-go",
254 version.Go(),
255 "gax",
256 gax.Version,
257 "grpc",
258 grpc.Version,
259 }
260
261 allMDs := append([]metadata.MD{ctxMD}, metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)))
262 return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
263 }
264
265 func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
266 t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
267 t.Parallel()
268 testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) {
269 ctx := context.Background()
270 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
271 if err != nil {
272 r.Errorf("CreateTopic error: %v", err)
273 }
274 defer topic.Delete(ctx)
275 defer topic.Stop()
276 exists, err := topic.Exists(ctx)
277 if err != nil {
278 r.Errorf("TopicExists error: %v", err)
279 }
280 if !exists {
281 r.Errorf("topic %v should exist, but it doesn't", topic)
282 }
283
284 sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
285 Topic: topic,
286 EnableExactlyOnceDelivery: exactlyOnceDelivery,
287 })
288 if err != nil {
289 r.Errorf("CreateSub error: %v", err)
290 }
291 defer sub.Delete(ctx)
292 exists, err = sub.Exists(ctx)
293 if err != nil {
294 r.Errorf("SubExists error: %v", err)
295 }
296 if !exists {
297 r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
298 }
299 var msgs []*Message
300 for i := 0; i < numMsgs; i++ {
301 text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
302 attrs := make(map[string]string)
303 attrs["foo"] = "bar"
304 msgs = append(msgs, &Message{
305 Data: []byte(text),
306 Attributes: attrs,
307 })
308 }
309
310
311 type pubResult struct {
312 m *Message
313 r *PublishResult
314 }
315 var rs []pubResult
316 for _, m := range msgs {
317 r := topic.Publish(ctx, m)
318 rs = append(rs, pubResult{m, r})
319 }
320 want := make(map[string]messageData)
321 for _, res := range rs {
322 id, err := res.r.Get(ctx)
323 if err != nil {
324 r.Errorf("r.Get: %v", err)
325 }
326 md := extractMessageData(res.m)
327 md.ID = id
328 want[md.ID] = md
329 }
330
331 sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
332 sub.ReceiveSettings.Synchronous = synchronous
333
334
335
336 now := time.Now()
337 timeout := 3 * time.Minute
338 timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
339 defer cancel()
340 gotMsgs, err := pullN(timeoutCtx, sub, len(want), 0, func(ctx context.Context, m *Message) {
341 m.Ack()
342 })
343 if err != nil {
344 if c := status.Convert(err); c.Code() == codes.Canceled {
345 if time.Since(now) >= timeout {
346 r.Errorf("pullN took longer than %v", timeout)
347 }
348 } else {
349 r.Errorf("Pull: %v", err)
350 }
351 }
352 got := make(map[string]messageData)
353 for _, m := range gotMsgs {
354 md := extractMessageData(m)
355 got[md.ID] = md
356 }
357 if !testutil.Equal(got, want) {
358 r.Errorf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
359 maxMsgs, synchronous, got, want)
360 }
361 })
362 })
363 }
364
365
366
367
368
369
370 func testIAM(ctx context.Context, h *iam.Handle, permission string) (msg string, ok bool) {
371
372
373
374 ctx = withGoogleClientInfo(ctx)
375
376
377
378 const member = "domain:google.com"
379
380 var policy *iam.Policy
381 var err error
382
383 if policy, err = h.Policy(ctx); err != nil {
384 return fmt.Sprintf("Policy: %v", err), false
385 }
386
387 if got := policy.Roles(); len(got) > 0 {
388 return fmt.Sprintf("initially: got roles %v, want none", got), false
389 }
390
391 policy.Add(member, iam.Viewer)
392 if err := h.SetPolicy(ctx, policy); err != nil {
393 return fmt.Sprintf("SetPolicy: %v", err), false
394 }
395 if policy, err = h.Policy(ctx); err != nil {
396 return fmt.Sprintf("Policy: %v", err), false
397 }
398 if got, want := policy.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
399 return fmt.Sprintf("after Add: got %v, want %v", got, want), false
400 }
401
402 policy.Remove(member, iam.Viewer)
403 if err := h.SetPolicy(ctx, policy); err != nil {
404 return fmt.Sprintf("SetPolicy: %v", err), false
405 }
406 if policy, err = h.Policy(ctx); err != nil {
407 return fmt.Sprintf("Policy: %v", err), false
408 }
409 if got := policy.Roles(); len(got) > 0 {
410 return fmt.Sprintf("after Remove: got roles %v, want none", got), false
411 }
412
413
414
415
416
417 wantPerms := []string{permission}
418 gotPerms, err := h.TestPermissions(ctx, wantPerms)
419 if err != nil {
420 return fmt.Sprintf("TestPermissions: %v", err), false
421 }
422 if !testutil.Equal(gotPerms, wantPerms) {
423 return fmt.Sprintf("TestPermissions: got %v, want %v", gotPerms, wantPerms), false
424 }
425 return "", true
426 }
427
428 func TestIntegration_LargePublishSize(t *testing.T) {
429 ctx := context.Background()
430 client := integrationTestClient(ctx, t)
431 defer client.Close()
432
433 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
434 if err != nil {
435 t.Fatalf("CreateTopic error: %v", err)
436 }
437 defer topic.Delete(ctx)
438 defer topic.Stop()
439
440
441
442 length := MaxPublishRequestBytes - calcFieldSizeString(topic.String())
443
444
445 pbMsgOverhead := 1 + protowire.SizeVarint(uint64(length))
446 dataOverhead := 1 + protowire.SizeVarint(uint64(length-pbMsgOverhead))
447 maxLengthSingleMessage := length - pbMsgOverhead - dataOverhead
448
449 publishReq := &pb.PublishRequest{
450 Topic: topic.String(),
451 Messages: []*pb.PubsubMessage{
452 {
453 Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage),
454 },
455 },
456 }
457
458 if got := proto.Size(publishReq); got != MaxPublishRequestBytes {
459 t.Fatalf("Created request size of %d bytes,\nwant %f bytes", got, MaxPublishRequestBytes)
460 }
461
462
463 msg := &Message{
464 Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage),
465 }
466 topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError
467 r := topic.Publish(ctx, msg)
468 if _, err := r.Get(ctx); err != nil {
469 t.Fatalf("Failed to publish max length message: %v", err)
470 }
471
472
473
474 smallMsg := &Message{
475 Data: []byte{'A'},
476 }
477 topic.Publish(ctx, smallMsg)
478 r = topic.Publish(ctx, msg)
479 if _, err := r.Get(ctx); err != nil {
480 t.Fatalf("Failed to publish max length message after a small message: %v", err)
481 }
482
483
484
485 msg.Data = append(msg.Data, 'A')
486 r = topic.Publish(ctx, msg)
487 if _, err := r.Get(ctx); err != ErrOversizedMessage {
488 t.Fatalf("Should throw item size too large error, got %v", err)
489 }
490 }
491
492 func TestIntegration_CancelReceive(t *testing.T) {
493 t.Parallel()
494 ctx := context.Background()
495 client := integrationTestClient(ctx, t)
496 defer client.Close()
497
498 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
499 if err != nil {
500 t.Errorf("failed to create topic: %v", err)
501 }
502 defer topic.Delete(ctx)
503 defer topic.Stop()
504
505 var sub *Subscription
506 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
507 t.Fatalf("failed to create subscription: %v", err)
508 }
509 defer sub.Delete(ctx)
510
511 ctx, cancel := context.WithCancel(context.Background())
512 sub.ReceiveSettings.MaxOutstandingMessages = -1
513 sub.ReceiveSettings.MaxOutstandingBytes = -1
514 sub.ReceiveSettings.NumGoroutines = 1
515
516 doneReceiving := make(chan struct{})
517
518
519 go func() {
520 for {
521 select {
522 case <-doneReceiving:
523 return
524 default:
525 topic.Publish(ctx, &Message{Data: []byte("some msg")})
526 time.Sleep(time.Second)
527 }
528 }
529 }()
530
531 go func() {
532 err = sub.Receive(ctx, func(_ context.Context, msg *Message) {
533 cancel()
534 time.AfterFunc(5*time.Second, msg.Ack)
535 })
536 close(doneReceiving)
537 }()
538
539 select {
540 case <-time.After(60 * time.Second):
541 t.Fatalf("Waited 60 seconds for Receive to finish, should have finished sooner")
542 case <-doneReceiving:
543 }
544 }
545
546 func TestIntegration_CreateSubscription_NeverExpire(t *testing.T) {
547 t.Parallel()
548 ctx := context.Background()
549 client := integrationTestClient(ctx, t)
550 defer client.Close()
551
552 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
553 if err != nil {
554 t.Fatalf("CreateTopic error: %v", err)
555 }
556 defer topic.Delete(ctx)
557 defer topic.Stop()
558
559 cfg := SubscriptionConfig{
560 Topic: topic,
561 ExpirationPolicy: time.Duration(0),
562 }
563 var sub *Subscription
564 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
565 t.Fatalf("CreateSub error: %v", err)
566 }
567 defer sub.Delete(ctx)
568
569 got, err := sub.Config(ctx)
570 if err != nil {
571 t.Fatal(err)
572 }
573 want := time.Duration(0)
574 if got.ExpirationPolicy != want {
575 t.Fatalf("config.ExpirationPolicy mismatch, got: %v, want: %v\n", got.ExpirationPolicy, want)
576 }
577 }
578
579
580
581
582
583 func findServiceAccountEmail(ctx context.Context, t *testing.T) string {
584 jwtConf, err := testutil.JWTConfig()
585 if err == nil && jwtConf != nil {
586 return jwtConf.Email
587 }
588 creds := testutil.Credentials(ctx, ScopePubSub, ScopeCloudPlatform)
589 if creds == nil {
590 t.Fatal("Failed to retrieve credentials")
591 }
592 if len(creds.JSON) == 0 {
593 t.Skip("No JWTConfig JSON was present so can't get serviceAccountEmail")
594 }
595 jwtConf, err = google.JWTConfigFromJSON(creds.JSON)
596 if err != nil {
597 if strings.Contains(err.Error(), "authorized_user") {
598 t.Skip("Found ADC user so can't get serviceAccountEmail")
599 }
600 t.Fatalf("Failed to parse Google JWTConfig from JSON: %v", err)
601 }
602 return jwtConf.Email
603 }
604
605 func TestIntegration_UpdateSubscription(t *testing.T) {
606 t.Parallel()
607 ctx := context.Background()
608
609 client := integrationTestClient(ctx, t)
610 defer client.Close()
611
612 serviceAccountEmail := findServiceAccountEmail(ctx, t)
613
614 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
615 if err != nil {
616 t.Fatalf("CreateTopic error: %v", err)
617 }
618 defer topic.Delete(ctx)
619 defer topic.Stop()
620
621 var sub *Subscription
622 projID := testutil.ProjID()
623 sCfg := SubscriptionConfig{
624 Topic: topic,
625 PushConfig: PushConfig{
626 Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
627 AuthenticationMethod: &OIDCToken{
628 Audience: "client-12345",
629 ServiceAccountEmail: serviceAccountEmail,
630 },
631 },
632 }
633 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), sCfg); err != nil {
634 t.Fatalf("CreateSub error: %v", err)
635 }
636 defer sub.Delete(ctx)
637
638 got, err := sub.Config(ctx)
639 if err != nil {
640 t.Fatal(err)
641 }
642 want := SubscriptionConfig{
643 Topic: topic,
644 AckDeadline: 10 * time.Second,
645 RetainAckedMessages: false,
646 RetentionDuration: defaultRetentionDuration,
647 ExpirationPolicy: defaultExpirationPolicy,
648 PushConfig: PushConfig{
649 Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
650 AuthenticationMethod: &OIDCToken{
651 Audience: "client-12345",
652 ServiceAccountEmail: serviceAccountEmail,
653 },
654 },
655 State: SubscriptionStateActive,
656 }
657 opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
658 if diff := testutil.Diff(got, want, opt); diff != "" {
659 t.Fatalf("\ngot: - want: +\n%s", diff)
660 }
661
662 pc := PushConfig{
663 Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
664 Attributes: map[string]string{"x-goog-version": "v1"},
665 AuthenticationMethod: &OIDCToken{
666 Audience: "client-updated-54321",
667 ServiceAccountEmail: serviceAccountEmail,
668 },
669 }
670 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
671 PushConfig: &pc,
672 AckDeadline: 2 * time.Minute,
673 RetainAckedMessages: true,
674 RetentionDuration: 2 * time.Hour,
675 Labels: map[string]string{"label": "value"},
676 ExpirationPolicy: 25 * time.Hour,
677 })
678 if err != nil {
679 t.Fatal(err)
680 }
681 want = SubscriptionConfig{
682 Topic: topic,
683 PushConfig: pc,
684 AckDeadline: 2 * time.Minute,
685 RetainAckedMessages: true,
686 RetentionDuration: 2 * time.Hour,
687 Labels: map[string]string{"label": "value"},
688 ExpirationPolicy: 25 * time.Hour,
689 State: SubscriptionStateActive,
690 }
691
692 if !testutil.Equal(got, want, opt) {
693 t.Fatalf("\ngot %+v\nwant %+v", got, want)
694 }
695
696
697 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
698 ExpirationPolicy: time.Duration(0),
699 })
700 if err != nil {
701 t.Fatal(err)
702 }
703 want.ExpirationPolicy = time.Duration(0)
704
705 if !testutil.Equal(got, want, opt) {
706 t.Fatalf("\ngot %+v\nwant %+v", got, want)
707 }
708
709
710
711 pc = PushConfig{}
712 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
713 PushConfig: &pc,
714 AckDeadline: 30 * time.Second,
715 Labels: map[string]string{},
716 })
717 if err != nil {
718 t.Fatal(err)
719 }
720 want.PushConfig = pc
721 want.AckDeadline = 30 * time.Second
722 want.Labels = nil
723
724
725 want.PushConfig.Attributes = map[string]string{"x-goog-version": "v1"}
726 if !testutil.Equal(got, want, opt) {
727 t.Fatalf("\ngot %+v\nwant %+v", got, want)
728 }
729
730 _, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
731 if err == nil {
732 t.Fatal("got nil, wanted error")
733 }
734 }
735
736
737
738 func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) {
739 res := topic.Publish(ctx, msg)
740 _, err := res.Get(ctx)
741 if err != nil {
742 t.Fatalf("publishSync err: %v", err)
743 }
744 }
745
746 func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
747 t.Parallel()
748 ctx := context.Background()
749 client := integrationTestClient(ctx, t)
750 defer client.Close()
751
752 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
753 if err != nil {
754 t.Fatalf("CreateTopic error: %v", err)
755 }
756 defer topic.Delete(ctx)
757 defer topic.Stop()
758
759 var sub *Subscription
760 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
761 t.Fatalf("CreateSub error: %v", err)
762 }
763 defer sub.Delete(ctx)
764
765
766 got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
767 RetentionDuration: 2 * time.Hour,
768 ExpirationPolicy: 25 * time.Hour,
769 AckDeadline: 2 * time.Minute,
770 })
771 if err != nil {
772 t.Fatal(err)
773 }
774 want := 25 * time.Hour
775 if got.ExpirationPolicy != want {
776 t.Fatalf("config.ExpirationPolicy mismatch; got: %v, want: %v", got.ExpirationPolicy, want)
777 }
778
779
780 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
781 ExpirationPolicy: time.Duration(0),
782 })
783 if err != nil {
784 t.Fatalf("Unexpected error: %v\n", err)
785 }
786 want = time.Duration(0)
787 if diff := testutil.Diff(got.ExpirationPolicy, want); diff != "" {
788 t.Fatalf("\ngot: - want: +\n%s", diff)
789 }
790
791
792 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
793 ExpirationPolicy: nil,
794 })
795 if err == nil || err.Error() != "pubsub: UpdateSubscription call with nothing to update" {
796 t.Fatalf("Expected no attributes to be updated, error: %v", err)
797 }
798
799
800 _, err = sub.Update(ctx, SubscriptionConfigToUpdate{
801 ExpirationPolicy: 26 * time.Hour,
802 })
803 if err != nil {
804 t.Fatal(err)
805 }
806
807 _, err = sub.Update(ctx, SubscriptionConfigToUpdate{
808 ExpirationPolicy: nil,
809 })
810 if err == nil || err.Error() != "pubsub: UpdateSubscription call with nothing to update" {
811 t.Fatalf("Expected no attributes to be updated, error: %v", err)
812 }
813 }
814
815
816
817 func TestIntegration_UpdateTopicLabels(t *testing.T) {
818 t.Parallel()
819 ctx := context.Background()
820 client := integrationTestClient(ctx, t)
821 defer client.Close()
822
823 compareConfig := func(got TopicConfig, wantLabels map[string]string) bool {
824 return testutil.Equal(got.Labels, wantLabels)
825 }
826
827 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
828 if err != nil {
829 t.Fatalf("CreateTopic error: %v", err)
830 }
831 defer topic.Delete(ctx)
832 defer topic.Stop()
833
834 got, err := topic.Config(ctx)
835 if err != nil {
836 t.Fatal(err)
837 }
838 if !compareConfig(got, nil) {
839 t.Fatalf("\ngot %+v\nwant no labels", got)
840 }
841
842 labels := map[string]string{"label": "value"}
843 got, err = topic.Update(ctx, TopicConfigToUpdate{Labels: labels})
844 if err != nil {
845 t.Fatal(err)
846 }
847 if !compareConfig(got, labels) {
848 t.Fatalf("\ngot %+v\nwant labels %+v", got, labels)
849 }
850
851 got, err = topic.Update(ctx, TopicConfigToUpdate{Labels: map[string]string{}})
852 if err != nil {
853 t.Fatal(err)
854 }
855 if !compareConfig(got, nil) {
856 t.Fatalf("\ngot %+v\nwant no labels", got)
857 }
858 }
859
860 func TestIntegration_PublicTopic(t *testing.T) {
861 t.Parallel()
862 ctx := context.Background()
863 client := integrationTestClient(ctx, t)
864 defer client.Close()
865
866 sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
867 Topic: client.TopicInProject("taxirides-realtime", "pubsub-public-data"),
868 })
869 if err != nil {
870 t.Fatal(err)
871 }
872 sub.Delete(ctx)
873 }
874
875 func TestIntegration_Errors(t *testing.T) {
876
877 t.Parallel()
878 ctx := context.Background()
879 client := integrationTestClient(ctx, t)
880 defer client.Close()
881
882 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
883 if err != nil {
884 t.Fatalf("CreateTopic error: %v", err)
885 }
886 defer topic.Delete(ctx)
887 defer topic.Stop()
888
889
890 sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
891 Topic: topic,
892 RetentionDuration: 1 * time.Second,
893 })
894 if want := codes.InvalidArgument; status.Code(err) != want {
895 t.Errorf("got <%v>, want %s", err, want)
896 }
897 if err == nil {
898 sub.Delete(ctx)
899 }
900
901
902 sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
903 Topic: topic,
904 AckDeadline: 5 * time.Second,
905 })
906 if want := codes.Unknown; status.Code(err) != want {
907 t.Errorf("got <%v>, want %s", err, want)
908 }
909 if err == nil {
910 sub.Delete(ctx)
911 }
912
913
914 sub = client.Subscription(subIDs.New())
915 _, err = sub.Update(ctx, SubscriptionConfigToUpdate{AckDeadline: 20 * time.Second})
916 if want := codes.NotFound; status.Code(err) != want {
917 t.Errorf("got <%v>, want %s", err, want)
918 }
919
920 err = sub.Delete(ctx)
921 if want := codes.NotFound; status.Code(err) != want {
922 t.Errorf("got <%v>, want %s", err, want)
923 }
924
925
926 sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic})
927 if err != nil {
928 t.Fatal(err)
929 }
930 defer sub.Delete(ctx)
931 _, err = sub.Update(ctx, SubscriptionConfigToUpdate{RetentionDuration: 1000 * time.Hour})
932 if want := codes.InvalidArgument; status.Code(err) != want {
933 t.Errorf("got <%v>, want %s", err, want)
934 }
935 }
936
937 func TestIntegration_MessageStoragePolicy_TopicLevel(t *testing.T) {
938 t.Parallel()
939 ctx := context.Background()
940 client := integrationTestClient(ctx, t)
941 defer client.Close()
942
943 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
944 if err != nil {
945 t.Fatalf("CreateTopic error: %v", err)
946 }
947 defer topic.Delete(ctx)
948 defer topic.Stop()
949
950
951 regions := []string{"asia-east1", "us-east1"}
952 cfg, err := topic.Update(ctx, TopicConfigToUpdate{
953 MessageStoragePolicy: &MessageStoragePolicy{
954 AllowedPersistenceRegions: regions,
955 },
956 })
957 if err != nil {
958 t.Fatal(err)
959 }
960 got := cfg.MessageStoragePolicy.AllowedPersistenceRegions
961 want := regions
962 if !testutil.Equal(got, want) {
963 t.Fatalf("\ngot %+v\nwant regions%+v", got, want)
964 }
965
966
967 updateCfg := TopicConfigToUpdate{
968 MessageStoragePolicy: &MessageStoragePolicy{
969 AllowedPersistenceRegions: []string{},
970 },
971 }
972 if _, err = topic.Update(ctx, updateCfg); err == nil {
973 t.Fatalf("Unexpected succeeded in removing all regions\n%+v\n", got)
974 }
975 }
976
977
978
979
980
981
982
983 func TestIntegration_MessageStoragePolicy_ProjectLevel(t *testing.T) {
984
985 if testing.Short() {
986 t.Skip("Integration tests skipped in short mode")
987 }
988 t.Parallel()
989 ctx := context.Background()
990
991
992
993
994 projID := "ps-geofencing-test"
995
996
997
998 ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
999 if ts == nil {
1000 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
1001 }
1002 opts := withGRPCHeadersAssertion(t, option.WithTokenSource(ts))
1003 client, err := NewClient(ctx, projID, opts...)
1004 if err != nil {
1005 t.Fatalf("Creating client error: %v", err)
1006 }
1007 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1008 if err != nil {
1009 t.Fatalf("CreateTopic error: %v", err)
1010 }
1011 defer topic.Delete(ctx)
1012 defer topic.Stop()
1013
1014 config, err := topic.Config(ctx)
1015 if err != nil {
1016 t.Fatal(err)
1017 }
1018 got := config.MessageStoragePolicy.AllowedPersistenceRegions
1019 want := []string{"us-east1"}
1020 if !testutil.Equal(got, want) {
1021 t.Errorf("got %v, want %v", got, want)
1022 }
1023 }
1024
1025 func TestIntegration_CreateTopic_KMS(t *testing.T) {
1026 t.Parallel()
1027 ctx := context.Background()
1028 client := integrationTestClient(ctx, t)
1029 defer client.Close()
1030
1031 kmsClient, err := kms.NewKeyManagementClient(ctx)
1032 if err != nil {
1033 t.Fatal(err)
1034 }
1035
1036 keyRingID := "test-key-ring"
1037 want := "test-key2"
1038
1039
1040 keyRing, err := kmsClient.GetKeyRing(ctx, &kmspb.GetKeyRingRequest{
1041 Name: fmt.Sprintf("projects/%s/locations/global/keyRings/%s", testutil.ProjID(), keyRingID),
1042 })
1043 if err != nil {
1044 if status.Code(err) != codes.NotFound {
1045 t.Fatal(err)
1046 }
1047 createKeyRingReq := &kmspb.CreateKeyRingRequest{
1048 Parent: fmt.Sprintf("projects/%s/locations/global", testutil.ProjID()),
1049 KeyRingId: keyRingID,
1050 }
1051 keyRing, err = kmsClient.CreateKeyRing(ctx, createKeyRingReq)
1052 if err != nil {
1053 t.Fatal(err)
1054 }
1055 }
1056
1057
1058 key, err := kmsClient.GetCryptoKey(ctx, &kmspb.GetCryptoKeyRequest{
1059 Name: fmt.Sprintf("%s/cryptoKeys/%s", keyRing.GetName(), want),
1060 })
1061 if err != nil {
1062 if status.Code(err) != codes.NotFound {
1063 t.Fatal(err)
1064 }
1065 createKeyReq := &kmspb.CreateCryptoKeyRequest{
1066 Parent: keyRing.GetName(),
1067 CryptoKeyId: want,
1068 CryptoKey: &kmspb.CryptoKey{
1069 Purpose: 1,
1070 },
1071 }
1072 key, err = kmsClient.CreateCryptoKey(ctx, createKeyReq)
1073 if err != nil {
1074 t.Fatal(err)
1075 }
1076 }
1077
1078 tc := TopicConfig{
1079 KMSKeyName: key.GetName(),
1080 }
1081 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), &tc)
1082 if err != nil {
1083 t.Fatalf("CreateTopicWithConfig error: %v", err)
1084 }
1085 defer topic.Delete(ctx)
1086 defer topic.Stop()
1087
1088 cfg, err := topic.Config(ctx)
1089 if err != nil {
1090 t.Fatal(err)
1091 }
1092 got := cfg.KMSKeyName
1093
1094 if got != key.GetName() {
1095 t.Errorf("got %v, want %v", got, key.GetName())
1096 }
1097 }
1098
1099 func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) {
1100 t.Parallel()
1101 ctx := context.Background()
1102 client := integrationTestClient(ctx, t)
1103 defer client.Close()
1104
1105 tc := TopicConfig{
1106 MessageStoragePolicy: MessageStoragePolicy{
1107 AllowedPersistenceRegions: []string{"us-east1"},
1108 },
1109 }
1110 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), &tc)
1111 if err != nil {
1112 t.Fatalf("CreateTopicWithConfig error: %v", err)
1113 }
1114 defer topic.Delete(ctx)
1115 defer topic.Stop()
1116
1117 got, err := topic.Config(ctx)
1118 if err != nil {
1119 t.Fatal(err)
1120 }
1121 want := tc
1122 if diff := testutil.Diff(got.MessageStoragePolicy, want.MessageStoragePolicy); diff != "" {
1123 t.Fatalf("\ngot: - want: +\n%s", diff)
1124 }
1125 }
1126
1127 func TestIntegration_OrderedKeys_Basic(t *testing.T) {
1128 ctx := context.Background()
1129 client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
1130 defer client.Close()
1131
1132 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1133 if err != nil {
1134 t.Fatal(err)
1135 }
1136 defer topic.Delete(ctx)
1137 defer topic.Stop()
1138 exists, err := topic.Exists(ctx)
1139 if err != nil {
1140 t.Fatal(err)
1141 }
1142 if !exists {
1143 t.Fatalf("topic %v should exist, but it doesn't", topic)
1144 }
1145 var sub *Subscription
1146 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
1147 Topic: topic,
1148 EnableMessageOrdering: true,
1149 }); err != nil {
1150 t.Fatal(err)
1151 }
1152 defer sub.Delete(ctx)
1153 exists, err = sub.Exists(ctx)
1154 if err != nil {
1155 t.Fatal(err)
1156 }
1157 if !exists {
1158 t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
1159 }
1160
1161 topic.PublishSettings.DelayThreshold = time.Second
1162 topic.EnableMessageOrdering = true
1163
1164 orderingKey := "some-ordering-key"
1165 numItems := 1000
1166 for i := 0; i < numItems; i++ {
1167 r := topic.Publish(ctx, &Message{
1168 ID: fmt.Sprintf("id-%d", i),
1169 Data: []byte(fmt.Sprintf("item-%d", i)),
1170 OrderingKey: orderingKey,
1171 })
1172 go func() {
1173 if _, err := r.Get(ctx); err != nil {
1174 t.Error(err)
1175 }
1176 }()
1177 }
1178
1179 received := make(chan string, numItems)
1180 ctx2, cancel := context.WithCancel(ctx)
1181 go func() {
1182 for i := 0; i < numItems; i++ {
1183 select {
1184 case r := <-received:
1185 if got, want := r, fmt.Sprintf("item-%d", i); got != want {
1186 t.Errorf("%d: got %s, want %s", i, got, want)
1187 }
1188 case <-time.After(30 * time.Second):
1189 t.Errorf("timed out after 30s waiting for item %d", i)
1190 cancel()
1191 }
1192 }
1193 cancel()
1194 }()
1195
1196 if err := sub.Receive(ctx2, func(ctx context.Context, msg *Message) {
1197 defer msg.Ack()
1198 if msg.OrderingKey != orderingKey {
1199 t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
1200 }
1201
1202 received <- string(msg.Data)
1203 }); err != nil {
1204 if c := status.Code(err); c != codes.Canceled {
1205 t.Error(err)
1206 }
1207 }
1208 }
1209
1210 func TestIntegration_OrderedKeys_JSON(t *testing.T) {
1211 ctx := context.Background()
1212 client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
1213 defer client.Close()
1214
1215 testutil.Retry(t, 2, 1*time.Second, func(r *testutil.R) {
1216 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1217 if err != nil {
1218 r.Errorf("createTopicWithRetry err: %v", err)
1219 }
1220 defer topic.Delete(ctx)
1221 defer topic.Stop()
1222 exists, err := topic.Exists(ctx)
1223 if err != nil {
1224 r.Errorf("topic.Exists err: %v", err)
1225 }
1226 if !exists {
1227 r.Errorf("topic %v should exist, but it doesn't", topic)
1228 }
1229 var sub *Subscription
1230 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
1231 Topic: topic,
1232 EnableMessageOrdering: true,
1233 }); err != nil {
1234 r.Errorf("creteSubWithRetry err: %v", err)
1235 }
1236 defer sub.Delete(ctx)
1237 exists, err = sub.Exists(ctx)
1238 if err != nil {
1239 r.Errorf("sub.Exists err: %v", err)
1240 }
1241 if !exists {
1242 r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
1243 }
1244
1245 topic.PublishSettings.DelayThreshold = time.Second
1246 topic.EnableMessageOrdering = true
1247
1248 inFile, err := os.Open("testdata/publish.csv")
1249 if err != nil {
1250 r.Errorf("os.Open err: %v", err)
1251 }
1252 defer inFile.Close()
1253
1254 mu := sync.Mutex{}
1255 var publishData []testutil2.OrderedKeyMsg
1256 var receiveData []testutil2.OrderedKeyMsg
1257
1258 receiveSet := make(map[string]struct{})
1259
1260 wg := sync.WaitGroup{}
1261 scanner := bufio.NewScanner(inFile)
1262 for scanner.Scan() {
1263 line := scanner.Text()
1264
1265 line = strings.Replace(line, "\"", "", -1)
1266 parts := strings.Split(line, ",")
1267 key := parts[0]
1268 msg := parts[1]
1269 publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
1270 res := topic.Publish(ctx, &Message{
1271 Data: []byte(msg),
1272 OrderingKey: key,
1273 })
1274 go func() {
1275 _, err := res.Get(ctx)
1276 if err != nil {
1277
1278 r.Logf("publish error for message(%s): %v", msg, err)
1279 }
1280 }()
1281 wg.Add(1)
1282 }
1283 if err := scanner.Err(); err != nil {
1284 r.Errorf("scanner.Err(): %v", err)
1285 }
1286
1287 go func() {
1288 sub.Receive(ctx, func(ctx context.Context, msg *Message) {
1289 mu.Lock()
1290 defer mu.Unlock()
1291
1292
1293 if _, ok := receiveSet[string(msg.Data)]; ok {
1294 r.Logf("received duplicate message: %s", msg.Data)
1295 return
1296 }
1297 receiveSet[string(msg.Data)] = struct{}{}
1298 receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)})
1299 wg.Done()
1300 msg.Ack()
1301 })
1302 }()
1303
1304 done := make(chan struct{})
1305 go func() {
1306 wg.Wait()
1307 close(done)
1308 }()
1309
1310 select {
1311 case <-done:
1312 case <-time.After(2 * time.Minute):
1313 r.Errorf("timed out after 2m waiting for all messages to be received")
1314 }
1315
1316 mu.Lock()
1317 defer mu.Unlock()
1318 if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
1319 r.Errorf("VerifyKeyOrdering error: %v", err)
1320 }
1321 })
1322 }
1323
1324 func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
1325 ctx := context.Background()
1326 client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
1327 defer client.Close()
1328
1329 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1330 if err != nil {
1331 t.Fatal(err)
1332 }
1333 defer topic.Delete(ctx)
1334 defer topic.Stop()
1335 exists, err := topic.Exists(ctx)
1336 if err != nil {
1337 t.Fatal(err)
1338 }
1339 if !exists {
1340 t.Fatalf("topic %v should exist, but it doesn't", topic)
1341 }
1342
1343 topic.PublishSettings.BufferedByteLimit = 100
1344 topic.EnableMessageOrdering = true
1345
1346 orderingKey := "some-ordering-key2"
1347
1348
1349 r := topic.Publish(ctx, &Message{
1350 Data: bytes.Repeat([]byte("A"), 1000),
1351 OrderingKey: orderingKey,
1352 })
1353 if _, err := r.Get(ctx); err == nil {
1354 t.Fatalf("expected bundle byte limit error, got nil")
1355 }
1356
1357
1358 r = topic.Publish(ctx, &Message{
1359 Data: []byte("should fail"),
1360 OrderingKey: orderingKey,
1361 })
1362 if _, err := r.Get(ctx); err == nil || !errors.As(err, &ErrPublishingPaused{}) {
1363 t.Fatalf("expected ordering keys publish error, got %v", err)
1364 }
1365
1366
1367 topic.ResumePublish(orderingKey)
1368 r = topic.Publish(ctx, &Message{
1369 Data: []byte("should succeed"),
1370 OrderingKey: orderingKey,
1371 })
1372 if _, err := r.Get(ctx); err != nil {
1373 t.Fatalf("got error while publishing message: %v", err)
1374 }
1375 }
1376
1377
1378
1379
1380 func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
1381 ctx := context.Background()
1382 client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
1383 defer client.Close()
1384
1385 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1386 if err != nil {
1387 t.Fatal(err)
1388 }
1389 defer topic.Delete(ctx)
1390 defer topic.Stop()
1391 exists, err := topic.Exists(ctx)
1392 if err != nil {
1393 t.Fatal(err)
1394 }
1395 if !exists {
1396 t.Fatalf("topic %v should exist, but it doesn't", topic)
1397 }
1398 topic.EnableMessageOrdering = true
1399
1400
1401 enableMessageOrdering := false
1402 subCfg := SubscriptionConfig{
1403 Topic: topic,
1404 EnableMessageOrdering: enableMessageOrdering,
1405 }
1406 sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), subCfg)
1407 if err != nil {
1408 t.Fatal(err)
1409 }
1410 defer sub.Delete(ctx)
1411
1412 publishSync(ctx, t, topic, &Message{
1413 Data: []byte("message-1"),
1414 OrderingKey: "ordering-key-1",
1415 })
1416
1417 publishSync(ctx, t, topic, &Message{
1418 Data: []byte("message-2"),
1419 OrderingKey: "ordering-key-1",
1420 })
1421
1422 sub.ReceiveSettings.Synchronous = true
1423 ctx2, cancel := context.WithTimeout(ctx, 12*time.Second)
1424 defer cancel()
1425
1426 var numAcked int32
1427 sub.Receive(ctx2, func(_ context.Context, msg *Message) {
1428
1429 if string(msg.Data) == "message-1" {
1430 time.Sleep(10 * time.Second)
1431 } else {
1432 time.Sleep(5 * time.Second)
1433 }
1434 msg.Ack()
1435 atomic.AddInt32(&numAcked, 1)
1436 })
1437
1438
1439 if numAcked < 2 {
1440 t.Fatalf("did not process all messages in time, numAcked: %d", numAcked)
1441 }
1442 }
1443
1444 func TestIntegration_OrderingWithExactlyOnce(t *testing.T) {
1445 ctx := context.Background()
1446 client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
1447 defer client.Close()
1448
1449 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1450 if err != nil {
1451 t.Fatal(err)
1452 }
1453 defer topic.Delete(ctx)
1454 defer topic.Stop()
1455 exists, err := topic.Exists(ctx)
1456 if err != nil {
1457 t.Fatal(err)
1458 }
1459 if !exists {
1460 t.Fatalf("topic %v should exist, but it doesn't", topic)
1461 }
1462 var sub *Subscription
1463 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
1464 Topic: topic,
1465 EnableMessageOrdering: true,
1466 EnableExactlyOnceDelivery: true,
1467 }); err != nil {
1468 t.Fatal(err)
1469 }
1470 defer sub.Delete(ctx)
1471 exists, err = sub.Exists(ctx)
1472 if err != nil {
1473 t.Fatal(err)
1474 }
1475 if !exists {
1476 t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
1477 }
1478
1479 topic.PublishSettings.DelayThreshold = time.Second
1480 topic.EnableMessageOrdering = true
1481
1482 orderingKey := "some-ordering-key"
1483 numItems := 10
1484 for i := 0; i < numItems; i++ {
1485 r := topic.Publish(ctx, &Message{
1486 ID: fmt.Sprintf("id-%d", i),
1487 Data: []byte(fmt.Sprintf("item-%d", i)),
1488 OrderingKey: orderingKey,
1489 })
1490 go func() {
1491 if _, err := r.Get(ctx); err != nil {
1492 t.Error(err)
1493 }
1494 }()
1495 }
1496
1497 received := make(chan string, numItems)
1498 ctx2, cancel := context.WithCancel(ctx)
1499 go func() {
1500 for i := 0; i < numItems; i++ {
1501 select {
1502 case r := <-received:
1503 if got, want := r, fmt.Sprintf("item-%d", i); got != want {
1504 t.Errorf("%d: got %s, want %s", i, got, want)
1505 }
1506 case <-time.After(30 * time.Second):
1507 t.Errorf("timed out after 30s waiting for item %d", i)
1508 cancel()
1509 }
1510 }
1511 cancel()
1512 }()
1513
1514 if err := sub.Receive(ctx2, func(ctx context.Context, msg *Message) {
1515 defer msg.Ack()
1516 if msg.OrderingKey != orderingKey {
1517 t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
1518 }
1519
1520 received <- string(msg.Data)
1521 }); err != nil {
1522 if c := status.Code(err); c != codes.Canceled {
1523 t.Error(err)
1524 }
1525 }
1526
1527 }
1528
1529 func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
1530 t.Parallel()
1531 ctx := context.Background()
1532 client := integrationTestClient(ctx, t)
1533 defer client.Close()
1534
1535 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1536 if err != nil {
1537 t.Fatalf("CreateTopic error: %v", err)
1538 }
1539 defer topic.Delete(ctx)
1540 defer topic.Stop()
1541
1542 deadLetterTopic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1543 if err != nil {
1544 t.Fatalf("CreateTopic error: %v", err)
1545 }
1546 defer deadLetterTopic.Delete(ctx)
1547 defer deadLetterTopic.Stop()
1548
1549
1550
1551 cfg := SubscriptionConfig{
1552 Topic: topic,
1553 DeadLetterPolicy: &DeadLetterPolicy{
1554 DeadLetterTopic: deadLetterTopic.String(),
1555 },
1556 }
1557 var sub *Subscription
1558 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
1559 t.Fatalf("CreateSub error: %v", err)
1560 }
1561 defer sub.Delete(ctx)
1562
1563 got, err := sub.Config(ctx)
1564 if err != nil {
1565 t.Fatal(err)
1566 }
1567 want := &DeadLetterPolicy{
1568 DeadLetterTopic: deadLetterTopic.String(),
1569 MaxDeliveryAttempts: 5,
1570 }
1571 if diff := testutil.Diff(got.DeadLetterPolicy, want); diff != "" {
1572 t.Fatalf("\ngot: - want: +\n%s", diff)
1573 }
1574
1575 res := topic.Publish(ctx, &Message{
1576 Data: []byte("failed message"),
1577 })
1578 if _, err := res.Get(ctx); err != nil {
1579 t.Fatalf("Publish message error: %v", err)
1580 }
1581
1582 ctx2, cancel := context.WithCancel(ctx)
1583 numAttempts := 1
1584 err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
1585 if numAttempts >= 5 {
1586 cancel()
1587 m.Ack()
1588 return
1589 }
1590 if *m.DeliveryAttempt != numAttempts {
1591 t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
1592 }
1593 numAttempts++
1594 m.Nack()
1595 })
1596 if err != nil {
1597 t.Fatalf("Streaming pull error: %v\n", err)
1598 }
1599 }
1600
1601
1602 func TestIntegration_DeadLetterPolicy_DeliveryAttempt(t *testing.T) {
1603 t.Parallel()
1604 ctx := context.Background()
1605 client := integrationTestClient(ctx, t)
1606 defer client.Close()
1607
1608 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1609 if err != nil {
1610 t.Fatalf("CreateTopic error: %v", err)
1611 }
1612 defer topic.Delete(ctx)
1613 defer topic.Stop()
1614
1615 cfg := SubscriptionConfig{
1616 Topic: topic,
1617 }
1618 var sub *Subscription
1619 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
1620 t.Fatalf("CreateSub error: %v", err)
1621 }
1622 defer sub.Delete(ctx)
1623
1624 res := topic.Publish(ctx, &Message{
1625 Data: []byte("failed message"),
1626 })
1627 if _, err := res.Get(ctx); err != nil {
1628 t.Fatalf("Publish message error: %v", err)
1629 }
1630
1631 ctx2, cancel := context.WithCancel(ctx)
1632 err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
1633 defer m.Ack()
1634 defer cancel()
1635 if m.DeliveryAttempt != nil {
1636 t.Fatalf("DeliveryAttempt should be nil when dead lettering is disabled")
1637 }
1638 })
1639 if err != nil {
1640 t.Fatalf("Streaming pull error: %v\n", err)
1641 }
1642 }
1643
1644 func TestIntegration_DeadLetterPolicy_ClearDeadLetter(t *testing.T) {
1645 t.Parallel()
1646 ctx := context.Background()
1647 client := integrationTestClient(ctx, t)
1648 defer client.Close()
1649
1650 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1651 if err != nil {
1652 t.Fatalf("CreateTopic error: %v", err)
1653 }
1654 defer topic.Delete(ctx)
1655 defer topic.Stop()
1656
1657 deadLetterTopic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1658 if err != nil {
1659 t.Fatalf("CreateTopic error: %v", err)
1660 }
1661 defer deadLetterTopic.Delete(ctx)
1662 defer deadLetterTopic.Stop()
1663
1664 cfg := SubscriptionConfig{
1665 Topic: topic,
1666 DeadLetterPolicy: &DeadLetterPolicy{
1667 DeadLetterTopic: deadLetterTopic.String(),
1668 },
1669 }
1670 var sub *Subscription
1671 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
1672 t.Fatalf("CreateSub error: %v", err)
1673 }
1674 defer sub.Delete(ctx)
1675
1676 sub.Update(ctx, SubscriptionConfigToUpdate{
1677 DeadLetterPolicy: &DeadLetterPolicy{},
1678 })
1679
1680 got, err := sub.Config(ctx)
1681 if err != nil {
1682 t.Fatal(err)
1683 }
1684 if got.DeadLetterPolicy != nil {
1685 t.Fatalf("config.DeadLetterPolicy; got: %v want: nil", got.DeadLetterPolicy)
1686 }
1687 }
1688
1689
1690
1691 func TestIntegration_BadEndpoint(t *testing.T) {
1692 t.Parallel()
1693 ctx := context.Background()
1694 opts := withGRPCHeadersAssertion(t,
1695 option.WithEndpoint("example.googleapis.com:443"),
1696 )
1697 client := integrationTestClient(ctx, t, opts...)
1698 defer client.Close()
1699 if _, err := client.CreateTopic(ctx, topicIDs.New()); err == nil {
1700 t.Fatalf("CreateTopic should fail with fake endpoint, got nil err")
1701 }
1702 }
1703
1704 func TestIntegration_Filter_CreateSubscription(t *testing.T) {
1705 t.Parallel()
1706 ctx := context.Background()
1707 client := integrationTestClient(ctx, t)
1708 defer client.Close()
1709 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1710 if err != nil {
1711 t.Fatalf("CreateTopic error: %v", err)
1712 }
1713 defer topic.Delete(ctx)
1714 defer topic.Stop()
1715 cfg := SubscriptionConfig{
1716 Topic: topic,
1717 Filter: "attributes.event_type = \"1\"",
1718 }
1719 var sub *Subscription
1720 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
1721 t.Fatalf("CreateSub error: %v", err)
1722 }
1723 defer sub.Delete(ctx)
1724 got, err := sub.Config(ctx)
1725 if err != nil {
1726 t.Fatal(err)
1727 }
1728 want := cfg.Filter
1729 if got.Filter != want {
1730 t.Fatalf("subcfg.Filter mismatch; got: %s, want: %s", got.Filter, want)
1731 }
1732 attrs := make(map[string]string)
1733 attrs["event_type"] = "1"
1734 res := topic.Publish(ctx, &Message{
1735 Data: []byte("hello world"),
1736 Attributes: attrs,
1737 })
1738 if _, err := res.Get(ctx); err != nil {
1739 t.Fatalf("Publish message error: %v", err)
1740 }
1741
1742
1743 attrs["event_type"] = "2"
1744 res = topic.Publish(ctx, &Message{
1745 Data: []byte("hello world"),
1746 Attributes: attrs,
1747 })
1748 if _, err := res.Get(ctx); err != nil {
1749 t.Fatalf("Publish message error: %v", err)
1750 }
1751 ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
1752 defer cancel()
1753 err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
1754 defer m.Ack()
1755 if m.Attributes["event_type"] != "1" {
1756 t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
1757 }
1758 })
1759 if err != nil {
1760 t.Fatalf("Streaming pull error: %v\n", err)
1761 }
1762 }
1763
1764 func TestIntegration_RetryPolicy(t *testing.T) {
1765 t.Parallel()
1766 ctx := context.Background()
1767 client := integrationTestClient(ctx, t)
1768 defer client.Close()
1769
1770 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1771 if err != nil {
1772 t.Fatalf("CreateTopic error: %v", err)
1773 }
1774 defer topic.Delete(ctx)
1775 defer topic.Stop()
1776
1777 cfg := SubscriptionConfig{
1778 Topic: topic,
1779 RetryPolicy: &RetryPolicy{
1780 MinimumBackoff: 20 * time.Second,
1781 MaximumBackoff: 500 * time.Second,
1782 },
1783 }
1784 var sub *Subscription
1785 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
1786 t.Fatalf("CreateSub error: %v", err)
1787 }
1788 defer sub.Delete(ctx)
1789
1790 got, err := sub.Config(ctx)
1791 if err != nil {
1792 t.Fatal(err)
1793 }
1794 want := SubscriptionConfig{
1795 Topic: topic,
1796 AckDeadline: 10 * time.Second,
1797 RetainAckedMessages: false,
1798 RetentionDuration: defaultRetentionDuration,
1799 ExpirationPolicy: defaultExpirationPolicy,
1800 RetryPolicy: &RetryPolicy{
1801 MinimumBackoff: 20 * time.Second,
1802 MaximumBackoff: 500 * time.Second,
1803 },
1804 }
1805 if diff := testutil.Diff(got.RetryPolicy, want.RetryPolicy); diff != "" {
1806 t.Fatalf("\ngot: - want: +\n%s", diff)
1807 }
1808
1809
1810 cfgToUpdate := SubscriptionConfigToUpdate{
1811 RetryPolicy: &RetryPolicy{},
1812 }
1813 _, err = sub.Update(ctx, cfgToUpdate)
1814 if err != nil {
1815 t.Fatalf("got error while updating sub: %v", err)
1816 }
1817
1818 got, err = sub.Config(ctx)
1819 if err != nil {
1820 t.Fatal(err)
1821 }
1822 want.RetryPolicy = nil
1823 if diff := testutil.Diff(got.RetryPolicy, want.RetryPolicy); diff != "" {
1824 t.Fatalf("\ngot: - want: +\n%s", diff)
1825 }
1826 }
1827
1828 func TestIntegration_DetachSubscription(t *testing.T) {
1829 t.Parallel()
1830 ctx := context.Background()
1831 client := integrationTestClient(ctx, t)
1832 defer client.Close()
1833
1834 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1835 if err != nil {
1836 t.Fatalf("CreateTopic error: %v", err)
1837 }
1838 defer topic.Delete(ctx)
1839 defer topic.Stop()
1840
1841 cfg := SubscriptionConfig{
1842 Topic: topic,
1843 }
1844 var sub *Subscription
1845 if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
1846 t.Fatalf("CreateSub error: %v", err)
1847 }
1848 defer sub.Delete(ctx)
1849
1850 if _, err := client.DetachSubscription(ctx, sub.String()); err != nil {
1851 t.Fatalf("DetachSubscription error: %v", err)
1852 }
1853
1854 newSub := client.Subscription(sub.ID())
1855 got, err := newSub.Config(ctx)
1856 if err != nil {
1857 t.Fatalf("GetSubscription error: %v", err)
1858 }
1859 if !got.Detached {
1860 t.Fatal("SubscriptionConfig not detached after calling detach")
1861 }
1862 }
1863
1864 func TestIntegration_SchemaAdmin(t *testing.T) {
1865 t.Parallel()
1866 ctx := context.Background()
1867 c := integrationTestSchemaClient(ctx, t)
1868 defer c.Close()
1869
1870 for _, tc := range []struct {
1871 desc string
1872 schemaType SchemaType
1873 path string
1874 }{
1875 {
1876 desc: "avro schema",
1877 schemaType: SchemaAvro,
1878 path: "testdata/schema/us-states.avsc",
1879 },
1880 {
1881 desc: "protocol buffer schema",
1882 schemaType: SchemaProtocolBuffer,
1883 path: "testdata/schema/us-states.proto",
1884 },
1885 } {
1886 t.Run(tc.desc, func(t *testing.T) {
1887 content, err := ioutil.ReadFile(tc.path)
1888 if err != nil {
1889 t.Fatal(err)
1890 }
1891 schema := string(content)
1892 schemaID := schemaIDs.New()
1893 schemaPath := fmt.Sprintf("projects/%s/schemas/%s", testutil.ProjID(), schemaID)
1894 sc := SchemaConfig{
1895 Type: tc.schemaType,
1896 Definition: schema,
1897 }
1898 got, err := c.CreateSchema(ctx, schemaID, sc)
1899 if err != nil {
1900 t.Fatalf("SchemaClient.CreateSchema error: %v", err)
1901 }
1902
1903 want := &SchemaConfig{
1904 Name: schemaPath,
1905 Type: tc.schemaType,
1906 Definition: schema,
1907 }
1908 if diff := testutil.Diff(got, want, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
1909 t.Fatalf("\ngot: - want: +\n%s", diff)
1910 }
1911
1912 got, err = c.Schema(ctx, schemaID, SchemaViewFull)
1913 if err != nil {
1914 t.Fatalf("SchemaClient.Schema error: %v", err)
1915 }
1916 if diff := testutil.Diff(got, want, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
1917 t.Fatalf("\ngot: - want: +\n%s", diff)
1918 }
1919
1920 err = c.DeleteSchema(ctx, schemaID)
1921 if err != nil {
1922 t.Fatalf("SchemaClient.DeleteSchema error: %v", err)
1923 }
1924 })
1925 }
1926 }
1927
1928 func TestIntegration_ValidateSchema(t *testing.T) {
1929 t.Parallel()
1930 ctx := context.Background()
1931 c := integrationTestSchemaClient(ctx, t)
1932 defer c.Close()
1933
1934 for _, tc := range []struct {
1935 desc string
1936 schemaType SchemaType
1937 path string
1938 wantErr error
1939 }{
1940 {
1941 desc: "avro schema",
1942 schemaType: SchemaAvro,
1943 path: "testdata/schema/us-states.avsc",
1944 wantErr: nil,
1945 },
1946 {
1947 desc: "protocol buffer schema",
1948 schemaType: SchemaProtocolBuffer,
1949 path: "testdata/schema/us-states.proto",
1950 wantErr: nil,
1951 },
1952 {
1953 desc: "protocol buffer schema",
1954 schemaType: SchemaProtocolBuffer,
1955 path: "testdata/schema/invalid.avsc",
1956 wantErr: status.Errorf(codes.InvalidArgument, "Request contains an invalid argument."),
1957 },
1958 } {
1959 t.Run(tc.desc, func(t *testing.T) {
1960 content, err := ioutil.ReadFile(tc.path)
1961 if err != nil {
1962 t.Fatal(err)
1963 }
1964 def := string(content)
1965 cfg := SchemaConfig{
1966 Type: tc.schemaType,
1967 Definition: def,
1968 }
1969 _, gotErr := c.ValidateSchema(ctx, cfg)
1970 if status.Code(gotErr) != status.Code(tc.wantErr) {
1971 t.Fatalf("got err: %v\nwant err: %v", gotErr, tc.wantErr)
1972 }
1973 })
1974 }
1975 }
1976
1977 func TestIntegration_ValidateMessage(t *testing.T) {
1978 t.Parallel()
1979 ctx := context.Background()
1980 c := integrationTestSchemaClient(ctx, t)
1981 defer c.Close()
1982
1983 for _, tc := range []struct {
1984 desc string
1985 schemaType SchemaType
1986 schemaPath string
1987 encoding SchemaEncoding
1988 messagePath string
1989 wantErr error
1990 }{
1991 {
1992 desc: "avro json encoding",
1993 schemaType: SchemaAvro,
1994 schemaPath: "testdata/schema/us-states.avsc",
1995 encoding: EncodingJSON,
1996 messagePath: "testdata/schema/alaska.json",
1997 wantErr: nil,
1998 },
1999 {
2000 desc: "avro binary encoding",
2001 schemaType: SchemaAvro,
2002 schemaPath: "testdata/schema/us-states.avsc",
2003 encoding: EncodingBinary,
2004 messagePath: "testdata/schema/alaska.avro",
2005 wantErr: nil,
2006 },
2007 {
2008 desc: "proto json encoding",
2009 schemaType: SchemaProtocolBuffer,
2010 schemaPath: "testdata/schema/us-states.proto",
2011 encoding: EncodingJSON,
2012 messagePath: "testdata/schema/alaska.json",
2013 wantErr: nil,
2014 },
2015 {
2016 desc: "protocol buffer schema",
2017 schemaType: SchemaProtocolBuffer,
2018 schemaPath: "testdata/schema/invalid.avsc",
2019 encoding: EncodingBinary,
2020 messagePath: "testdata/schema/invalid.avsc",
2021 wantErr: status.Errorf(codes.InvalidArgument, "Request contains an invalid argument."),
2022 },
2023 } {
2024 t.Run(tc.desc, func(t *testing.T) {
2025 content, err := ioutil.ReadFile(tc.schemaPath)
2026 if err != nil {
2027 t.Fatal(err)
2028 }
2029 def := string(content)
2030 cfg := SchemaConfig{
2031 Type: tc.schemaType,
2032 Definition: def,
2033 }
2034
2035 msg, err := ioutil.ReadFile(tc.messagePath)
2036 if err != nil {
2037 t.Fatal(err)
2038 }
2039 _, gotErr := c.ValidateMessageWithConfig(ctx, msg, tc.encoding, cfg)
2040 if status.Code(gotErr) != status.Code(tc.wantErr) {
2041 t.Fatalf("got err: %v\nwant err: %v", gotErr, tc.wantErr)
2042 }
2043 })
2044 }
2045 }
2046
2047 func TestIntegration_TopicRetention(t *testing.T) {
2048 ctx := context.Background()
2049 c := integrationTestClient(ctx, t)
2050 defer c.Close()
2051
2052 tc := TopicConfig{
2053 RetentionDuration: 31 * 24 * time.Hour,
2054 }
2055
2056 topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), &tc)
2057 if err != nil {
2058 t.Fatalf("failed to create topic: %v", err)
2059 }
2060 defer topic.Delete(ctx)
2061 defer topic.Stop()
2062
2063 newDur := 11 * time.Minute
2064 cfg, err := topic.Update(ctx, TopicConfigToUpdate{
2065 RetentionDuration: newDur,
2066 })
2067 if err != nil {
2068 t.Fatalf("failed to update topic: %v", err)
2069 }
2070 if got := cfg.RetentionDuration; got != newDur {
2071 t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
2072 }
2073
2074
2075 s, err := createSubWithRetry(ctx, t, c, subIDs.New(), SubscriptionConfig{
2076 Topic: topic,
2077 })
2078 if err != nil {
2079 t.Fatalf("failed to create subscription: %v", err)
2080 }
2081 defer s.Delete(ctx)
2082 sCfg, err := s.Config(ctx)
2083 if err != nil {
2084 t.Fatalf("failed to get sub config: %v", err)
2085 }
2086 if got := sCfg.TopicMessageRetentionDuration; got != newDur {
2087 t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
2088 }
2089
2090
2091 cfg, err = topic.Update(ctx, TopicConfigToUpdate{
2092 RetentionDuration: -1 * time.Minute,
2093 })
2094 if err != nil {
2095 t.Fatal(err)
2096 }
2097 if got := cfg.RetentionDuration; got != nil {
2098 t.Fatalf("expected cleared retention duration, got: %v", got)
2099 }
2100 }
2101
2102 func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
2103 ctx := context.Background()
2104 client := integrationTestClient(ctx, t)
2105
2106 for _, maxMsgs := range []int{0, 3, -1} {
2107 testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
2108 }
2109 }
2110
2111 func TestIntegration_TopicUpdateSchema(t *testing.T) {
2112 ctx := context.Background()
2113 c := integrationTestClient(ctx, t)
2114 defer c.Close()
2115
2116 sc := integrationTestSchemaClient(ctx, t)
2117 defer sc.Close()
2118
2119 schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
2120 if err != nil {
2121 t.Fatal(err)
2122 }
2123
2124 schemaID := schemaIDs.New()
2125 schemaCfg, err := sc.CreateSchema(ctx, schemaID, SchemaConfig{
2126 Type: SchemaAvro,
2127 Definition: string(schemaContent),
2128 })
2129 if err != nil {
2130 t.Fatal(err)
2131 }
2132 defer sc.DeleteSchema(ctx, schemaID)
2133
2134 topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), nil)
2135 if err != nil {
2136 t.Fatal(err)
2137 }
2138 defer topic.Delete(ctx)
2139 defer topic.Stop()
2140
2141 schema := &SchemaSettings{
2142 Schema: schemaCfg.Name,
2143 Encoding: EncodingJSON,
2144 }
2145 cfg, err := topic.Update(ctx, TopicConfigToUpdate{
2146 SchemaSettings: schema,
2147 })
2148 if err != nil {
2149 t.Fatal(err)
2150 }
2151 if diff := cmp.Diff(cfg.SchemaSettings, schema); diff != "" {
2152 t.Fatalf("schema settings for update -want, +got: %v", diff)
2153 }
2154 }
2155
2156 func TestIntegration_DetectProjectID(t *testing.T) {
2157 if testing.Short() {
2158 t.Skip("Integration tests skipped in short mode")
2159 }
2160 ctx := context.Background()
2161 testCreds := testutil.Credentials(ctx)
2162 if testCreds == nil {
2163 t.Skip("test credentials not present, skipping")
2164 }
2165
2166 goodClient, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds))
2167 if err != nil {
2168 t.Errorf("test pubsub.NewClient: %v", err)
2169 }
2170 if goodClient.Project() != testutil.ProjID() {
2171 t.Errorf("client.Project() got %q, want %q", goodClient.Project(), testutil.ProjID())
2172 }
2173
2174 badTS := testutil.ErroringTokenSource{}
2175 if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
2176 t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
2177 }
2178 }
2179
2180 func TestIntegration_PublishCompression(t *testing.T) {
2181 ctx := context.Background()
2182 client := integrationTestClient(ctx, t)
2183 defer client.Close()
2184
2185 topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
2186 if err != nil {
2187 t.Fatal(err)
2188 }
2189 defer topic.Delete(ctx)
2190 defer topic.Stop()
2191
2192 topic.PublishSettings.EnableCompression = true
2193 topic.PublishSettings.CompressionBytesThreshold = 50
2194
2195 const messageSizeBytes = 1000
2196
2197 msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
2198 res := topic.Publish(ctx, msg)
2199
2200 _, err = res.Get(ctx)
2201 if err != nil {
2202 t.Errorf("publish result got err: %v", err)
2203 }
2204 }
2205
2206
2207 func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) {
2208 var topic *Topic
2209 var err error
2210 testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) {
2211 if cfg != nil {
2212 topic, err = c.CreateTopicWithConfig(ctx, topicID, cfg)
2213 if err != nil {
2214 r.Errorf("CreateTopic error: %v", err)
2215 }
2216 } else {
2217 topic, err = c.CreateTopic(ctx, topicID)
2218 if err != nil {
2219 r.Errorf("CreateTopic error: %v", err)
2220 }
2221 }
2222 })
2223 return topic, err
2224 }
2225
2226
2227 func createSubWithRetry(ctx context.Context, t *testing.T, c *Client, subID string, cfg SubscriptionConfig) (*Subscription, error) {
2228 var sub *Subscription
2229 var err error
2230 testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) {
2231 sub, err = c.CreateSubscription(ctx, subID, cfg)
2232 if err != nil {
2233 r.Errorf("CreateSub error: %v", err)
2234 }
2235 })
2236 return sub, err
2237 }
2238
View as plain text