package lighthouse

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	kwatch "k8s.io/apimachinery/pkg/watch"

	"edge-infra.dev/pkg/edge/lighthouse/config"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
	// lighthouseConfigMapName is the name of the lighthouse configmap.
	lighthouseConfigMapName = "lighthouse"
	// lighthouseConfigMapNs is the namespace of the lighthouse configmap.
	lighthouseConfigMapNs = "lighthouse"
	// lighthouseDataFieldName is the configmap data key that holds the channel list.
	lighthouseDataFieldName = "CHANNELS"
)

// WatchCfg starts a watch on the lighthouse configmap and applies every
// change it observes to the channels registered on lstner.
//
// If the configmap does not exist when WatchCfg is called, updateChannels is
// invoked with an empty list before the watch starts (presumably yielding the
// default channels; confirm against config.ParseChannelList). Any other
// error from the initial lookup, or from creating the watch, is returned
// unchanged.
//
// Events are processed in a background goroutine that runs until the returned
// watch's result channel is closed. The watch is returned so the caller can
// manage it.
func WatchCfg(ctx context.Context, s *config.Config, lstner *WatchTower, log logr.Logger) (kwatch.Interface, error) {
	// If the lighthouse configmap is not found, register the default channels.
	key := types.NamespacedName{
		Name:      lighthouseConfigMapName,
		Namespace: lighthouseConfigMapNs,
	}
	if err := s.WatchClient.Get(ctx, key, &v1.ConfigMap{}); err != nil {
		if !k8serrors.IsNotFound(err) {
			return nil, err
		}
		updateChannels(lstner, "", log)
	}

	// Restrict the watch to the single lighthouse configmap by name and namespace.
	watcher, err := s.WatchClient.Watch(ctx, &v1.ConfigMapList{}, &client.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", lighthouseConfigMapName),
		Namespace:     lighthouseConfigMapNs,
	})
	if err != nil {
		return nil, err
	}

	go func() {
		for event := range watcher.ResultChan() {
			switch event.Type {
			case kwatch.Deleted:
				// The lighthouse configmap was deleted: unregister all channels,
				// then register the default list again.
				if err := lstner.UnregisterAllChannels(); err != nil {
					log.Error(err, "An error occurred unregistering all Postgres channels")
					// NOTE(review): this panic terminates the whole process;
					// presumably intentional so the workload restarts. Confirm.
					panic(err)
				}
				updateChannels(lstner, "", log)
			case kwatch.Added, kwatch.Modified:
				// Use a checked assertion so an unexpected payload is logged
				// and skipped instead of panicking the goroutine.
				cm, ok := event.Object.(*v1.ConfigMap)
				if !ok {
					log.Error(
						fmt.Errorf("unexpected object type %T", event.Object),
						"Ignoring lighthouse configmap watch event",
						"eventType", event.Type,
					)
					continue
				}
				// NOTE(review): only RegisterChannel is called here; channels
				// removed from the configmap are not unregistered. Confirm
				// that this is intended.
				updateChannels(lstner, cm.Data[lighthouseDataFieldName], log)
			case kwatch.Error:
				// Surface watch failures rather than dropping them silently.
				log.Error(k8serrors.FromObject(event.Object), "An error occurred watching the lighthouse configmap")
			}
		}
	}()

	return watcher, nil
}

// updateChannels parses list with config.ParseChannelList and registers every
// resulting channel on lstner. A failed registration is logged and does not
// stop the remaining channels from being registered.
func updateChannels(lstner *WatchTower, list string, log logr.Logger) {
	for _, channel := range config.ParseChannelList(list) {
		if err := lstner.RegisterChannel(channel); err != nil {
			log.Error(err, fmt.Sprintf("An error occurred registering %s Postgres channels", channel))
		}
	}
}