...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3rpc
16
17 import (
18 "context"
19
20 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
21 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
22 "go.etcd.io/etcd/client/pkg/v3/types"
23 "go.etcd.io/etcd/server/v3/etcdserver"
24 )
25
26 type quotaKVServer struct {
27 pb.KVServer
28 qa quotaAlarmer
29 }
30
31 type quotaAlarmer struct {
32 q etcdserver.Quota
33 a Alarmer
34 id types.ID
35 }
36
37
38
39 func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
40 if qa.q.Available(r) {
41 return nil
42 }
43 req := &pb.AlarmRequest{
44 MemberID: uint64(qa.id),
45 Action: pb.AlarmRequest_ACTIVATE,
46 Alarm: pb.AlarmType_NOSPACE,
47 }
48 qa.a.Alarm(ctx, req)
49 return rpctypes.ErrGRPCNoSpace
50 }
51
52 func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
53 return "aKVServer{
54 NewKVServer(s),
55 quotaAlarmer{etcdserver.NewBackendQuota(s, "kv"), s, s.ID()},
56 }
57 }
58
59 func (s *quotaKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
60 if err := s.qa.check(ctx, r); err != nil {
61 return nil, err
62 }
63 return s.KVServer.Put(ctx, r)
64 }
65
66 func (s *quotaKVServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
67 if err := s.qa.check(ctx, r); err != nil {
68 return nil, err
69 }
70 return s.KVServer.Txn(ctx, r)
71 }
72
73 type quotaLeaseServer struct {
74 pb.LeaseServer
75 qa quotaAlarmer
76 }
77
78 func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
79 if err := s.qa.check(ctx, cr); err != nil {
80 return nil, err
81 }
82 return s.LeaseServer.LeaseGrant(ctx, cr)
83 }
84
85 func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
86 return "aLeaseServer{
87 NewLeaseServer(s),
88 quotaAlarmer{etcdserver.NewBackendQuota(s, "lease"), s, s.ID()},
89 }
90 }
91
View as plain text