...

Source file src/google.golang.org/grpc/authz/grpc_authz_server_interceptors.go

Documentation: google.golang.org/grpc/authz

     1  /*
     2   * Copyright 2021 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package authz
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"os"
    24  	"sync/atomic"
    25  	"time"
    26  	"unsafe"
    27  
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/codes"
    30  	"google.golang.org/grpc/grpclog"
    31  	"google.golang.org/grpc/internal/xds/rbac"
    32  	"google.golang.org/grpc/status"
    33  )
    34  
// logger is the package-level logger, obtained from grpclog.Component under
// the component name "authz".
var logger = grpclog.Component("authz")
    36  
// StaticInterceptor contains the engines used to make authorization
// decisions. It contains either two engines (a deny engine followed by an
// allow engine) or a single allow engine.
type StaticInterceptor struct {
	// engines is the RBAC engine chain whose IsAuthorized method is consulted
	// for every intercepted RPC.
	engines rbac.ChainEngine
}
    43  
    44  // NewStatic returns a new StaticInterceptor from a static authorization policy
    45  // JSON string.
    46  func NewStatic(authzPolicy string) (*StaticInterceptor, error) {
    47  	rbacs, policyName, err := translatePolicy(authzPolicy)
    48  	if err != nil {
    49  		return nil, err
    50  	}
    51  	chainEngine, err := rbac.NewChainEngine(rbacs, policyName)
    52  	if err != nil {
    53  		return nil, err
    54  	}
    55  	return &StaticInterceptor{*chainEngine}, nil
    56  }
    57  
    58  // UnaryInterceptor intercepts incoming Unary RPC requests.
    59  // Only authorized requests are allowed to pass. Otherwise, an unauthorized
    60  // error is returned to the client.
    61  func (i *StaticInterceptor) UnaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
    62  	err := i.engines.IsAuthorized(ctx)
    63  	if err != nil {
    64  		if status.Code(err) == codes.PermissionDenied {
    65  			if logger.V(2) {
    66  				logger.Infof("unauthorized RPC request rejected: %v", err)
    67  			}
    68  			return nil, status.Errorf(codes.PermissionDenied, "unauthorized RPC request rejected")
    69  		}
    70  		return nil, err
    71  	}
    72  	return handler(ctx, req)
    73  }
    74  
    75  // StreamInterceptor intercepts incoming Stream RPC requests.
    76  // Only authorized requests are allowed to pass. Otherwise, an unauthorized
    77  // error is returned to the client.
    78  func (i *StaticInterceptor) StreamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    79  	err := i.engines.IsAuthorized(ss.Context())
    80  	if err != nil {
    81  		if status.Code(err) == codes.PermissionDenied {
    82  			if logger.V(2) {
    83  				logger.Infof("unauthorized RPC request rejected: %v", err)
    84  			}
    85  			return status.Errorf(codes.PermissionDenied, "unauthorized RPC request rejected")
    86  		}
    87  		return err
    88  	}
    89  	return handler(srv, ss)
    90  }
    91  
// FileWatcherInterceptor contains details used to make authorization decisions
// by watching a file path that contains an authorization policy in JSON
// format.
type FileWatcherInterceptor struct {
	// internalInterceptor points to the current *StaticInterceptor. It is
	// written with atomic.StorePointer and read with atomic.LoadPointer.
	internalInterceptor unsafe.Pointer // *StaticInterceptor
	// policyFile is the path of the policy file that is read on each refresh.
	policyFile          string
	// policyContents holds the policy file contents as last read; it is
	// compared against freshly read contents to detect changes.
	policyContents      []byte
	// refreshDuration is the interval between policy refreshes.
	refreshDuration     time.Duration
	// cancel cancels the context passed to the background refresh goroutine.
	cancel              context.CancelFunc
}
   101  
   102  // NewFileWatcher returns a new FileWatcherInterceptor from a policy file
   103  // that contains JSON string of authorization policy and a refresh duration to
   104  // specify the amount of time between policy refreshes.
   105  func NewFileWatcher(file string, duration time.Duration) (*FileWatcherInterceptor, error) {
   106  	if file == "" {
   107  		return nil, fmt.Errorf("authorization policy file path is empty")
   108  	}
   109  	if duration <= time.Duration(0) {
   110  		return nil, fmt.Errorf("requires refresh interval(%v) greater than 0s", duration)
   111  	}
   112  	i := &FileWatcherInterceptor{policyFile: file, refreshDuration: duration}
   113  	if err := i.updateInternalInterceptor(); err != nil {
   114  		return nil, err
   115  	}
   116  	ctx, cancel := context.WithCancel(context.Background())
   117  	i.cancel = cancel
   118  	// Create a background go routine for policy refresh.
   119  	go i.run(ctx)
   120  	return i, nil
   121  }
   122  
   123  func (i *FileWatcherInterceptor) run(ctx context.Context) {
   124  	ticker := time.NewTicker(i.refreshDuration)
   125  	for {
   126  		if err := i.updateInternalInterceptor(); err != nil {
   127  			logger.Warningf("authorization policy reload status err: %v", err)
   128  		}
   129  		select {
   130  		case <-ctx.Done():
   131  			ticker.Stop()
   132  			return
   133  		case <-ticker.C:
   134  		}
   135  	}
   136  }
   137  
   138  // updateInternalInterceptor checks if the policy file that is watching has changed,
   139  // and if so, updates the internalInterceptor with the policy. Unlike the
   140  // constructor, if there is an error in reading the file or parsing the policy, the
   141  // previous internalInterceptors will not be replaced.
   142  func (i *FileWatcherInterceptor) updateInternalInterceptor() error {
   143  	policyContents, err := os.ReadFile(i.policyFile)
   144  	if err != nil {
   145  		return fmt.Errorf("policyFile(%s) read failed: %v", i.policyFile, err)
   146  	}
   147  	if bytes.Equal(i.policyContents, policyContents) {
   148  		return nil
   149  	}
   150  	i.policyContents = policyContents
   151  	policyContentsString := string(policyContents)
   152  	interceptor, err := NewStatic(policyContentsString)
   153  	if err != nil {
   154  		return err
   155  	}
   156  	atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor))
   157  	logger.Infof("authorization policy reload status: successfully loaded new policy %v", policyContentsString)
   158  	return nil
   159  }
   160  
// Close cleans up resources allocated by the interceptor. It calls the stored
// cancel function, which cancels the context the background refresh goroutine
// selects on.
func (i *FileWatcherInterceptor) Close() {
	i.cancel()
}
   165  
   166  // UnaryInterceptor intercepts incoming Unary RPC requests.
   167  // Only authorized requests are allowed to pass. Otherwise, an unauthorized
   168  // error is returned to the client.
   169  func (i *FileWatcherInterceptor) UnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
   170  	return ((*StaticInterceptor)(atomic.LoadPointer(&i.internalInterceptor))).UnaryInterceptor(ctx, req, info, handler)
   171  }
   172  
   173  // StreamInterceptor intercepts incoming Stream RPC requests.
   174  // Only authorized requests are allowed to pass. Otherwise, an unauthorized
   175  // error is returned to the client.
   176  func (i *FileWatcherInterceptor) StreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   177  	return ((*StaticInterceptor)(atomic.LoadPointer(&i.internalInterceptor))).StreamInterceptor(srv, ss, info, handler)
   178  }
   179  

View as plain text