...

Source file src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go

Documentation: k8s.io/kube-aggregator/pkg/apiserver

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"net/http"
    24  	"net/http/httptest"
    25  	"strconv"
    26  	"strings"
    27  	"sync"
    28  	"sync/atomic"
    29  	"testing"
    30  	"time"
    31  
    32  	"github.com/google/go-cmp/cmp"
    33  	fuzz "github.com/google/gofuzz"
    34  	"github.com/stretchr/testify/require"
    35  
    36  	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
    37  	apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/runtime"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	apidiscoveryv2scheme "k8s.io/apiserver/pkg/apis/apidiscovery/v2"
    42  	"k8s.io/apiserver/pkg/endpoints"
    43  	"k8s.io/apiserver/pkg/endpoints/discovery"
    44  	discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
    45  	scheme "k8s.io/client-go/kubernetes/scheme"
    46  	"k8s.io/client-go/tools/cache"
    47  	"k8s.io/client-go/util/workqueue"
    48  	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    49  )
    50  
    51  func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
    52  	dm := NewDiscoveryManager(rm).(*discoveryManager)
    53  	dm.dirtyAPIServiceQueue = newCompleterWorkqueue(dm.dirtyAPIServiceQueue)
    54  
    55  	return dm
    56  }
    57  
    58  // Returns true if the queue of services to sync is complete which means
    59  // everything has been reconciled and placed into merged document
    60  func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
    61  	return cache.WaitForCacheSync(stopCh, func() bool {
    62  		return dm.dirtyAPIServiceQueue.(*completerWorkqueue).isComplete()
    63  	})
    64  }
    65  
    66  // Test that the discovery manager starts and aggregates from two local API services
    67  func TestBasic(t *testing.T) {
    68  	service1 := discoveryendpoint.NewResourceManager("apis")
    69  	service2 := discoveryendpoint.NewResourceManager("apis")
    70  	service3 := discoveryendpoint.NewResourceManager("apis")
    71  	apiGroup1 := fuzzAPIGroups(2, 5, 25)
    72  	apiGroup2 := fuzzAPIGroups(2, 5, 50)
    73  	apiGroup3 := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
    74  		{
    75  			ObjectMeta: metav1.ObjectMeta{Name: "weird.example.com"},
    76  			Versions: []apidiscoveryv2.APIVersionDiscovery{
    77  				{
    78  					Version:   "v1",
    79  					Freshness: "Current",
    80  					Resources: []apidiscoveryv2.APIResourceDiscovery{
    81  						{
    82  							Resource: "parent-missing-kind",
    83  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
    84  								{Subresource: "subresource-missing-kind"},
    85  							},
    86  						},
    87  						{
    88  							Resource:     "parent-empty-kind",
    89  							ResponseKind: &metav1.GroupVersionKind{},
    90  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
    91  								{Subresource: "subresource-empty-kind", ResponseKind: &metav1.GroupVersionKind{}},
    92  							},
    93  						},
    94  						{
    95  							Resource:     "parent-with-kind",
    96  							ResponseKind: &metav1.GroupVersionKind{Kind: "ParentWithKind"},
    97  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
    98  								{Subresource: "subresource-with-kind", ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithKind"}},
    99  							},
   100  						},
   101  					},
   102  				},
   103  			},
   104  		},
   105  	}}
   106  	apiGroup3WithFixup := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
   107  		{
   108  			ObjectMeta: metav1.ObjectMeta{Name: "weird.example.com"},
   109  			Versions: []apidiscoveryv2.APIVersionDiscovery{
   110  				{
   111  					Version:   "v1",
   112  					Freshness: "Current",
   113  					Resources: []apidiscoveryv2.APIResourceDiscovery{
   114  						{
   115  							Resource:     "parent-missing-kind",
   116  							ResponseKind: &metav1.GroupVersionKind{}, // defaulted by aggregator
   117  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
   118  								{Subresource: "subresource-missing-kind", ResponseKind: &metav1.GroupVersionKind{}}, // defaulted by aggregator
   119  							},
   120  						},
   121  						{
   122  							Resource:     "parent-empty-kind",
   123  							ResponseKind: &metav1.GroupVersionKind{},
   124  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
   125  								{Subresource: "subresource-empty-kind", ResponseKind: &metav1.GroupVersionKind{}},
   126  							},
   127  						},
   128  						{
   129  							Resource:     "parent-with-kind",
   130  							ResponseKind: &metav1.GroupVersionKind{Kind: "ParentWithKind"},
   131  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
   132  								{Subresource: "subresource-with-kind", ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithKind"}},
   133  							},
   134  						},
   135  					},
   136  				},
   137  			},
   138  		},
   139  	}}
   140  	service1.SetGroups(apiGroup1.Items)
   141  	service2.SetGroups(apiGroup2.Items)
   142  	service3.SetGroups(apiGroup3.Items)
   143  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   144  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   145  
   146  	for _, g := range apiGroup1.Items {
   147  		versionPriority := int32(len(g.Versions) + 1)
   148  		for _, v := range g.Versions {
   149  			versionPriority--
   150  			aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   151  				ObjectMeta: metav1.ObjectMeta{
   152  					Name: v.Version + "." + g.Name,
   153  				},
   154  				Spec: apiregistrationv1.APIServiceSpec{
   155  					Group:           g.Name,
   156  					Version:         v.Version,
   157  					VersionPriority: versionPriority,
   158  					Service: &apiregistrationv1.ServiceReference{
   159  						Name: "service1",
   160  					},
   161  				},
   162  			}, service1)
   163  		}
   164  	}
   165  
   166  	for _, g := range apiGroup2.Items {
   167  		versionPriority := int32(len(g.Versions) + 1)
   168  		for _, v := range g.Versions {
   169  			versionPriority--
   170  			aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   171  				ObjectMeta: metav1.ObjectMeta{
   172  					Name: v.Version + "." + g.Name,
   173  				},
   174  				Spec: apiregistrationv1.APIServiceSpec{
   175  					Group:           g.Name,
   176  					Version:         v.Version,
   177  					VersionPriority: versionPriority,
   178  					Service: &apiregistrationv1.ServiceReference{
   179  						Name: "service2",
   180  					},
   181  				},
   182  			}, service2)
   183  		}
   184  	}
   185  
   186  	for _, g := range apiGroup3.Items {
   187  		versionPriority := int32(len(g.Versions) + 1)
   188  		for _, v := range g.Versions {
   189  			versionPriority--
   190  			aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   191  				ObjectMeta: metav1.ObjectMeta{
   192  					Name: v.Version + "." + g.Name,
   193  				},
   194  				Spec: apiregistrationv1.APIServiceSpec{
   195  					Group:           g.Name,
   196  					Version:         v.Version,
   197  					VersionPriority: versionPriority,
   198  					Service: &apiregistrationv1.ServiceReference{
   199  						Name: "service3",
   200  					},
   201  				},
   202  			}, service3)
   203  		}
   204  	}
   205  
   206  	testCtx, testCancel := context.WithCancel(context.Background())
   207  	defer testCancel()
   208  
   209  	go aggregatedManager.Run(testCtx.Done(), nil)
   210  
   211  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   212  
   213  	response, _, parsed := fetchPath(aggregatedResourceManager, "")
   214  	if response.StatusCode != 200 {
   215  		t.Fatalf("unexpected status code %d", response.StatusCode)
   216  	}
   217  	checkAPIGroups(t, apiGroup1, parsed)
   218  	checkAPIGroups(t, apiGroup2, parsed)
   219  	checkAPIGroups(t, apiGroup3WithFixup, parsed)
   220  }
   221  
   222  func checkAPIGroups(t *testing.T, api apidiscoveryv2.APIGroupDiscoveryList, response *apidiscoveryv2.APIGroupDiscoveryList) {
   223  	t.Helper()
   224  	if len(response.Items) < len(api.Items) {
   225  		t.Errorf("expected to check for at least %d groups, only have %d groups in response", len(api.Items), len(response.Items))
   226  	}
   227  	for _, knownGroup := range api.Items {
   228  		found := false
   229  		for _, possibleGroup := range response.Items {
   230  			if knownGroup.Name == possibleGroup.Name {
   231  				t.Logf("found %s", knownGroup.Name)
   232  				found = true
   233  				diff := cmp.Diff(knownGroup, possibleGroup)
   234  				if len(diff) > 0 {
   235  					t.Error(diff)
   236  				}
   237  			}
   238  		}
   239  		if found == false {
   240  			t.Errorf("could not find %s", knownGroup.Name)
   241  		}
   242  	}
   243  }
   244  
   245  // TestInitialRunHasAllAPIServices tests that when discovery is ready, all APIService
   246  // are present and ones that have not synced are in the list as Stale.
   247  func TestInitialRunHasAllAPIServices(t *testing.T) {
   248  	neverReturnCh := make(chan struct{})
   249  	defer close(neverReturnCh)
   250  	service := discoveryendpoint.NewResourceManager("apis")
   251  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   252  
   253  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   254  
   255  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   256  		ObjectMeta: metav1.ObjectMeta{
   257  			Name: "v1.stable.example.com",
   258  		},
   259  		Spec: apiregistrationv1.APIServiceSpec{
   260  			Group:   "stable.example.com",
   261  			Version: "v1",
   262  			Service: &apiregistrationv1.ServiceReference{
   263  				Name: "test-service",
   264  			},
   265  		},
   266  	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   267  		<-neverReturnCh
   268  		service.ServeHTTP(w, r)
   269  	}))
   270  	testCtx, cancel := context.WithCancel(context.Background())
   271  	defer cancel()
   272  
   273  	initialSyncedCh := make(chan struct{})
   274  	go aggregatedManager.Run(testCtx.Done(), initialSyncedCh)
   275  	select {
   276  	case <-initialSyncedCh:
   277  	case <-time.After(10 * time.Second):
   278  		t.Fatal("timed out waiting for initial sync")
   279  	}
   280  
   281  	response, _, parsed := fetchPath(aggregatedResourceManager, "")
   282  	if response.StatusCode != 200 {
   283  		t.Fatalf("unexpected status code %d", response.StatusCode)
   284  	}
   285  
   286  	apiGroup := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
   287  		{
   288  			ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"},
   289  			Versions: []apidiscoveryv2.APIVersionDiscovery{
   290  				{
   291  					Version:   "v1",
   292  					Freshness: "Stale",
   293  				},
   294  			},
   295  		},
   296  	}}
   297  
   298  	checkAPIGroups(t, apiGroup, parsed)
   299  }
   300  
   301  func TestServiceGC(t *testing.T) {
   302  	service := discoveryendpoint.NewResourceManager("apis")
   303  
   304  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   305  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   306  	testCtx, cancel := context.WithCancel(context.Background())
   307  	defer cancel()
   308  
   309  	go aggregatedManager.Run(testCtx.Done(), nil)
   310  
   311  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   312  		ObjectMeta: metav1.ObjectMeta{
   313  			Name: "v1.stable.example.com",
   314  		},
   315  		Spec: apiregistrationv1.APIServiceSpec{
   316  			Group:   "stable.example.com",
   317  			Version: "v1",
   318  			Service: &apiregistrationv1.ServiceReference{
   319  				Name: "test-service",
   320  			},
   321  		},
   322  	}, service)
   323  
   324  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   325  
   326  	// Lookup size of cache
   327  	getCacheLen := func() int {
   328  		aggregatedManager.resultsLock.Lock()
   329  		defer aggregatedManager.resultsLock.Unlock()
   330  		return len(aggregatedManager.cachedResults)
   331  	}
   332  
   333  	require.Equal(t, 1, getCacheLen())
   334  
   335  	// Change the service of the same APIService a bit to create duplicate entry
   336  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   337  		ObjectMeta: metav1.ObjectMeta{
   338  			Name: "v1.stable.example.com",
   339  		},
   340  		Spec: apiregistrationv1.APIServiceSpec{
   341  			Group:   "stable.example.com",
   342  			Version: "v1",
   343  			Service: &apiregistrationv1.ServiceReference{
   344  				Name: "test-service-changed",
   345  			},
   346  		},
   347  	}, service)
   348  
   349  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   350  	require.Equal(t, 1, getCacheLen())
   351  }
   352  
   353  // TestV2Beta1Skew tests that aggregated apiservers that only serve V2Beta1
   354  // are still supported
   355  func TestV2Beta1Skew(t *testing.T) {
   356  	apiGroup := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
   357  		{
   358  			ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"},
   359  			Versions: []apidiscoveryv2.APIVersionDiscovery{
   360  				{
   361  					Version:   "v1",
   362  					Freshness: "Current",
   363  					Resources: []apidiscoveryv2.APIResourceDiscovery{
   364  						{
   365  							Resource:     "parent-with-kind",
   366  							ResponseKind: &metav1.GroupVersionKind{Kind: "ParentWithKind"},
   367  							Subresources: []apidiscoveryv2.APISubresourceDiscovery{
   368  								{Subresource: "subresource-with-kind", ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithKind"}},
   369  							},
   370  						},
   371  					},
   372  				},
   373  			},
   374  		},
   375  	}}
   376  
   377  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   378  
   379  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   380  
   381  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   382  		ObjectMeta: metav1.ObjectMeta{
   383  			Name: "v1.stable.example.com",
   384  		},
   385  		Spec: apiregistrationv1.APIServiceSpec{
   386  			Group:   "stable.example.com",
   387  			Version: "v1",
   388  			Service: &apiregistrationv1.ServiceReference{
   389  				Name: "test-service",
   390  			},
   391  		},
   392  	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   393  		// Force a v2beta1 response from the aggregated apiserver
   394  		v2b := apidiscoveryv2beta1.APIGroupDiscoveryList{}
   395  		err := apidiscoveryv2scheme.Convertv2APIGroupDiscoveryListTov2beta1APIGroupDiscoveryList(&apiGroup, &v2b, nil)
   396  		require.NoError(t, err)
   397  		converted, err := json.Marshal(v2b)
   398  		require.NoError(t, err)
   399  		w.Header().Set("Content-Type", "application/json;"+"g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
   400  		w.WriteHeader(200)
   401  		_, err = w.Write(converted)
   402  		require.NoError(t, err)
   403  	}))
   404  	testCtx, cancel := context.WithCancel(context.Background())
   405  	defer cancel()
   406  
   407  	go aggregatedManager.Run(testCtx.Done(), nil)
   408  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   409  
   410  	response, _, parsed := fetchPath(aggregatedResourceManager, "")
   411  	if response.StatusCode != 200 {
   412  		t.Fatalf("unexpected status code %d", response.StatusCode)
   413  	}
   414  
   415  	checkAPIGroups(t, apiGroup, parsed)
   416  }
   417  
   418  // Test that a handler associated with an APIService gets pinged after the
   419  // APIService has been marked as dirty
   420  func TestDirty(t *testing.T) {
   421  	var pinged atomic.Bool
   422  	service := discoveryendpoint.NewResourceManager("apis")
   423  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   424  
   425  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   426  
   427  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   428  		ObjectMeta: metav1.ObjectMeta{
   429  			Name: "v1.stable.example.com",
   430  		},
   431  		Spec: apiregistrationv1.APIServiceSpec{
   432  			Group:   "stable.example.com",
   433  			Version: "v1",
   434  			Service: &apiregistrationv1.ServiceReference{
   435  				Name: "test-service",
   436  			},
   437  		},
   438  	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   439  		pinged.Store(true)
   440  		service.ServeHTTP(w, r)
   441  	}))
   442  	testCtx, cancel := context.WithCancel(context.Background())
   443  	defer cancel()
   444  
   445  	go aggregatedManager.Run(testCtx.Done(), nil)
   446  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   447  
   448  	// immediately check for ping, since Run() should block for local services
   449  	if !pinged.Load() {
   450  		t.Errorf("service handler never pinged")
   451  	}
   452  }
   453  
   454  // Shows that waitForQueueComplete also waits for syncing to
   455  // complete by artificially making the sync handler take a long time
   456  func TestWaitForSync(t *testing.T) {
   457  	pinged := atomic.Bool{}
   458  	service := discoveryendpoint.NewResourceManager("apis")
   459  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   460  
   461  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   462  
   463  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   464  		ObjectMeta: metav1.ObjectMeta{
   465  			Name: "v1.stable.example.com",
   466  		},
   467  		Spec: apiregistrationv1.APIServiceSpec{
   468  			Group:   "stable.example.com",
   469  			Version: "v1",
   470  			Service: &apiregistrationv1.ServiceReference{
   471  				Name: "test-service",
   472  			},
   473  		},
   474  	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   475  		time.Sleep(3 * time.Second)
   476  		pinged.Store(true)
   477  		service.ServeHTTP(w, r)
   478  	}))
   479  	testCtx, cancel := context.WithCancel(context.Background())
   480  	defer cancel()
   481  
   482  	go aggregatedManager.Run(testCtx.Done(), nil)
   483  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   484  
   485  	// immediately check for ping, since Run() should block for local services
   486  	if !pinged.Load() {
   487  		t.Errorf("service handler never pinged")
   488  	}
   489  }
   490  
   491  // Show that an APIService can be removed and that its group no longer remains
   492  // if there are no versions
   493  func TestRemoveAPIService(t *testing.T) {
   494  	aggyService := discoveryendpoint.NewResourceManager("apis")
   495  	service := discoveryendpoint.NewResourceManager("apis")
   496  	apiGroup := fuzzAPIGroups(2, 3, 10)
   497  	service.SetGroups(apiGroup.Items)
   498  
   499  	var apiServices []*apiregistrationv1.APIService
   500  	for _, g := range apiGroup.Items {
   501  		for _, v := range g.Versions {
   502  			apiservice := &apiregistrationv1.APIService{
   503  				ObjectMeta: metav1.ObjectMeta{
   504  					Name: v.Version + "." + g.Name,
   505  				},
   506  				Spec: apiregistrationv1.APIServiceSpec{
   507  					Group:   g.Name,
   508  					Version: v.Version,
   509  					Service: &apiregistrationv1.ServiceReference{
   510  						Namespace: "serviceNamespace",
   511  						Name:      "serviceName",
   512  					},
   513  				},
   514  			}
   515  
   516  			apiServices = append(apiServices, apiservice)
   517  		}
   518  	}
   519  
   520  	aggregatedManager := newDiscoveryManager(aggyService)
   521  
   522  	for _, s := range apiServices {
   523  		aggregatedManager.AddAPIService(s, service)
   524  	}
   525  
   526  	testCtx, testCancel := context.WithCancel(context.Background())
   527  	defer testCancel()
   528  
   529  	go aggregatedManager.Run(testCtx.Done(), nil)
   530  
   531  	for _, s := range apiServices {
   532  		aggregatedManager.RemoveAPIService(s.Name)
   533  	}
   534  
   535  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   536  
   537  	response, _, parsed := fetchPath(aggyService, "")
   538  	if response.StatusCode != 200 {
   539  		t.Fatalf("unexpected status code %d", response.StatusCode)
   540  	}
   541  	if len(parsed.Items) > 0 {
   542  		t.Errorf("expected to find no groups after service deletion (got %d groups)", len(parsed.Items))
   543  	}
   544  }
   545  
   546  func TestLegacyFallbackNoCache(t *testing.T) {
   547  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   548  	rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
   549  
   550  	legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
   551  		Name: "stable.example.com",
   552  		PreferredVersion: metav1.GroupVersionForDiscovery{
   553  			GroupVersion: "stable.example.com/v1",
   554  			Version:      "v1",
   555  		},
   556  		Versions: []metav1.GroupVersionForDiscovery{
   557  			{
   558  				GroupVersion: "stable.example.com/v1",
   559  				Version:      "v1",
   560  			},
   561  			{
   562  				GroupVersion: "stable.example.com/v1beta1",
   563  				Version:      "v1beta1",
   564  			},
   565  			{
   566  				GroupVersion: "stable.example.com/v1alpha1",
   567  				Version:      "v1alpha1",
   568  			},
   569  			{
   570  				GroupVersion: "stable.example.com/v2alpha1",
   571  				Version:      "v2alpha1",
   572  			},
   573  		},
   574  	})
   575  
   576  	generateVersionResource := func(version string) metav1.APIResource {
   577  		return metav1.APIResource{
   578  			Name:         "foos",
   579  			SingularName: "foo",
   580  			Group:        "stable.example.com",
   581  			Version:      version,
   582  			Namespaced:   false,
   583  			Kind:         "Foo",
   584  			Verbs:        []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"},
   585  			Categories:   []string{"all"},
   586  		}
   587  	}
   588  
   589  	resources := map[string]metav1.APIResource{
   590  		"v1":       generateVersionResource("v1"),
   591  		"v1beta1":  generateVersionResource("v1beta1"),
   592  		"v1alpha1": generateVersionResource("v1alpha1"),
   593  	}
   594  
   595  	legacyResourceHandlerV1 := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
   596  		Group:   "stable.example.com",
   597  		Version: "v1",
   598  	}, discovery.APIResourceListerFunc(func() []metav1.APIResource {
   599  		return []metav1.APIResource{
   600  			resources["v1"],
   601  		}
   602  	}))
   603  
   604  	legacyResourceHandlerV1Beta1 := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
   605  		Group:   "stable.example.com",
   606  		Version: "v1beta1",
   607  	}, discovery.APIResourceListerFunc(func() []metav1.APIResource {
   608  		return []metav1.APIResource{
   609  			resources["v1beta1"],
   610  		}
   611  	}))
   612  
   613  	legacyResourceHandlerV1Alpha1 := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
   614  		Group:   "stable.example.com",
   615  		Version: "v1alpha1",
   616  	}, discovery.APIResourceListerFunc(func() []metav1.APIResource {
   617  		return []metav1.APIResource{
   618  			resources["v1alpha1"],
   619  		}
   620  	}))
   621  
   622  	handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   623  		if r.URL.Path == "/apis/stable.example.com" {
   624  			legacyGroupHandler.ServeHTTP(w, r)
   625  		} else if r.URL.Path == "/apis/stable.example.com/v1" {
   626  			// defer to legacy discovery
   627  			legacyResourceHandlerV1.ServeHTTP(w, r)
   628  		} else if r.URL.Path == "/apis/stable.example.com/v1beta1" {
   629  			// defer to legacy discovery
   630  			legacyResourceHandlerV1Beta1.ServeHTTP(w, r)
   631  		} else if r.URL.Path == "/apis/stable.example.com/v1alpha1" {
   632  			legacyResourceHandlerV1Alpha1.ServeHTTP(w, r)
   633  		} else if r.URL.Path == "/apis/stable.example.com/v2alpha1" {
   634  			// serve the most minimal discovery doc that could have worked prior to aggregated discovery
   635  			json.NewEncoder(w).Encode(&metav1.APIResourceList{
   636  				GroupVersion: "stable.example.com/v2alpha1",
   637  				APIResources: []metav1.APIResource{
   638  					{Name: "parent-without-kind"},
   639  					{Name: "missing-parent/subresource-without-parent", Kind: "SubresourceWithoutParent"},
   640  					{Name: "parent-without-kind/subresource", Kind: "Subresource"},
   641  					{Name: "parent-without-kind/subresource-without-kind"},
   642  				},
   643  			})
   644  		} else if r.URL.Path == "/apis" {
   645  			rootAPIsHandler.ServeHTTP(w, r)
   646  		} else {
   647  			// Unknown url
   648  			t.Fatalf("unexpected request sent to %v", r.URL.Path)
   649  		}
   650  	})
   651  
   652  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   653  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   654  		ObjectMeta: metav1.ObjectMeta{
   655  			Name: "v1.stable.example.com",
   656  		},
   657  		Spec: apiregistrationv1.APIServiceSpec{
   658  			Group:   "stable.example.com",
   659  			Version: "v1",
   660  			Service: &apiregistrationv1.ServiceReference{
   661  				Name: "test-service",
   662  			},
   663  		},
   664  	}, handlerFunc)
   665  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   666  		ObjectMeta: metav1.ObjectMeta{
   667  			Name: "v1beta1.stable.example.com",
   668  		},
   669  		Spec: apiregistrationv1.APIServiceSpec{
   670  			Group:   "stable.example.com",
   671  			Version: "v1beta1",
   672  			Service: &apiregistrationv1.ServiceReference{
   673  				Name: "test-service",
   674  			},
   675  		},
   676  	}, handlerFunc)
   677  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   678  		ObjectMeta: metav1.ObjectMeta{
   679  			Name: "v1alpha1.stable.example.com",
   680  		},
   681  		Spec: apiregistrationv1.APIServiceSpec{
   682  			Group:   "stable.example.com",
   683  			Version: "v1alpha1",
   684  			Service: &apiregistrationv1.ServiceReference{
   685  				Name: "test-service",
   686  			},
   687  		},
   688  	}, handlerFunc)
   689  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   690  		ObjectMeta: metav1.ObjectMeta{
   691  			Name: "v2alpha1.stable.example.com",
   692  		},
   693  		Spec: apiregistrationv1.APIServiceSpec{
   694  			Group:   "stable.example.com",
   695  			Version: "v2alpha1",
   696  			Service: &apiregistrationv1.ServiceReference{
   697  				Name: "test-service",
   698  			},
   699  		},
   700  	}, handlerFunc)
   701  
   702  	testCtx, cancel := context.WithCancel(context.Background())
   703  	defer cancel()
   704  
   705  	go aggregatedManager.Run(testCtx.Done(), nil)
   706  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   707  
   708  	// At this point external services have synced. Check if discovery document
   709  	// includes the legacy resources
   710  	_, _, doc := fetchPath(aggregatedResourceManager, "")
   711  
   712  	mustConvert := func(r []metav1.APIResource) []apidiscoveryv2.APIResourceDiscovery {
   713  		converted, err := endpoints.ConvertGroupVersionIntoToDiscovery(r)
   714  		require.NoError(t, err)
   715  		return converted
   716  	}
   717  	expectAggregatedDiscovery := []apidiscoveryv2.APIGroupDiscovery{{
   718  		ObjectMeta: metav1.ObjectMeta{
   719  			Name: "stable.example.com",
   720  		},
   721  		Versions: []apidiscoveryv2.APIVersionDiscovery{
   722  			{
   723  				Version:   "v1",
   724  				Resources: mustConvert([]metav1.APIResource{resources["v1"]}),
   725  				Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
   726  			},
   727  			{
   728  				Version:   "v1beta1",
   729  				Resources: mustConvert([]metav1.APIResource{resources["v1beta1"]}),
   730  				Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
   731  			},
   732  			{
   733  				Version: "v2alpha1",
   734  				Resources: []apidiscoveryv2.APIResourceDiscovery{
   735  					{
   736  						Resource:     "parent-without-kind",
   737  						ResponseKind: &metav1.GroupVersionKind{}, // defaulted
   738  						Scope:        "Cluster",
   739  						Subresources: []apidiscoveryv2.APISubresourceDiscovery{
   740  							{
   741  								Subresource:  "subresource",
   742  								ResponseKind: &metav1.GroupVersionKind{Kind: "Subresource"},
   743  							},
   744  							{
   745  								Subresource:  "subresource-without-kind",
   746  								ResponseKind: &metav1.GroupVersionKind{}, // defaulted
   747  							},
   748  						},
   749  					},
   750  					{
   751  						Resource:     "missing-parent",
   752  						ResponseKind: &metav1.GroupVersionKind{}, // defaulted
   753  						Scope:        "Cluster",
   754  						Subresources: []apidiscoveryv2.APISubresourceDiscovery{
   755  							{
   756  								Subresource:  "subresource-without-parent",
   757  								ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithoutParent"},
   758  							},
   759  						},
   760  					},
   761  				},
   762  				Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
   763  			},
   764  			{
   765  				Version:   "v1alpha1",
   766  				Resources: mustConvert([]metav1.APIResource{resources["v1alpha1"]}),
   767  				Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
   768  			},
   769  		},
   770  	}}
   771  	require.Equal(t, doc.Items, expectAggregatedDiscovery)
   772  }
   773  
   774  func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(http.ResponseWriter, *http.Request)) {
   775  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   776  
   777  	legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
   778  		Name: "stable.example.com",
   779  		PreferredVersion: metav1.GroupVersionForDiscovery{
   780  			GroupVersion: "stable.example.com/v1",
   781  			Version:      "v1",
   782  		},
   783  		Versions: []metav1.GroupVersionForDiscovery{
   784  			{
   785  				GroupVersion: "stable.example.com/v1",
   786  				Version:      "v1",
   787  			},
   788  			{
   789  				GroupVersion: "stable.example.com/v1beta1",
   790  				Version:      "v1beta1",
   791  			},
   792  		},
   793  	})
   794  
   795  	resource := metav1.APIResource{
   796  		Name:         "foos",
   797  		SingularName: "foo",
   798  		Group:        "stable.example.com",
   799  		Version:      "v1",
   800  		Namespaced:   false,
   801  		Kind:         "Foo",
   802  		Verbs:        []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"},
   803  		Categories:   []string{"all"},
   804  	}
   805  
   806  	legacyResourceHandler := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
   807  		Group:   "stable.example.com",
   808  		Version: "v1",
   809  	}, discovery.APIResourceListerFunc(func() []metav1.APIResource {
   810  		return []metav1.APIResource{
   811  			resource,
   812  		}
   813  	}))
   814  
   815  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   816  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   817  		ObjectMeta: metav1.ObjectMeta{
   818  			Name: "v1.stable.example.com",
   819  		},
   820  		Spec: apiregistrationv1.APIServiceSpec{
   821  			Group:   "stable.example.com",
   822  			Version: "v1",
   823  			Service: &apiregistrationv1.ServiceReference{
   824  				Name: "test-service",
   825  			},
   826  		},
   827  	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   828  		if r.URL.Path == "/apis/stable.example.com" {
   829  			legacyGroupHandler.ServeHTTP(w, r)
   830  		} else if r.URL.Path == "/apis/stable.example.com/v1" {
   831  			// defer to legacy discovery
   832  			legacyResourceHandler.ServeHTTP(w, r)
   833  		} else if r.URL.Path == "/apis" {
   834  			rootHandlerFn(w, r)
   835  		} else {
   836  			// Unknown url
   837  			t.Fatalf("unexpected request sent to %v", r.URL.Path)
   838  		}
   839  	}))
   840  	testCtx, cancel := context.WithCancel(context.Background())
   841  	defer cancel()
   842  
   843  	go aggregatedManager.Run(testCtx.Done(), nil)
   844  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   845  
   846  	// At this point external services have synced. Check if discovery document
   847  	// includes the legacy resources
   848  	_, _, doc := fetchPath(aggregatedResourceManager, "")
   849  
   850  	converted, err := endpoints.ConvertGroupVersionIntoToDiscovery([]metav1.APIResource{resource})
   851  	require.NoError(t, err)
   852  	require.Equal(t, []apidiscoveryv2.APIGroupDiscovery{
   853  		{
   854  			ObjectMeta: metav1.ObjectMeta{
   855  				Name: resource.Group,
   856  			},
   857  			Versions: []apidiscoveryv2.APIVersionDiscovery{
   858  				{
   859  					Version:   resource.Version,
   860  					Resources: converted,
   861  					Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
   862  				},
   863  			},
   864  		},
   865  	}, doc.Items)
   866  }
   867  func TestLegacyFallback(t *testing.T) {
   868  	rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
   869  	testCases := []struct {
   870  		name        string
   871  		rootHandler func(http.ResponseWriter, *http.Request)
   872  	}{
   873  		{
   874  			name:        "Default root handler (406)",
   875  			rootHandler: rootAPIsHandler.ServeHTTP,
   876  		},
   877  		{
   878  			name: "Root handler with non 200 status code",
   879  			rootHandler: func(w http.ResponseWriter, r *http.Request) {
   880  				w.WriteHeader(404)
   881  			},
   882  		},
   883  		{
   884  			name: "Root handler with 200 response code no content type",
   885  			rootHandler: func(w http.ResponseWriter, r *http.Request) {
   886  				w.Header().Set("Content-Type", "application/json")
   887  				w.WriteHeader(200)
   888  			},
   889  		},
   890  		{
   891  			name: "Root handler with 200 response code incorrect content type",
   892  			rootHandler: func(w http.ResponseWriter, r *http.Request) {
   893  				w.Header().Set("Content-Type", "application/json;g=apidiscovery.k8s.io;v=v1alpha1;as=APIGroupDiscoveryList")
   894  				w.WriteHeader(200)
   895  			},
   896  		},
   897  	}
   898  	for _, tc := range testCases {
   899  		testLegacyFallbackWithCustomRootHandler(t, tc.rootHandler)
   900  	}
   901  }
   902  
   903  func TestAPIServiceStale(t *testing.T) {
   904  	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
   905  	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
   906  	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
   907  		ObjectMeta: metav1.ObjectMeta{
   908  			Name: "v1.stable.example.com",
   909  		},
   910  		Spec: apiregistrationv1.APIServiceSpec{
   911  			Group:   "stable.example.com",
   912  			Version: "v1",
   913  			Service: &apiregistrationv1.ServiceReference{
   914  				Name: "test-service",
   915  			},
   916  		},
   917  	}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   918  		w.WriteHeader(503)
   919  	}))
   920  	testCtx, cancel := context.WithCancel(context.Background())
   921  	defer cancel()
   922  
   923  	go aggregatedManager.Run(testCtx.Done(), nil)
   924  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   925  
   926  	// At this point external services have synced. Check if discovery document
   927  	// lists the APIService group version as Stale.
   928  	_, _, doc := fetchPath(aggregatedResourceManager, "")
   929  	require.Equal(t, []apidiscoveryv2.APIGroupDiscovery{
   930  		{
   931  			ObjectMeta: metav1.ObjectMeta{
   932  				Name: "stable.example.com",
   933  			},
   934  			Versions: []apidiscoveryv2.APIVersionDiscovery{
   935  				{
   936  					Version:   "v1",
   937  					Freshness: apidiscoveryv2.DiscoveryFreshnessStale,
   938  				},
   939  			},
   940  		},
   941  	}, doc.Items)
   942  }
   943  
   944  // Exercises the 304 Not Modified Path of the aggregator
   945  // This path in 1.26.0 would result in a deadlock if an aggregated APIService
   946  // returned a 304 Not Modified response for its own aggregated discovery document.
   947  func TestNotModified(t *testing.T) {
   948  	aggyService := discoveryendpoint.NewResourceManager("apis")
   949  	service := discoveryendpoint.NewResourceManager("apis")
   950  	apiGroup := fuzzAPIGroups(2, 3, 10)
   951  	service.SetGroups(apiGroup.Items)
   952  
   953  	var apiServices []*apiregistrationv1.APIService
   954  	for _, g := range apiGroup.Items {
   955  		for _, v := range g.Versions {
   956  			apiservice := &apiregistrationv1.APIService{
   957  				ObjectMeta: metav1.ObjectMeta{
   958  					Name: v.Version + "." + g.Name,
   959  				},
   960  				Spec: apiregistrationv1.APIServiceSpec{
   961  					Group:   g.Name,
   962  					Version: v.Version,
   963  					Service: &apiregistrationv1.ServiceReference{
   964  						Namespace: "serviceNamespace",
   965  						Name:      "serviceName",
   966  					},
   967  				},
   968  			}
   969  
   970  			apiServices = append(apiServices, apiservice)
   971  		}
   972  	}
   973  
   974  	aggregatedManager := newDiscoveryManager(aggyService)
   975  	// Add all except the last group.
   976  	// Ensure this is done BEFORE the call to run, so they are included in initial
   977  	// list to keep test focused
   978  	for _, s := range apiServices[:len(apiServices)-1] {
   979  		aggregatedManager.AddAPIService(s, service)
   980  	}
   981  
   982  	testCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   983  	defer cancel()
   984  
   985  	go aggregatedManager.Run(testCtx.Done(), nil)
   986  
   987  	// Important to wait here to ensure we prime the cache with the initial list
   988  	// of documents in order to exercise 304 Not Modified
   989  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
   990  
   991  	// Now add all groups. We excluded one group before so that waitForQueueIsComplete
   992  	// could include it in this round. Now, if waitForQueueIsComplete ever returns
   993  	// true, it must have synced all the pre-existing groups before, which would
   994  	// return 304 Not Modified
   995  	for _, s := range apiServices {
   996  		aggregatedManager.AddAPIService(s, service)
   997  	}
   998  
   999  	// This would wait the full timeout on 1.26.0.
  1000  	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
  1001  }
  1002  
  1003  // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
  1004  func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2.APIGroupDiscoveryList {
  1005  	fuzzer := fuzz.NewWithSeed(seed)
  1006  	fuzzer.NumElements(atLeastNumGroups, maxNumGroups)
  1007  	fuzzer.NilChance(0)
  1008  	fuzzer.Funcs(func(o *apidiscoveryv2.APIGroupDiscovery, c fuzz.Continue) {
  1009  		c.FuzzNoCustom(o)
  1010  
  1011  		// The ResourceManager will just not serve the group if its versions
  1012  		// list is empty
  1013  		atLeastOne := apidiscoveryv2.APIVersionDiscovery{}
  1014  		c.Fuzz(&atLeastOne)
  1015  		o.Versions = append(o.Versions, atLeastOne)
  1016  
  1017  		// clear invalid fuzzed values
  1018  		o.TypeMeta = metav1.TypeMeta{}
  1019  		// truncate object meta to just name
  1020  		o.ObjectMeta = metav1.ObjectMeta{Name: o.ObjectMeta.Name}
  1021  		// fix version freshness value, make versions unique and non-empty
  1022  		for i := range o.Versions {
  1023  			o.Versions[i].Freshness = "Current"
  1024  			o.Versions[i].Version = fmt.Sprintf("v%d", i+1)
  1025  		}
  1026  	})
  1027  
  1028  	var apis []apidiscoveryv2.APIGroupDiscovery
  1029  	fuzzer.Fuzz(&apis)
  1030  
  1031  	return apidiscoveryv2.APIGroupDiscoveryList{
  1032  		TypeMeta: metav1.TypeMeta{
  1033  			Kind:       "APIGroupDiscoveryList",
  1034  			APIVersion: "v1",
  1035  		},
  1036  		Items: apis,
  1037  	}
  1038  
  1039  }
  1040  
  1041  // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
  1042  func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apidiscoveryv2.APIGroupDiscoveryList) {
  1043  	// Expect json-formatted apis group list
  1044  	w := httptest.NewRecorder()
  1045  	req := httptest.NewRequest("GET", "/apis", nil)
  1046  
  1047  	// Ask for JSON response
  1048  	req.Header.Set("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2;as=APIGroupDiscoveryList,"+runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
  1049  
  1050  	if etag != "" {
  1051  		// Quote provided etag if unquoted
  1052  		quoted := etag
  1053  		if !strings.HasPrefix(etag, "\"") {
  1054  			quoted = strconv.Quote(etag)
  1055  		}
  1056  		req.Header.Set("If-None-Match", quoted)
  1057  	}
  1058  
  1059  	handler.ServeHTTP(w, req)
  1060  
  1061  	bytes := w.Body.Bytes()
  1062  	var decoded *apidiscoveryv2.APIGroupDiscoveryList
  1063  	if len(bytes) > 0 {
  1064  		decoded = &apidiscoveryv2.APIGroupDiscoveryList{}
  1065  		runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), bytes, decoded)
  1066  	}
  1067  
  1068  	return w.Result(), bytes, decoded
  1069  }
  1070  
  1071  // completerWorkqueue is a workqueue.RateLimitingInterface that implements
  1072  // isComplete
  1073  type completerWorkqueue struct {
  1074  	lock sync.Mutex
  1075  	workqueue.RateLimitingInterface
  1076  	processing map[interface{}]struct{}
  1077  }
  1078  
  1079  var _ = workqueue.RateLimitingInterface(&completerWorkqueue{})
  1080  
  1081  func newCompleterWorkqueue(wq workqueue.RateLimitingInterface) *completerWorkqueue {
  1082  	return &completerWorkqueue{
  1083  		RateLimitingInterface: wq,
  1084  		processing:            make(map[interface{}]struct{}),
  1085  	}
  1086  }
  1087  
  1088  func (q *completerWorkqueue) Add(item interface{}) {
  1089  	q.lock.Lock()
  1090  	defer q.lock.Unlock()
  1091  	q.processing[item] = struct{}{}
  1092  	q.RateLimitingInterface.Add(item)
  1093  }
  1094  
  1095  func (q *completerWorkqueue) AddAfter(item interface{}, duration time.Duration) {
  1096  	q.Add(item)
  1097  }
  1098  
  1099  func (q *completerWorkqueue) AddRateLimited(item interface{}) {
  1100  	q.Add(item)
  1101  }
  1102  
  1103  func (q *completerWorkqueue) Done(item interface{}) {
  1104  	q.lock.Lock()
  1105  	defer q.lock.Unlock()
  1106  	delete(q.processing, item)
  1107  	q.RateLimitingInterface.Done(item)
  1108  }
  1109  
  1110  func (q *completerWorkqueue) isComplete() bool {
  1111  	q.lock.Lock()
  1112  	defer q.lock.Unlock()
  1113  	return q.Len() == 0 && len(q.processing) == 0
  1114  }
  1115  

View as plain text