...

Source file src/github.com/cert-manager/issuer-lib/internal/kubeutil/source_channel.go

Documentation: github.com/cert-manager/issuer-lib/internal/kubeutil

     1  /*
     2  Copyright 2023 The cert-manager Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubeutil
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  
    24  	"k8s.io/apimachinery/pkg/runtime/schema"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/client-go/util/workqueue"
    27  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    28  	"sigs.k8s.io/controller-runtime/pkg/source"
    29  )
    30  
    31  type EventSource interface {
    32  	AddConsumer(gvk schema.GroupVersionKind) source.Source
    33  	ReportError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName, err error) error
    34  	HasReportedError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName) error
    35  }
    36  
    37  type resource struct {
    38  	gvk            schema.GroupVersionKind
    39  	namespacedName types.NamespacedName
    40  }
    41  
    42  type eventSource struct {
    43  	mu         sync.RWMutex
    44  	dest       map[schema.GroupVersionKind]workqueue.RateLimitingInterface
    45  	invalidate sync.Map
    46  }
    47  
    48  func NewEventStore() EventSource {
    49  	return &eventSource{
    50  		dest: make(map[schema.GroupVersionKind]workqueue.RateLimitingInterface),
    51  	}
    52  }
    53  
    54  func (es *eventSource) HasReportedError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName) error {
    55  	err, ok := es.invalidate.LoadAndDelete(resource{
    56  		gvk:            gvk,
    57  		namespacedName: namespacedName,
    58  	})
    59  	if !ok {
    60  		return nil
    61  	}
    62  	return err.(error)
    63  }
    64  
    65  func (es *eventSource) ReportError(gvk schema.GroupVersionKind, namespacedName types.NamespacedName, err error) error {
    66  	es.mu.RLock()
    67  	defer es.mu.RUnlock()
    68  
    69  	if queue, ok := es.dest[gvk]; !ok {
    70  		return fmt.Errorf("consumer for %v does not exist", gvk)
    71  	} else {
    72  		es.invalidate.Store(resource{
    73  			gvk:            gvk,
    74  			namespacedName: namespacedName,
    75  		}, err)
    76  
    77  		queue.Add(reconcile.Request{NamespacedName: namespacedName})
    78  		return nil
    79  	}
    80  }
    81  
    82  func (es *eventSource) AddConsumer(gvk schema.GroupVersionKind) source.Source {
    83  	return &eventConsumer{
    84  		register: func(queue workqueue.RateLimitingInterface) error {
    85  			es.mu.Lock()
    86  			defer es.mu.Unlock()
    87  
    88  			_, ok := es.dest[gvk]
    89  			if ok {
    90  				return fmt.Errorf("consumer for %v already registered", gvk)
    91  			}
    92  
    93  			es.dest[gvk] = queue
    94  
    95  			return nil
    96  		},
    97  	}
    98  }
    99  
   100  type eventConsumer struct {
   101  	register func(queue workqueue.RateLimitingInterface) error
   102  }
   103  
   104  var _ source.Source = &eventConsumer{}
   105  
   106  func (cs *eventConsumer) String() string {
   107  	return fmt.Sprintf("EventConsumer: %p", cs)
   108  }
   109  
   110  // Start implements Source and should only be called by the Controller.
   111  func (cs *eventConsumer) Start(_ context.Context, queue workqueue.RateLimitingInterface) error {
   112  	if cs.register == nil {
   113  		return fmt.Errorf("register function not provided")
   114  	}
   115  
   116  	err := cs.register(queue)
   117  	cs.register = nil
   118  
   119  	return err
   120  }
   121  

View as plain text