...

Source file src/cloud.google.com/go/pubsub/pullstream.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2018 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  	"net/url"
    22  	"sync"
    23  	"time"
    24  
    25  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    26  	gax "github.com/googleapis/gax-go/v2"
    27  	"google.golang.org/grpc"
    28  )
    29  
// A pullStream supports the methods of a StreamingPullClient, but re-opens
// the stream on a retryable error.
//
// Send, Recv and CloseSend all go through call, which obtains the current
// underlying stream from get and retries the operation on a freshly opened
// stream when the configured retryer says the error is retryable.
type pullStream struct {
	// ctx is the context handed to every stream that open creates and to
	// every backoff sleep. Once it is done, get reports its error as permanent.
	ctx    context.Context
	// open creates a new underlying stream and sends the initial
	// StreamingPullRequest on it. It is set by newPullStream.
	open   func() (pb.Subscriber_StreamingPullClient, error)
	// cancel is the CancelFunc that newPullStream obtained for ctx.
	cancel context.CancelFunc

	// mu guards spc and err.
	mu  sync.Mutex
	// spc points at the current underlying stream. It is nil until the first
	// successful pass through get. get compares pointer identity against the
	// caller's copy to tell whether the stream has already been re-opened.
	spc *pb.Subscriber_StreamingPullClient
	// err is a permanent error. Once it is non-nil, get returns it to every
	// caller and no further streams are opened.
	err error
}
    41  
