...

Source file src/cloud.google.com/go/pubsub/internal/scheduler/receive_scheduler_test.go

Documentation: cloud.google.com/go/pubsub/internal/scheduler

     1  // Copyright 2019 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package scheduler_test
    16  
    17  import (
    18  	"fmt"
    19  	"testing"
    20  	"time"
    21  
    22  	"cloud.google.com/go/pubsub/internal/scheduler"
    23  )
    24  
    25  type pair struct {
    26  	k string
    27  	v int
    28  }
    29  
    30  func TestReceiveScheduler_Put_Basic(t *testing.T) {
    31  	done := make(chan struct{})
    32  	defer close(done)
    33  
    34  	keysHandled := map[string]chan int{}
    35  	handle := func(itemi interface{}) {
    36  		item := itemi.(pair)
    37  		keysHandled[item.k] <- item.v
    38  	}
    39  
    40  	// If these values are too high, the race detector will fail with "race: limit on 8128 simultaneously alive goroutines is exceeded, dying"
    41  	numItems := 100
    42  	numKeys := 10
    43  
    44  	s := scheduler.NewReceiveScheduler(1)
    45  	defer s.Shutdown()
    46  	for ki := 0; ki < numKeys; ki++ {
    47  		k := fmt.Sprintf("some_key_%d", ki)
    48  		keysHandled[k] = make(chan int, numItems)
    49  	}
    50  
    51  	for ki := 0; ki < numKeys; ki++ {
    52  		k := fmt.Sprintf("some_key_%d", ki)
    53  		go func() {
    54  			for i := 0; i < numItems; i++ {
    55  				select {
    56  				case <-done:
    57  					return
    58  				default:
    59  				}
    60  				if err := s.Add(k, pair{k: k, v: i}, handle); err != nil {
    61  					t.Error(err)
    62  				}
    63  			}
    64  		}()
    65  	}
    66  
    67  	for ki := 0; ki < numKeys; ki++ {
    68  		k := fmt.Sprintf("some_key_%d", ki)
    69  		for want := 0; want < numItems; want++ {
    70  			select {
    71  			case got := <-keysHandled[k]:
    72  				if got != want {
    73  					t.Fatalf("%s: got %d, want %d", k, got, want)
    74  				}
    75  			case <-time.After(5 * time.Second):
    76  				t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
    77  			}
    78  		}
    79  	}
    80  }
    81  
    82  // Scheduler schedules many items of one key in order even when there are
    83  // many workers.
    84  func TestReceiveScheduler_Put_ManyWithOneKey(t *testing.T) {
    85  	done := make(chan struct{})
    86  	defer close(done)
    87  
    88  	recvd := make(chan int)
    89  	handle := func(itemi interface{}) {
    90  		recvd <- itemi.(int)
    91  	}
    92  
    93  	// If these values are too high, the race detector will fail with "race: limit on 8128 simultaneously alive goroutines is exceeded, dying"
    94  	numItems := 10000
    95  	s := scheduler.NewReceiveScheduler(10)
    96  	defer s.Shutdown()
    97  
    98  	go func() {
    99  		for i := 0; i < numItems; i++ {
   100  			select {
   101  			case <-done:
   102  				return
   103  			default:
   104  			}
   105  			if err := s.Add("some-key", i, handle); err != nil {
   106  				t.Error(err)
   107  			}
   108  		}
   109  	}()
   110  
   111  	for want := 0; want < numItems; want++ {
   112  		select {
   113  		case got := <-recvd:
   114  			if got != want {
   115  				t.Fatalf("got %d, want %d", got, want)
   116  			}
   117  		case <-time.After(5 * time.Second):
   118  			t.Fatalf("timed out waiting for item %d to be handled", want)
   119  		}
   120  	}
   121  }
   122  
   123  // FlushAndStop flushes all messages. (it does not wait for their completion)
   124  func TestReceiveScheduler_FlushAndStop(t *testing.T) {
   125  	for _, tc := range []struct {
   126  		name  string
   127  		input map[string][]int
   128  	}{
   129  		{
   130  			name:  "two messages with the same key",
   131  			input: map[string][]int{"foo": {1, 2}},
   132  		},
   133  		{
   134  			name:  "two messages with different keys",
   135  			input: map[string][]int{"foo": {1}, "bar": {2}},
   136  		},
   137  		{
   138  			name:  "two messages with no key",
   139  			input: map[string][]int{"": {1, 2}},
   140  		},
   141  	} {
   142  		t.Run(tc.name, func(t *testing.T) {
   143  			recvd := make(chan int, 10)
   144  			handle := func(itemi interface{}) {
   145  				recvd <- itemi.(int)
   146  			}
   147  			s := scheduler.NewReceiveScheduler(1)
   148  			for k, vs := range tc.input {
   149  				for _, v := range vs {
   150  					if err := s.Add(k, v, handle); err != nil {
   151  						t.Fatal(err)
   152  					}
   153  				}
   154  			}
   155  
   156  			go func() {
   157  				s.Shutdown()
   158  			}()
   159  
   160  			time.Sleep(10 * time.Millisecond)
   161  
   162  			select {
   163  			case <-recvd:
   164  			case <-time.After(time.Second):
   165  				t.Fatal("timed out waiting for first message to arrive")
   166  			}
   167  
   168  			select {
   169  			case <-recvd:
   170  			case <-time.After(time.Second):
   171  				t.Fatal("timed out waiting for second message to arrive")
   172  			}
   173  		})
   174  	}
   175  }
   176  

View as plain text