...

Source file src/cloud.google.com/go/pubsub/internal/longtest/endtoend_test.go

Documentation: cloud.google.com/go/pubsub/internal/longtest

     1  // Copyright 2014 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package longtest_test
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"log"
    21  	"math/rand"
    22  	"strconv"
    23  	"strings"
    24  	"sync"
    25  	"testing"
    26  	"time"
    27  
    28  	"cloud.google.com/go/internal/testutil"
    29  	"cloud.google.com/go/pubsub"
    30  	"google.golang.org/api/iterator"
    31  	"google.golang.org/api/option"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/status"
    34  )
    35  
    36  const (
    37  	timeout                 = time.Minute * 10
    38  	ackDeadline             = time.Second * 10
    39  	nMessages               = 1e4
    40  	acceptableDupPercentage = 1
    41  	numAcceptableDups       = int(nMessages * acceptableDupPercentage / 100)
    42  	resourcePrefix          = "endtoend"
    43  )
    44  
    45  // The end-to-end pumps many messages into a topic and tests that they are all
    46  // delivered to each subscription for the topic. It also tests that messages
    47  // are not unexpectedly redelivered.
    48  func TestEndToEnd_Dupes(t *testing.T) {
    49  	t.Skip("https://github.com/googleapis/google-cloud-go/issues/1752")
    50  
    51  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
    52  	defer cancel()
    53  	client, topic, cleanup := prepareEndToEndTest(ctx, t)
    54  	defer cleanup()
    55  	subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())
    56  
    57  	// Two subscriptions to the same topic.
    58  	var err error
    59  	var subs [2]*pubsub.Subscription
    60  	for i := 0; i < len(subs); i++ {
    61  		subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
    62  			Topic:       topic,
    63  			AckDeadline: ackDeadline,
    64  		})
    65  		if err != nil {
    66  			t.Fatalf("CreateSub error: %v", err)
    67  		}
    68  		defer subs[i].Delete(ctx)
    69  	}
    70  
    71  	err = publish(ctx, topic, nMessages)
    72  	topic.Stop()
    73  	if err != nil {
    74  		t.Fatalf("publish: %v", err)
    75  	}
    76  
    77  	// recv provides an indication that messages are still arriving.
    78  	recv := make(chan struct{})
    79  	// We have two subscriptions to our topic.
    80  	// Each subscription will get a copy of each published message.
    81  	var wg sync.WaitGroup
    82  	cctx, cancel := context.WithTimeout(ctx, timeout)
    83  	defer cancel()
    84  
    85  	consumers := []*consumer{
    86  		{
    87  			counts:    make(map[string]int),
    88  			recv:      recv,
    89  			durations: []time.Duration{time.Hour},
    90  			done:      make(chan struct{}),
    91  		},
    92  		{
    93  			counts:    make(map[string]int),
    94  			recv:      recv,
    95  			durations: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2, time.Hour},
    96  			done:      make(chan struct{}),
    97  		},
    98  	}
    99  	for i, con := range consumers {
   100  		con := con
   101  		sub := subs[i]
   102  		wg.Add(1)
   103  		go func() {
   104  			defer wg.Done()
   105  			con.consume(ctx, t, sub)
   106  		}()
   107  	}
   108  	// Wait for a while after the last message before declaring quiescence.
   109  	// We wait a multiple of the ack deadline, for two reasons:
   110  	// 1. To detect if messages are redelivered after having their ack
   111  	//    deadline extended.
   112  	// 2. To wait for redelivery of messages that were en route when a Receive
   113  	//    is canceled. This can take considerably longer than the ack deadline.
   114  	quiescenceDur := ackDeadline * 6
   115  	quiescenceTimer := time.NewTimer(quiescenceDur)
   116  
   117  loop:
   118  	for {
   119  		select {
   120  		case <-recv:
   121  			// Reset timer so we wait quiescenceDur after the last message.
   122  			// See https://godoc.org/time#Timer.Reset for why the Stop
   123  			// and channel drain are necessary.
   124  			if !quiescenceTimer.Stop() {
   125  				<-quiescenceTimer.C
   126  			}
   127  			quiescenceTimer.Reset(quiescenceDur)
   128  
   129  		case <-quiescenceTimer.C:
   130  			cancel()
   131  			log.Println("quiesced")
   132  			break loop
   133  
   134  		case <-cctx.Done():
   135  			t.Fatal("timed out")
   136  		}
   137  	}
   138  	wg.Wait()
   139  	close(recv)
   140  	for i, con := range consumers {
   141  		var numDups int
   142  		var zeroes int
   143  		for _, v := range con.counts {
   144  			if v == 0 {
   145  				zeroes++
   146  			}
   147  			numDups += v - 1
   148  		}
   149  
   150  		if zeroes > 0 {
   151  			t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
   152  		} else if numDups > numAcceptableDups {
   153  			t.Errorf("Consumer %d: Willing to accept %d dups (%v%% duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
   154  		}
   155  	}
   156  
   157  	for i, con := range consumers {
   158  		select {
   159  		case <-con.done:
   160  		case <-time.After(15 * time.Second):
   161  			t.Fatalf("timed out waiting for consumer %d to finish", i)
   162  		}
   163  	}
   164  }
   165  
   166  func TestEndToEnd_LongProcessingTime(t *testing.T) {
   167  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   168  	defer cancel()
   169  	client, topic, cleanup := prepareEndToEndTest(ctx, t)
   170  	defer cleanup()
   171  	subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())
   172  
   173  	// Two subscriptions to the same topic.
   174  	sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
   175  		Topic:       topic,
   176  		AckDeadline: ackDeadline,
   177  	})
   178  	if err != nil {
   179  		t.Fatalf("CreateSub error: %v", err)
   180  	}
   181  	defer sub.Delete(ctx)
   182  
   183  	// Tests the issue found in https://github.com/googleapis/google-cloud-go/issues/1247.
   184  	sub.ReceiveSettings.Synchronous = true
   185  	sub.ReceiveSettings.MaxOutstandingMessages = 500
   186  
   187  	err = publish(ctx, topic, 500)
   188  	topic.Stop()
   189  	if err != nil {
   190  		t.Fatalf("publish: %v", err)
   191  	}
   192  
   193  	// recv provides an indication that messages are still arriving.
   194  	recv := make(chan struct{})
   195  	consumer := consumer{
   196  		counts:    make(map[string]int),
   197  		recv:      recv,
   198  		durations: []time.Duration{time.Hour},
   199  		processingDelay: func() time.Duration {
   200  			return time.Duration(1+rand.Int63n(120)) * time.Second
   201  		},
   202  		done: make(chan struct{}),
   203  	}
   204  	go consumer.consume(ctx, t, sub)
   205  	// Wait for a while after the last message before declaring quiescence.
   206  	// We wait a multiple of the ack deadline, for two reasons:
   207  	// 1. To detect if messages are redelivered after having their ack
   208  	//    deadline extended.
   209  	// 2. To wait for redelivery of messages that were en route when a Receive
   210  	//    is canceled. This can take considerably longer than the ack deadline.
   211  	quiescenceDur := 12 * ackDeadline
   212  	quiescenceTimer := time.NewTimer(quiescenceDur)
   213  loop:
   214  	for {
   215  		select {
   216  		case <-recv:
   217  			// Reset timer so we wait quiescenceDur after the last message.
   218  			// See https://godoc.org/time#Timer.Reset for why the Stop
   219  			// and channel drain are necessary.
   220  			if !quiescenceTimer.Stop() {
   221  				<-quiescenceTimer.C
   222  			}
   223  			quiescenceTimer.Reset(quiescenceDur)
   224  
   225  		case <-quiescenceTimer.C:
   226  			cancel()
   227  			log.Println("quiesced")
   228  			break loop
   229  
   230  		case <-ctx.Done():
   231  			t.Fatal("timed out")
   232  		}
   233  	}
   234  	close(recv)
   235  	var numDups int
   236  	var zeroes int
   237  	for _, v := range consumer.counts {
   238  		if v == 0 {
   239  			zeroes++
   240  		}
   241  		numDups += v - 1
   242  	}
   243  
   244  	if zeroes > 0 {
   245  		t.Errorf("%d messages never arrived", zeroes)
   246  	} else if numDups > numAcceptableDups {
   247  		t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
   248  	}
   249  
   250  	select {
   251  	case <-consumer.done:
   252  	case <-time.After(15 * time.Second):
   253  		t.Fatal("timed out waiting for consumer to finish")
   254  	}
   255  }
   256  
   257  // publish publishes n messages to topic.
   258  func publish(ctx context.Context, topic *pubsub.Topic, n int) error {
   259  	var rs []*pubsub.PublishResult
   260  	for i := 0; i < n; i++ {
   261  		m := &pubsub.Message{Data: []byte(fmt.Sprintf("msg %d", i))}
   262  		rs = append(rs, topic.Publish(ctx, m))
   263  	}
   264  	for _, r := range rs {
   265  		_, err := r.Get(ctx)
   266  		if err != nil {
   267  			return err
   268  		}
   269  	}
   270  	return nil
   271  }
   272  
   273  // consumer consumes messages according to its configuration.
   274  type consumer struct {
   275  	// A consumer will spin out a Receive for each duration, which will be
   276  	// canceled after each duration and the next one spun up. For example, if
   277  	// there are 5 3 second durations, then there will be 5 3 second Receives.
   278  	durations []time.Duration
   279  
   280  	// A value is sent to recv each time process is called.
   281  	recv chan struct{}
   282  
   283  	// How long to wait for before acking.
   284  	processingDelay func() time.Duration
   285  
   286  	mu         sync.Mutex
   287  	counts     map[string]int // msgID: recvdAmt
   288  	totalRecvd int
   289  
   290  	// Done consuming.
   291  	done chan struct{}
   292  }
   293  
   294  // consume reads messages from a subscription, and keeps track of what it receives in mc.
   295  // After consume returns, the caller should wait on wg to ensure that no more updates to mc will be made.
   296  func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscription) {
   297  	defer close(c.done)
   298  	for _, dur := range c.durations {
   299  		ctx2, cancel := context.WithTimeout(ctx, dur)
   300  		defer cancel()
   301  		id := sub.String()[len(sub.String())-1:]
   302  		t.Logf("%s: start receive", id)
   303  		prev := c.totalRecvd
   304  		err := sub.Receive(ctx2, c.process)
   305  		t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
   306  		if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
   307  			panic(err)
   308  		}
   309  		select {
   310  		case <-ctx.Done():
   311  			return
   312  		default:
   313  		}
   314  	}
   315  }
   316  
   317  // process handles a message and records it in mc.
   318  func (c *consumer) process(_ context.Context, m *pubsub.Message) {
   319  	c.mu.Lock()
   320  	c.counts[m.ID]++
   321  	c.totalRecvd++
   322  	c.mu.Unlock()
   323  	c.recv <- struct{}{}
   324  
   325  	var delay time.Duration
   326  	if c.processingDelay == nil {
   327  		delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
   328  	} else {
   329  		delay = c.processingDelay()
   330  	}
   331  
   332  	// Simulate time taken to process m, while continuing to process more messages.
   333  	// Some messages will need to have their ack deadline extended due to this delay.
   334  	time.AfterFunc(delay, func() {
   335  		m.Ack()
   336  	})
   337  }
   338  
   339  // Remember to call cleanup!
   340  func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
   341  	if testing.Short() {
   342  		t.Skip("Integration tests skipped in short mode")
   343  	}
   344  	ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
   345  	if ts == nil {
   346  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
   347  	}
   348  
   349  	now := time.Now()
   350  	topicName := fmt.Sprintf("%s-%d", resourcePrefix, now.UnixNano())
   351  
   352  	client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
   353  	if err != nil {
   354  		t.Fatalf("Creating client error: %v", err)
   355  	}
   356  
   357  	// Don't stop the test if cleanup failed.
   358  	if err := cleanupSubscription(ctx, client); err != nil {
   359  		t.Logf("Pre-test subscription cleanup failed: %v", err)
   360  	}
   361  	if err := cleanupTopic(ctx, client); err != nil {
   362  		t.Logf("Pre-test topic cleanup failed: %v", err)
   363  	}
   364  
   365  	var topic *pubsub.Topic
   366  	if topic, err = client.CreateTopic(ctx, topicName); err != nil {
   367  		t.Fatalf("CreateTopic error: %v", err)
   368  	}
   369  
   370  	return client, topic, func() {
   371  		topic.Delete(ctx)
   372  		client.Close()
   373  	}
   374  }
   375  
   376  // cleanupTopic deletes stale testing topics.
   377  func cleanupTopic(ctx context.Context, client *pubsub.Client) error {
   378  	if testing.Short() {
   379  		return nil // Don't clean up in short mode.
   380  	}
   381  	// Delete topics which were	created a while ago.
   382  	const expireAge = 24 * time.Hour
   383  
   384  	it := client.Topics(ctx)
   385  	for {
   386  		t, err := it.Next()
   387  		if err == iterator.Done {
   388  			break
   389  		}
   390  		if err != nil {
   391  			return err
   392  		}
   393  		// Take timestamp from id.
   394  		tID := t.ID()
   395  		p := strings.Split(tID, "-")
   396  
   397  		// Only delete resources created from the endtoend test.
   398  		// Otherwise, this will affect other tests running midflight.
   399  		if p[0] == resourcePrefix {
   400  			tCreated := p[len(p)-1]
   401  			timestamp, err := strconv.ParseInt(tCreated, 10, 64)
   402  			if err != nil {
   403  				continue
   404  			}
   405  			timeTCreated := time.Unix(0, timestamp)
   406  			if time.Since(timeTCreated) > expireAge {
   407  				log.Printf("deleting topic %q", tID)
   408  				if err := t.Delete(ctx); err != nil {
   409  					return fmt.Errorf("Delete topic: %v: %v", t.String(), err)
   410  				}
   411  			}
   412  		}
   413  	}
   414  	return nil
   415  }
   416  
   417  // cleanupSubscription deletes stale testing subscriptions.
   418  func cleanupSubscription(ctx context.Context, client *pubsub.Client) error {
   419  	if testing.Short() {
   420  		return nil // Don't clean up in short mode.
   421  	}
   422  	// Delete subscriptions which were created a while ago.
   423  	const expireAge = 24 * time.Hour
   424  
   425  	it := client.Subscriptions(ctx)
   426  	for {
   427  		s, err := it.Next()
   428  		if err == iterator.Done {
   429  			break
   430  		}
   431  		if err != nil {
   432  			return err
   433  		}
   434  		sID := s.ID()
   435  		p := strings.Split(sID, "-")
   436  
   437  		// Only delete resources created from the endtoend test.
   438  		// Otherwise, this will affect other tests running midflight.
   439  		if p[0] == resourcePrefix {
   440  			sCreated := p[len(p)-2]
   441  			timestamp, err := strconv.ParseInt(sCreated, 10, 64)
   442  			if err != nil {
   443  				continue
   444  			}
   445  			timeSCreated := time.Unix(0, timestamp)
   446  			if time.Since(timeSCreated) > expireAge {
   447  				log.Printf("deleting subscription %q", sID)
   448  				if err := s.Delete(ctx); err != nil {
   449  					return fmt.Errorf("Delete subscription: %v: %v", s.String(), err)
   450  				}
   451  			}
   452  		}
   453  	}
   454  	return nil
   455  }
   456  

View as plain text