...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datasource/polling_data_source_test.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datasource

     1  package datasource
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net/http/httptest"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
    11  
    12  	"github.com/launchdarkly/go-server-sdk-evaluation/v2/ldbuilders"
    13  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
    14  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
    15  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    16  	"github.com/launchdarkly/go-server-sdk/v6/testhelpers/ldservices"
    17  
    18  	th "github.com/launchdarkly/go-test-helpers/v3"
    19  	"github.com/launchdarkly/go-test-helpers/v3/httphelpers"
    20  
    21  	"github.com/stretchr/testify/assert"
    22  )
    23  
    24  func TestPollingProcessorClosingItShouldNotBlock(t *testing.T) {
    25  	r := newMockRequestor()
    26  	defer r.Close()
    27  	r.requestAllRespCh <- mockRequestAllResponse{}
    28  
    29  	withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
    30  		p := newPollingProcessor(basicClientContext(), dataSourceUpdates, r, time.Minute)
    31  
    32  		p.Close()
    33  
    34  		closeWhenReady := make(chan struct{})
    35  		p.Start(closeWhenReady)
    36  
    37  		th.AssertChannelClosed(t, closeWhenReady, time.Second, "starting a closed processor shouldn't block")
    38  	})
    39  }
    40  
    41  func TestPollingProcessorInitialization(t *testing.T) {
    42  	flag := ldbuilders.NewFlagBuilder("flagkey").Version(1).Build()
    43  	segment := ldbuilders.NewSegmentBuilder("segmentkey").Version(1).Build()
    44  
    45  	r := newMockRequestor()
    46  	defer r.Close()
    47  	expectedData := sharedtest.NewDataSetBuilder().Flags(flag).Segments(segment)
    48  	resp := mockRequestAllResponse{data: expectedData.Build()}
    49  	r.requestAllRespCh <- resp
    50  
    51  	withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
    52  		p := newPollingProcessor(basicClientContext(), dataSourceUpdates, r, time.Millisecond*10)
    53  		defer p.Close()
    54  
    55  		closeWhenReady := make(chan struct{})
    56  		p.Start(closeWhenReady)
    57  
    58  		if !th.AssertChannelClosed(t, closeWhenReady, time.Second, "Failed to initialize") {
    59  			return
    60  		}
    61  
    62  		assert.True(t, p.IsInitialized())
    63  
    64  		dataSourceUpdates.DataStore.WaitForInit(t, expectedData.ToServerSDKData(), 2*time.Second)
    65  
    66  		for i := 0; i < 2; i++ {
    67  			r.requestAllRespCh <- resp
    68  			if _, ok, closed := th.TryReceive(r.pollsCh, time.Second); !ok || closed {
    69  				assert.Fail(t, "Expected 2 polls", "but only got %d", i)
    70  				return
    71  			}
    72  		}
    73  	})
    74  }
    75  
    76  func TestPollingProcessorRecoverableErrors(t *testing.T) {
    77  	for _, statusCode := range []int{400, 408, 429, 500, 503} {
    78  		t.Run(fmt.Sprintf("HTTP %d", statusCode), func(t *testing.T) {
    79  			testPollingProcessorRecoverableError(
    80  				t,
    81  				httpStatusError{Code: statusCode},
    82  				func(errorInfo interfaces.DataSourceErrorInfo) {
    83  					assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, errorInfo.Kind)
    84  					assert.Equal(t, statusCode, errorInfo.StatusCode)
    85  				},
    86  			)
    87  		})
    88  	}
    89  
    90  	t.Run("network error", func(t *testing.T) {
    91  		testPollingProcessorRecoverableError(
    92  			t,
    93  			errors.New("arbitrary error"),
    94  			func(errorInfo interfaces.DataSourceErrorInfo) {
    95  				assert.Equal(t, interfaces.DataSourceErrorKindNetworkError, errorInfo.Kind)
    96  				assert.Equal(t, "arbitrary error", errorInfo.Message)
    97  			},
    98  		)
    99  	})
   100  
   101  	t.Run("malformed data", func(t *testing.T) {
   102  		testPollingProcessorRecoverableError(
   103  			t,
   104  			malformedJSONError{innerError: errors.New("sorry")},
   105  			func(errorInfo interfaces.DataSourceErrorInfo) {
   106  				assert.Equal(t, string(interfaces.DataSourceErrorKindInvalidData), string(errorInfo.Kind))
   107  				assert.Contains(t, errorInfo.Message, "sorry")
   108  			},
   109  		)
   110  	})
   111  }
   112  
   113  func testPollingProcessorRecoverableError(t *testing.T, err error, verifyError func(interfaces.DataSourceErrorInfo)) {
   114  	req := newMockRequestor()
   115  	defer req.Close()
   116  
   117  	req.requestAllRespCh <- mockRequestAllResponse{err: err}
   118  
   119  	withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   120  		p := newPollingProcessor(basicClientContext(), dataSourceUpdates, req, time.Millisecond*10)
   121  		defer p.Close()
   122  		closeWhenReady := make(chan struct{})
   123  		p.Start(closeWhenReady)
   124  
   125  		// wait for first poll
   126  		<-req.pollsCh
   127  
   128  		status := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateInterrupted)
   129  		verifyError(status.LastError)
   130  
   131  		if !th.AssertChannelNotClosed(t, closeWhenReady, 0) {
   132  			t.FailNow()
   133  		}
   134  
   135  		req.requestAllRespCh <- mockRequestAllResponse{}
   136  
   137  		// wait for second poll
   138  		th.RequireValue(t, req.pollsCh, time.Second, "failed to retry")
   139  
   140  		waitForReadyWithTimeout(t, closeWhenReady, time.Second)
   141  		_ = dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateValid)
   142  	})
   143  }
   144  
   145  func TestPollingProcessorUnrecoverableErrors(t *testing.T) {
   146  	for _, statusCode := range []int{401, 403, 405} {
   147  		t.Run(fmt.Sprintf("HTTP %d", statusCode), func(t *testing.T) {
   148  			testPollingProcessorUnrecoverableError(
   149  				t,
   150  				httpStatusError{Code: statusCode},
   151  				func(errorInfo interfaces.DataSourceErrorInfo) {
   152  					assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, errorInfo.Kind)
   153  					assert.Equal(t, statusCode, errorInfo.StatusCode)
   154  				},
   155  			)
   156  		})
   157  	}
   158  }
   159  
   160  func testPollingProcessorUnrecoverableError(
   161  	t *testing.T,
   162  	err error,
   163  	verifyError func(interfaces.DataSourceErrorInfo),
   164  ) {
   165  	req := newMockRequestor()
   166  	defer req.Close()
   167  
   168  	req.requestAllRespCh <- mockRequestAllResponse{err: err}
   169  	req.requestAllRespCh <- mockRequestAllResponse{} // we shouldn't get a second request, but just in case
   170  
   171  	withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   172  		p := newPollingProcessor(basicClientContext(), dataSourceUpdates, req, time.Millisecond*10)
   173  		defer p.Close()
   174  		closeWhenReady := make(chan struct{})
   175  		p.Start(closeWhenReady)
   176  
   177  		// wait for first poll
   178  		<-req.pollsCh
   179  
   180  		waitForReadyWithTimeout(t, closeWhenReady, time.Second)
   181  
   182  		status := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateOff)
   183  		verifyError(status.LastError)
   184  		assert.Len(t, req.pollsCh, 0)
   185  	})
   186  }
   187  
   188  func TestPollingProcessorUsesHTTPClientFactory(t *testing.T) {
   189  	data := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 2))
   190  	pollHandler, requestsCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(data))
   191  	httphelpers.WithServer(pollHandler, func(ts *httptest.Server) {
   192  		withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   193  			httpClientFactory := urlAppendingHTTPClientFactory("/transformed")
   194  			httpConfig := subsystems.HTTPConfiguration{CreateHTTPClient: httpClientFactory}
   195  			context := sharedtest.NewTestContext(testSDKKey, &httpConfig, nil)
   196  
   197  			p := NewPollingProcessor(context, dataSourceUpdates, ts.URL, time.Minute*30)
   198  			defer p.Close()
   199  			closeWhenReady := make(chan struct{})
   200  			p.Start(closeWhenReady)
   201  
   202  			r := <-requestsCh
   203  
   204  			assert.Equal(t, "/sdk/latest-all/transformed", r.Request.URL.Path)
   205  		})
   206  	})
   207  }
   208  

View as plain text