...

Source file src/edge-infra.dev/pkg/edge/lighthouse/watch.go

Documentation: edge-infra.dev/pkg/edge/lighthouse

     1  package lighthouse
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/go-logr/logr"
     8  	v1 "k8s.io/api/core/v1"
     9  	"k8s.io/apimachinery/pkg/fields"
    10  	"k8s.io/apimachinery/pkg/types"
    11  	kwatch "k8s.io/apimachinery/pkg/watch"
    12  
    13  	"edge-infra.dev/pkg/edge/lighthouse/config"
    14  
    15  	k8serrors "k8s.io/apimachinery/pkg/api/errors"
    16  	"sigs.k8s.io/controller-runtime/pkg/client"
    17  )
    18  
    19  const (
    20  	// lighthouseConfigMapName the name of the lighthouse configmap.
    21  	lighthouseConfigMapName = "lighthouse"
    22  	// lighthouseConfigMapNs the namespace of the lighthouse configmap.
    23  	lighthouseConfigMapNs = "lighthouse"
    24  	// lighthouseDataFieldName the configmap data field name.
    25  	lighthouseDataFieldName = "CHANNELS"
    26  )
    27  
    28  // WatchCfg initializes a new watch and watches for changes in the lighthouse configmap.
    29  // it then handles that change by updating the lighthouse config.
    30  func WatchCfg(ctx context.Context, s *config.Config, lstner *WatchTower, log logr.Logger) (kwatch.Interface, error) {
    31  	//if the lighthouse configmap is not found register default channels
    32  	lighthouseConfigmap := &v1.ConfigMap{}
    33  	if err := s.WatchClient.Get(ctx, types.NamespacedName{
    34  		Name:      lighthouseConfigMapName,
    35  		Namespace: lighthouseConfigMapNs,
    36  	}, lighthouseConfigmap); err != nil {
    37  		if !k8serrors.IsNotFound(err) {
    38  			return nil, err
    39  		}
    40  		updateChannels(lstner, "", log)
    41  	}
    42  	list := &v1.ConfigMapList{}
    43  	watch, err := s.WatchClient.Watch(ctx, list, &client.ListOptions{
    44  		FieldSelector: fields.OneTermEqualSelector("metadata.name", lighthouseConfigMapName),
    45  		Namespace:     lighthouseConfigMapNs,
    46  	})
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  	go func(log logr.Logger) {
    51  		for event := range watch.ResultChan() {
    52  			switch event.Type {
    53  			case kwatch.Deleted:
    54  				// lighthouse configmap was deleted, unregister all channels
    55  				// use the default list of channels and register them.
    56  				if err := lstner.UnregisterAllChannels(); err != nil {
    57  					log.Error(err, "An error occurred unregistering all Postgres channels")
    58  					panic(err)
    59  				}
    60  				updateChannels(lstner, "", log)
    61  			case kwatch.Added, kwatch.Modified:
    62  				updatedConfigmap := event.Object.(*v1.ConfigMap)
    63  				updateChannels(lstner, updatedConfigmap.Data[lighthouseDataFieldName], log)
    64  			}
    65  		}
    66  	}(log)
    67  	return watch, nil
    68  }
    69  
    70  // updateChannels parses the channels list and updates the channels in the listener.
    71  func updateChannels(lstner *WatchTower, list string, log logr.Logger) {
    72  	channels := config.ParseChannelList(list)
    73  	for i := range channels {
    74  		if err := lstner.RegisterChannel(channels[i]); err != nil {
    75  			log.Error(err, fmt.Sprintf("An error occurred registering %s Postgres channels", channels[i]))
    76  		}
    77  	}
    78  }
    79  

View as plain text