...
1
18
19
20 package rls
21
22 import (
23 "encoding/json"
24 "fmt"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/internal"
28 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
29 "google.golang.org/grpc/xds/internal/clusterspecifier"
30 "google.golang.org/protobuf/encoding/protojson"
31 "google.golang.org/protobuf/proto"
32 "google.golang.org/protobuf/types/known/anypb"
33 )
34
35 func init() {
36 clusterspecifier.Register(rls{})
37
38
39 internal.RegisterRLSClusterSpecifierPluginForTesting = func() {
40 clusterspecifier.Register(rls{})
41 }
42 internal.UnregisterRLSClusterSpecifierPluginForTesting = func() {
43 for _, typeURL := range rls.TypeURLs(rls{}) {
44 clusterspecifier.UnregisterForTesting(typeURL)
45 }
46 }
47 }
48
49 type rls struct{}
50
51 func (rls) TypeURLs() []string {
52 return []string{"type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier"}
53 }
54
55
56
57
58 type lbConfigJSON struct {
59 RouteLookupConfig json.RawMessage `json:"routeLookupConfig"`
60 ChildPolicy []map[string]json.RawMessage `json:"childPolicy"`
61 ChildPolicyConfigTargetFieldName string `json:"childPolicyConfigTargetFieldName"`
62 }
63
64 func (rls) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
65 if cfg == nil {
66 return nil, fmt.Errorf("rls_csp: nil configuration message provided")
67 }
68 any, ok := cfg.(*anypb.Any)
69 if !ok {
70 return nil, fmt.Errorf("rls_csp: error parsing config %v: unknown type %T", cfg, cfg)
71 }
72 rlcs := new(rlspb.RouteLookupClusterSpecifier)
73
74 if err := any.UnmarshalTo(rlcs); err != nil {
75 return nil, fmt.Errorf("rls_csp: error parsing config %v: %v", cfg, err)
76 }
77 rlcJSON, err := protojson.Marshal(rlcs.GetRouteLookupConfig())
78 if err != nil {
79 return nil, fmt.Errorf("rls_csp: error marshaling route lookup config: %v: %v", rlcs.GetRouteLookupConfig(), err)
80 }
81 lbCfgJSON := &lbConfigJSON{
82 RouteLookupConfig: rlcJSON,
83 ChildPolicy: []map[string]json.RawMessage{
84 {
85 "cds_experimental": json.RawMessage("{}"),
86 },
87 },
88 ChildPolicyConfigTargetFieldName: "cluster",
89 }
90
91 rawJSON, err := json.Marshal(lbCfgJSON)
92 if err != nil {
93 return nil, fmt.Errorf("rls_csp: error marshaling load balancing config %v: %v", lbCfgJSON, err)
94 }
95
96 rlsBB := balancer.Get(internal.RLSLoadBalancingPolicyName)
97 if rlsBB == nil {
98 return nil, fmt.Errorf("RLS LB policy not registered")
99 }
100 if _, err = rlsBB.(balancer.ConfigParser).ParseConfig(rawJSON); err != nil {
101 return nil, fmt.Errorf("rls_csp: validation error from rls lb policy parsing: %v", err)
102 }
103
104 return clusterspecifier.BalancerConfig{{internal.RLSLoadBalancingPolicyName: lbCfgJSON}}, nil
105 }
106
View as plain text