1
16
17 package authz
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "os"
24 "sync/atomic"
25 "time"
26 "unsafe"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/grpclog"
31 "google.golang.org/grpc/internal/xds/rbac"
32 "google.golang.org/grpc/status"
33 )
34
// logger is the package-level logger, obtained from grpclog.Component with the
// component name "authz". The interceptors in this file log through it.
var logger = grpclog.Component("authz")
36
37
38
39
// StaticInterceptor is a gRPC server interceptor backed by a fixed chain of
// RBAC engines. The chain is populated by NewStatic and consulted by both
// UnaryInterceptor and StreamInterceptor.
type StaticInterceptor struct {
	// engines is the RBAC engine chain whose IsAuthorized method decides
	// whether an incoming RPC is allowed to reach its handler.
	engines rbac.ChainEngine
}
43
44
45
46 func NewStatic(authzPolicy string) (*StaticInterceptor, error) {
47 rbacs, policyName, err := translatePolicy(authzPolicy)
48 if err != nil {
49 return nil, err
50 }
51 chainEngine, err := rbac.NewChainEngine(rbacs, policyName)
52 if err != nil {
53 return nil, err
54 }
55 return &StaticInterceptor{*chainEngine}, nil
56 }
57
58
59
60
61 func (i *StaticInterceptor) UnaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
62 err := i.engines.IsAuthorized(ctx)
63 if err != nil {
64 if status.Code(err) == codes.PermissionDenied {
65 if logger.V(2) {
66 logger.Infof("unauthorized RPC request rejected: %v", err)
67 }
68 return nil, status.Errorf(codes.PermissionDenied, "unauthorized RPC request rejected")
69 }
70 return nil, err
71 }
72 return handler(ctx, req)
73 }
74
75
76
77
78 func (i *StaticInterceptor) StreamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
79 err := i.engines.IsAuthorized(ss.Context())
80 if err != nil {
81 if status.Code(err) == codes.PermissionDenied {
82 if logger.V(2) {
83 logger.Infof("unauthorized RPC request rejected: %v", err)
84 }
85 return status.Errorf(codes.PermissionDenied, "unauthorized RPC request rejected")
86 }
87 return err
88 }
89 return handler(srv, ss)
90 }
91
92
93
// FileWatcherInterceptor is a gRPC server interceptor whose authorization
// policy is read from a file and re-read periodically by a background
// goroutine. RPCs are delegated to the most recently stored StaticInterceptor.
type FileWatcherInterceptor struct {
	// internalInterceptor holds a *StaticInterceptor. It is written with
	// atomic.StorePointer and read with atomic.LoadPointer.
	internalInterceptor unsafe.Pointer
	// policyFile is the path of the policy file that is read on each refresh.
	policyFile string
	// policyContents is the raw content of the last policy file read; each
	// new read is compared against it to detect changes.
	policyContents []byte
	// refreshDuration is the ticker interval between policy file reads.
	refreshDuration time.Duration
	// cancel cancels the context given to the background refresh goroutine;
	// it is invoked by Close.
	cancel context.CancelFunc
}
101
102
103
104
105 func NewFileWatcher(file string, duration time.Duration) (*FileWatcherInterceptor, error) {
106 if file == "" {
107 return nil, fmt.Errorf("authorization policy file path is empty")
108 }
109 if duration <= time.Duration(0) {
110 return nil, fmt.Errorf("requires refresh interval(%v) greater than 0s", duration)
111 }
112 i := &FileWatcherInterceptor{policyFile: file, refreshDuration: duration}
113 if err := i.updateInternalInterceptor(); err != nil {
114 return nil, err
115 }
116 ctx, cancel := context.WithCancel(context.Background())
117 i.cancel = cancel
118
119 go i.run(ctx)
120 return i, nil
121 }
122
123 func (i *FileWatcherInterceptor) run(ctx context.Context) {
124 ticker := time.NewTicker(i.refreshDuration)
125 for {
126 if err := i.updateInternalInterceptor(); err != nil {
127 logger.Warningf("authorization policy reload status err: %v", err)
128 }
129 select {
130 case <-ctx.Done():
131 ticker.Stop()
132 return
133 case <-ticker.C:
134 }
135 }
136 }
137
138
139
140
141
142 func (i *FileWatcherInterceptor) updateInternalInterceptor() error {
143 policyContents, err := os.ReadFile(i.policyFile)
144 if err != nil {
145 return fmt.Errorf("policyFile(%s) read failed: %v", i.policyFile, err)
146 }
147 if bytes.Equal(i.policyContents, policyContents) {
148 return nil
149 }
150 i.policyContents = policyContents
151 policyContentsString := string(policyContents)
152 interceptor, err := NewStatic(policyContentsString)
153 if err != nil {
154 return err
155 }
156 atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor))
157 logger.Infof("authorization policy reload status: successfully loaded new policy %v", policyContentsString)
158 return nil
159 }
160
161
// Close cancels the context that was handed to the background refresh
// goroutine, which causes that goroutine to stop its ticker and return.
// Close only signals cancellation; it does not wait for the goroutine to exit.
func (i *FileWatcherInterceptor) Close() {
	i.cancel()
}
165
166
167
168
169 func (i *FileWatcherInterceptor) UnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
170 return ((*StaticInterceptor)(atomic.LoadPointer(&i.internalInterceptor))).UnaryInterceptor(ctx, req, info, handler)
171 }
172
173
174
175
176 func (i *FileWatcherInterceptor) StreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
177 return ((*StaticInterceptor)(atomic.LoadPointer(&i.internalInterceptor))).StreamInterceptor(srv, ss, info, handler)
178 }
179
View as plain text