// streamingPullFunc is the signature of the function newPullStream calls to
// start a streaming-pull RPC. It is a named type so that tests can substitute
// their own implementation.
type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)
    44  
    45  func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
    46  	ctx = withSubscriptionKey(ctx, subName)
    47  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(subName))}
    48  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
    49  	ctx, cancel := context.WithCancel(ctx)
    50  	return &pullStream{
    51  		ctx:    ctx,
    52  		cancel: cancel,
    53  		open: func() (pb.Subscriber_StreamingPullClient, error) {
    54  			spc, err := streamingPull(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
    55  			if err == nil {
    56  				recordStat(ctx, StreamRequestCount, 1)
    57  				streamAckDeadline := int32(maxDurationPerLeaseExtension / time.Second)
    58  				// By default, maxDurationPerLeaseExtension, aka MaxExtensionPeriod, is disabled,
    59  				// so in these cases, use a healthy default of 60 seconds.
    60  				if streamAckDeadline <= 0 {
    61  					streamAckDeadline = 60
    62  				}
    63  				err = spc.Send(&pb.StreamingPullRequest{
    64  					Subscription:             subName,
    65  					StreamAckDeadlineSeconds: streamAckDeadline,
    66  					MaxOutstandingMessages:   int64(maxOutstandingMessages),
    67  					MaxOutstandingBytes:      int64(maxOutstandingBytes),
    68  				})
    69  			}
    70  			if err != nil {
    71  				return nil, err
    72  			}
    73  			return spc, nil
    74  		},
    75  	}
    76  }
    77  
    78  // get returns either a valid *StreamingPullClient (SPC), or a permanent error.
    79  // If the argument is nil, this is the first call for an RPC, and the current
    80  // SPC will be returned (or a new one will be opened). Otherwise, this call is a
    81  // request to re-open the stream because of a retryable error, and the argument
    82  // is a pointer to the SPC that returned the error.
    83  func (s *pullStream) get(spc *pb.Subscriber_StreamingPullClient) (*pb.Subscriber_StreamingPullClient, error) {
    84  	s.mu.Lock()
    85  	defer s.mu.Unlock()
    86  	// A stored error is permanent.
    87  	if s.err != nil {
    88  		return nil, s.err
    89  	}
    90  	// If the context is done, so are we.
    91  	s.err = s.ctx.Err()
    92  	if s.err != nil {
    93  		return nil, s.err
    94  	}
    95  
    96  	// If the current and argument SPCs differ, return the current one. This subsumes two cases:
    97  	// 1. We have an SPC and the caller is getting the stream for the first time.
    98  	// 2. The caller wants to retry, but they have an older SPC; we've already retried.
    99  	if spc != s.spc {
   100  		return s.spc, nil
   101  	}
   102  	// Either this is the very first call on this stream (s.spc == nil), or we have a valid
   103  	// retry request. Either way, open a new stream.
   104  	// The lock is held here for a long time, but it doesn't matter because no callers could get
   105  	// anything done anyway.
   106  	s.spc = new(pb.Subscriber_StreamingPullClient)
   107  	*s.spc, s.err = s.openWithRetry() // Any error from openWithRetry is permanent.
   108  	return s.spc, s.err
   109  }
   110  
   111  func (s *pullStream) openWithRetry() (pb.Subscriber_StreamingPullClient, error) {
   112  	r := defaultRetryer{}
   113  	for {
   114  		recordStat(s.ctx, StreamOpenCount, 1)
   115  		spc, err := s.open()
   116  		bo, shouldRetry := r.Retry(err)
   117  		if err != nil && shouldRetry {
   118  			recordStat(s.ctx, StreamRetryCount, 1)
   119  			if err := gax.Sleep(s.ctx, bo); err != nil {
   120  				return nil, err
   121  			}
   122  			continue
   123  		}
   124  		return spc, err
   125  	}
   126  }
   127  
   128  func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error, opts ...gax.CallOption) error {
   129  	var settings gax.CallSettings
   130  	for _, opt := range opts {
   131  		opt.Resolve(&settings)
   132  	}
   133  	var r gax.Retryer = &defaultRetryer{}
   134  	if settings.Retry != nil {
   135  		r = settings.Retry()
   136  	}
   137  
   138  	var (
   139  		spc *pb.Subscriber_StreamingPullClient
   140  		err error
   141  	)
   142  	for {
   143  		spc, err = s.get(spc)
   144  		if err != nil {
   145  			return err
   146  		}
   147  		start := time.Now()
   148  		err = f(*spc)
   149  		if err != nil {
   150  			bo, shouldRetry := r.Retry(err)
   151  			if shouldRetry {
   152  				recordStat(s.ctx, StreamRetryCount, 1)
   153  				if time.Since(start) < 30*time.Second { // don't sleep if we've been blocked for a while
   154  					if err := gax.Sleep(s.ctx, bo); err != nil {
   155  						return err
   156  					}
   157  				}
   158  				continue
   159  			}
   160  			s.mu.Lock()
   161  			s.err = err
   162  			s.mu.Unlock()
   163  		}
   164  		return err
   165  	}
   166  }
   167  
   168  func (s *pullStream) Send(req *pb.StreamingPullRequest) error {
   169  	return s.call(func(spc pb.Subscriber_StreamingPullClient) error {
   170  		recordStat(s.ctx, AckCount, int64(len(req.AckIds)))
   171  		zeroes := 0
   172  		for _, mds := range req.ModifyDeadlineSeconds {
   173  			if mds == 0 {
   174  				zeroes++
   175  			}
   176  		}
   177  		recordStat(s.ctx, NackCount, int64(zeroes))
   178  		recordStat(s.ctx, ModAckCount, int64(len(req.ModifyDeadlineSeconds)-zeroes))
   179  		recordStat(s.ctx, StreamRequestCount, 1)
   180  		return spc.Send(req)
   181  	})
   182  }
   183  
   184  func (s *pullStream) Recv() (*pb.StreamingPullResponse, error) {
   185  	var res *pb.StreamingPullResponse
   186  	err := s.call(func(spc pb.Subscriber_StreamingPullClient) error {
   187  		var err error
   188  		recordStat(s.ctx, StreamResponseCount, 1)
   189  		res, err = spc.Recv()
   190  		return err
   191  	}, gax.WithRetry(func() gax.Retryer { return &streamingPullRetryer{defaultRetryer: &defaultRetryer{}} }))
   192  	return res, err
   193  }
   194  
   195  func (s *pullStream) CloseSend() error {
   196  	err := s.call(func(spc pb.Subscriber_StreamingPullClient) error {
   197  		return spc.CloseSend()
   198  	})
   199  	s.mu.Lock()
   200  	s.err = io.EOF // should not be retried
   201  	s.mu.Unlock()
   202  	return err
   203  }
   204  

View as plain text