...

Source file src/cloud.google.com/go/pubsub/internal/scheduler/publish_scheduler_test.go

Documentation: cloud.google.com/go/pubsub/internal/scheduler

     1  // Copyright 2019 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package scheduler_test
    16  
    17  import (
    18  	"fmt"
    19  	"testing"
    20  	"time"
    21  
    22  	"cloud.google.com/go/pubsub/internal/scheduler"
    23  )
    24  
    25  func TestPublishScheduler_Put_Basic(t *testing.T) {
    26  	done := make(chan struct{})
    27  	defer close(done)
    28  
    29  	keysHandled := map[string]chan int{}
    30  	handle := func(itemi interface{}) {
    31  		items := itemi.([]pair)
    32  		for _, item := range items {
    33  			keysHandled[item.k] <- item.v
    34  		}
    35  	}
    36  	s := scheduler.NewPublishScheduler(2, handle)
    37  	defer s.FlushAndStop()
    38  
    39  	// If these values are too high, the race detector will fail with
    40  	// "race: limit on 8128 simultaneously alive goroutines is exceeded, dying".
    41  	numItems := 100
    42  	numKeys := 10
    43  
    44  	for ki := 0; ki < numKeys; ki++ {
    45  		k := fmt.Sprintf("some_key_%d", ki)
    46  		keysHandled[k] = make(chan int, numItems)
    47  	}
    48  
    49  	for ki := 0; ki < numKeys; ki++ {
    50  		k := fmt.Sprintf("some_key_%d", ki)
    51  		go func() {
    52  			for i := 0; i < numItems; i++ {
    53  				select {
    54  				case <-done:
    55  					return
    56  				default:
    57  				}
    58  				if err := s.Add(k, pair{k, i}, 1); err != nil {
    59  					t.Error(err)
    60  				}
    61  			}
    62  		}()
    63  	}
    64  
    65  	for ki := 0; ki < numKeys; ki++ {
    66  		k := fmt.Sprintf("some_key_%d", ki)
    67  		for want := 0; want < numItems; want++ {
    68  			select {
    69  			case got := <-keysHandled[k]:
    70  				if got != want {
    71  					t.Fatalf("%s: got %d, want %d", k, got, want)
    72  				}
    73  			case <-time.After(5 * time.Second):
    74  				t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
    75  			}
    76  		}
    77  	}
    78  }
    79  
    80  // Scheduler schedules many items of one key in order even when there are
    81  // many workers.
    82  func TestPublishScheduler_Put_ManyWithOneKey(t *testing.T) {
    83  	done := make(chan struct{})
    84  	defer close(done)
    85  
    86  	recvd := make(chan int)
    87  	handle := func(itemi interface{}) {
    88  		items := itemi.([]int)
    89  		for _, item := range items {
    90  			recvd <- item
    91  		}
    92  	}
    93  	s := scheduler.NewPublishScheduler(10, handle)
    94  	defer s.FlushAndStop()
    95  
    96  	// If these values are too high, the race detector will fail with
    97  	// "race: limit on 8128 simultaneously alive goroutines is exceeded, dying".
    98  	numItems := 1000
    99  
   100  	go func() {
   101  		for i := 0; i < numItems; i++ {
   102  			select {
   103  			case <-done:
   104  				return
   105  			default:
   106  			}
   107  			if err := s.Add("some-key", i, 1); err != nil {
   108  				t.Error(err)
   109  			}
   110  		}
   111  	}()
   112  
   113  	for want := 0; want < numItems; want++ {
   114  		select {
   115  		case got := <-recvd:
   116  			if got != want {
   117  				t.Fatalf("got %d, want %d", got, want)
   118  			}
   119  		case <-time.After(5 * time.Second):
   120  			t.Fatalf("timed out waiting for item %d to be handled", want)
   121  		}
   122  	}
   123  }
   124  
   125  func TestPublishScheduler_DoesntRaceWithPublisher(t *testing.T) {
   126  	done := make(chan struct{})
   127  	defer close(done)
   128  
   129  	keysHandled := map[string]chan int{}
   130  	handle := func(itemi interface{}) {
   131  		items := itemi.([]pair)
   132  		for _, item := range items {
   133  			keysHandled[item.k] <- item.v
   134  		}
   135  	}
   136  	s := scheduler.NewPublishScheduler(2, handle)
   137  	defer s.FlushAndStop()
   138  
   139  	// If these values are too high, the race detector will fail with
   140  	// "race: limit on 8128 simultaneously alive goroutines is exceeded, dying".
   141  	numItems := 100
   142  	numKeys := 10
   143  
   144  	for ki := 0; ki < numKeys; ki++ {
   145  		k := fmt.Sprintf("some_key_%d", ki)
   146  		keysHandled[k] = make(chan int, numItems)
   147  	}
   148  
   149  	for ki := 0; ki < numKeys; ki++ {
   150  		k := fmt.Sprintf("some_key_%d", ki)
   151  		go func() {
   152  			for i := 0; i < numItems; i++ {
   153  				select {
   154  				case <-done:
   155  					return
   156  				default:
   157  				}
   158  				if err := s.Add(k, pair{k, i}, 1); err != nil {
   159  					t.Error(err)
   160  				}
   161  			}
   162  		}()
   163  	}
   164  
   165  	for ki := 0; ki < numKeys; ki++ {
   166  		k := fmt.Sprintf("some_key_%d", ki)
   167  		for want := 0; want < numItems; want++ {
   168  			select {
   169  			case got := <-keysHandled[k]:
   170  				if got != want {
   171  					t.Fatalf("%s: got %d, want %d", k, got, want)
   172  				}
   173  			case <-time.After(5 * time.Second):
   174  				t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
   175  			}
   176  		}
   177  	}
   178  }
   179  
   180  // FlushAndStop blocks until all messages are processed.
   181  func TestPublishScheduler_FlushAndStop(t *testing.T) {
   182  	for _, tc := range []struct {
   183  		name  string
   184  		input map[string][]int
   185  	}{
   186  		{
   187  			name:  "two messages with the same key",
   188  			input: map[string][]int{"foo": {1, 2}},
   189  		},
   190  		{
   191  			name:  "two messages with different keys",
   192  			input: map[string][]int{"foo": {1}, "bar": {2}},
   193  		},
   194  		{
   195  			name:  "two messages with no key",
   196  			input: map[string][]int{"": {1, 2}},
   197  		},
   198  	} {
   199  		t.Run(tc.name, func(t *testing.T) {
   200  			recvd := make(chan int)
   201  			handle := func(itemi interface{}) {
   202  				for _, v := range itemi.([]int) {
   203  					recvd <- v
   204  				}
   205  			}
   206  			s := scheduler.NewPublishScheduler(1, handle)
   207  			for k, vs := range tc.input {
   208  				for _, v := range vs {
   209  					if err := s.Add(k, v, 1); err != nil {
   210  						t.Fatal(err)
   211  					}
   212  				}
   213  			}
   214  
   215  			doneFlushing := make(chan struct{})
   216  			go func() {
   217  				s.FlushAndStop()
   218  				close(doneFlushing)
   219  			}()
   220  
   221  			time.Sleep(10 * time.Millisecond)
   222  
   223  			select {
   224  			case <-doneFlushing:
   225  				t.Fatal("expected FlushAndStop to block until all messages handled, but it didn't")
   226  			default:
   227  			}
   228  
   229  			select {
   230  			case <-recvd:
   231  			case <-time.After(time.Second):
   232  				t.Fatal("timed out waiting for first message to arrive")
   233  			}
   234  
   235  			select {
   236  			case <-recvd:
   237  			case <-time.After(time.Second):
   238  				t.Fatal("timed out waiting for second message to arrive")
   239  			}
   240  
   241  			select {
   242  			case <-doneFlushing:
   243  			case <-time.After(time.Second):
   244  				t.Fatal("timed out waiting for FlushAndStop to finish blocking")
   245  			}
   246  		})
   247  	}
   248  }
   249  

View as plain text