package client import ( "context" b64 "encoding/base64" "encoding/json" "fmt" "reflect" "edge-infra.dev/pkg/edge/chariot" "google.golang.org/api/option" "edge-infra.dev/pkg/lib/gcp/pubsub" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/edge/k8objectsutils" ) const ( // Create is a creation operation for chariot v2 Create Operation = iota // Delete is a deletion operation for chariot v2 Delete ) type Operation int // ChariotMessage represents a pubsub message for chariot. // Message Structure is defined here: https://docs.edge-infra.dev/rfc/0004-chariot-v2/#message-structure type ChariotMessage struct { // Objects is an array of base64 encoded objects on which the operation should be carried out on. Objects []string `json:"objects"` // Banner is the banner name. It is the $BANNER_GUID Banner string `json:"banner"` // Cluster is the cluster name. it is the $CLUSTER_GUID Cluster string `json:"cluster"` // Operation is the operation to be carried out. Example are CREATE, DELETE. Operation string `json:"operation"` // Owner Owner string `json:"owner"` // Dir is the directory chariot places the object. Dir string `json:"dir,omitempty"` // Notify is a boolean to notify clusters that a resource has been applied Notify bool `json:"notify,omitempty"` } // String converts operation to a string. func (s Operation) String() string { switch s { case Create: return "CREATE" case Delete: return "DELETE" default: return "CREATE" } } // NewChariotMessage returns a new chariot message. func NewChariotMessage() *ChariotMessage { return &ChariotMessage{} } // SetBanner sets the banner $BANNER_GUID. func (c *ChariotMessage) SetBanner(banner string) *ChariotMessage { c.Banner = banner return c } // SetCluster sets the cluster $CLUSTER_GUID. func (c *ChariotMessage) SetCluster(cluster string) *ChariotMessage { c.Cluster = cluster return c } // SetDir sets the dir in which chariot should place the object. func (c *ChariotMessage) SetDir(dir string) *ChariotMessage { c.Dir = dir return c } // SetOperation sets the chariot v2 operation to be carried out. (e.g. CREATE, DELETE) func (c *ChariotMessage) SetOperation(operation Operation) *ChariotMessage { c.Operation = operation.String() return c } // SetOwner sets the owner performing this operation. // Explained here: https://docs.edge-infra.dev/rfc/0004-chariot-v2/#ownership func (c *ChariotMessage) SetOwner(owner string) *ChariotMessage { c.Owner = owner return c } // SetObjects sets the objects for which the action is being performed. // objects is a slice of base64 encoded strings. func (c *ChariotMessage) SetObjects(objects []string) *ChariotMessage { c.Objects = objects return c } // SetNotify indicates if a cluster needs to be notified to reconcile a resource. func (c *ChariotMessage) SetNotify(notify bool) *ChariotMessage { c.Notify = notify return c } // AddObject adds an object to the chariot v2 message. // object is a base64 encoded string. func (c *ChariotMessage) AddObject(object ...string) *ChariotMessage { c.Objects = append(c.Objects, object...) return c } // SetK8sBase64Objects convert the objects into base64 string and sets the objects for which the action is being performed. // objects is a slice of any object. func (c *ChariotMessage) SetK8sBase64Objects(objects ...client.Object) (*ChariotMessage, error) { var base64Strings []string for _, o := range objects { data, err := k8objectsutils.ToBase64JSONString(o) if err != nil { return nil, err } base64Strings = append(base64Strings, data) } return c.SetObjects(base64Strings), nil } // AddObject convert the objects into base64 and adds an object to the chariot v2 message. // object is any object. func (c *ChariotMessage) AddK8sBase64Object(object ...client.Object) (*ChariotMessage, error) { for _, o := range object { data, err := k8objectsutils.ToBase64JSONString(o) if err != nil { return nil, err } c.AddObject(data) } return c, nil } // ToByte json marshals the chariot v2 message and returns a slice of bytes and an error if any. func (c *ChariotMessage) ToByte() ([]byte, error) { return json.Marshal(c) } // Publish it publishes the chariot message to the chariot pubsub topic. func (c *ChariotMessage) Publish(ctx context.Context, projectID string, topicID string, attributes map[string]string, opts ...option.ClientOption) error { pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...) if err != nil { return err } chariotMessageByte, err := c.ToByte() if err != nil { return err } return pubSubService.Send(ctx, topicID, chariotMessageByte, attributes) } func (c *ChariotMessage) Matches(x interface{}) bool { reflectedValue := reflect.ValueOf(x).Elem() ls := reflectedValue.FieldByName("Objects").Interface().([]string) //ls if len(c.Objects) != len(ls) { return false } map1 := map[string]bool{} map2 := map[string]bool{} for i := range ls { str1, _ := b64.StdEncoding.DecodeString(ls[i]) gvk1, err := chariot.ParseYamlGVKNN(str1) if err != nil { return false } str2, _ := b64.StdEncoding.DecodeString(c.Objects[i]) gvk2, err := chariot.ParseYamlGVKNN(str2) if err != nil { return false } //if they're the same skip if gvk1.Hash() == gvk2.Hash() { continue } _, isItem2InMap1 := map1[gvk2.Hash()] _, isItem1InMap2 := map2[gvk1.Hash()] //if not in either map add to both if !isItem2InMap1 { map2[gvk2.Hash()] = true } else { delete(map1, gvk2.Hash()) } //if not in either map add to both if !isItem1InMap2 { map1[gvk1.Hash()] = true } else { delete(map2, gvk1.Hash()) } } //if both maps not empty mismatch in list of objects if len(map1) != 0 || len(map2) != 0 { return false } if c.Banner != reflectedValue.FieldByName("Banner").String() { return false } else if c.Cluster != reflectedValue.FieldByName("Cluster").String() { return false } else if c.Operation != reflectedValue.FieldByName("Operation").String() { return false } else if c.Owner != reflectedValue.FieldByName("Owner").String() { return false } else if c.Dir != reflectedValue.FieldByName("Dir").String() { return false } return true } func (c *ChariotMessage) String() string { return fmt.Sprintf("{ChariotMessage - Banner:%s Cluster:%s Operation:%s Owner:%s Dir:%s}", c.Banner, c.Cluster, c.Operation, c.Owner, c.Dir) }