1
16
17 package groupcache
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "net/http"
25 "net/url"
26 "strings"
27 "sync"
28
29 "github.com/golang/groupcache/consistenthash"
30 pb "github.com/golang/groupcache/groupcachepb"
31 "github.com/golang/protobuf/proto"
32 )
33
// Defaults applied by NewHTTPPoolOpts when the corresponding
// HTTPPoolOptions field is left at its zero value.
const (
	// defaultBasePath is the URL path prefix under which peers serve
	// groupcache requests.
	defaultBasePath = "/_groupcache/"

	// defaultReplicas is the replica count handed to consistenthash.New.
	defaultReplicas = 50
)
37
38
39 type HTTPPool struct {
40
41
42
43 Context func(*http.Request) context.Context
44
45
46
47
48 Transport func(context.Context) http.RoundTripper
49
50
51 self string
52
53
54 opts HTTPPoolOptions
55
56 mu sync.Mutex
57 peers *consistenthash.Map
58 httpGetters map[string]*httpGetter
59 }
60
61
62 type HTTPPoolOptions struct {
63
64
65 BasePath string
66
67
68
69 Replicas int
70
71
72
73 HashFn consistenthash.Hash
74 }
75
76
77
78
79
80 func NewHTTPPool(self string) *HTTPPool {
81 p := NewHTTPPoolOpts(self, nil)
82 http.Handle(p.opts.BasePath, p)
83 return p
84 }
85
86 var httpPoolMade bool
87
88
89
90
91 func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
92 if httpPoolMade {
93 panic("groupcache: NewHTTPPool must be called only once")
94 }
95 httpPoolMade = true
96
97 p := &HTTPPool{
98 self: self,
99 httpGetters: make(map[string]*httpGetter),
100 }
101 if o != nil {
102 p.opts = *o
103 }
104 if p.opts.BasePath == "" {
105 p.opts.BasePath = defaultBasePath
106 }
107 if p.opts.Replicas == 0 {
108 p.opts.Replicas = defaultReplicas
109 }
110 p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
111
112 RegisterPeerPicker(func() PeerPicker { return p })
113 return p
114 }
115
116
117
118
119 func (p *HTTPPool) Set(peers ...string) {
120 p.mu.Lock()
121 defer p.mu.Unlock()
122 p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
123 p.peers.Add(peers...)
124 p.httpGetters = make(map[string]*httpGetter, len(peers))
125 for _, peer := range peers {
126 p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
127 }
128 }
129
130 func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
131 p.mu.Lock()
132 defer p.mu.Unlock()
133 if p.peers.IsEmpty() {
134 return nil, false
135 }
136 if peer := p.peers.Get(key); peer != p.self {
137 return p.httpGetters[peer], true
138 }
139 return nil, false
140 }
141
142 func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
143
144 if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
145 panic("HTTPPool serving unexpected path: " + r.URL.Path)
146 }
147 parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
148 if len(parts) != 2 {
149 http.Error(w, "bad request", http.StatusBadRequest)
150 return
151 }
152 groupName := parts[0]
153 key := parts[1]
154
155
156 group := GetGroup(groupName)
157 if group == nil {
158 http.Error(w, "no such group: "+groupName, http.StatusNotFound)
159 return
160 }
161 var ctx context.Context
162 if p.Context != nil {
163 ctx = p.Context(r)
164 } else {
165 ctx = r.Context()
166 }
167
168 group.Stats.ServerRequests.Add(1)
169 var value []byte
170 err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
171 if err != nil {
172 http.Error(w, err.Error(), http.StatusInternalServerError)
173 return
174 }
175
176
177 body, err := proto.Marshal(&pb.GetResponse{Value: value})
178 if err != nil {
179 http.Error(w, err.Error(), http.StatusInternalServerError)
180 return
181 }
182 w.Header().Set("Content-Type", "application/x-protobuf")
183 w.Write(body)
184 }
185
// httpGetter fetches values from a single remote peer over HTTP.
type httpGetter struct {
	// transport, if non-nil, supplies the RoundTripper for each request;
	// otherwise http.DefaultTransport is used.
	transport func(context.Context) http.RoundTripper

	// baseURL is the peer string with the pool's base path appended; the
	// group and key are appended to it to form the request URL.
	baseURL string
}
190
// bufferPool recycles *bytes.Buffer values so that reading a peer's
// response body does not allocate a fresh buffer on every request.
// Buffers taken from the pool may hold old data and must be Reset first.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return &bytes.Buffer{}
	},
}
194
195 func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
196 u := fmt.Sprintf(
197 "%v%v/%v",
198 h.baseURL,
199 url.QueryEscape(in.GetGroup()),
200 url.QueryEscape(in.GetKey()),
201 )
202 req, err := http.NewRequest("GET", u, nil)
203 if err != nil {
204 return err
205 }
206 req = req.WithContext(ctx)
207 tr := http.DefaultTransport
208 if h.transport != nil {
209 tr = h.transport(ctx)
210 }
211 res, err := tr.RoundTrip(req)
212 if err != nil {
213 return err
214 }
215 defer res.Body.Close()
216 if res.StatusCode != http.StatusOK {
217 return fmt.Errorf("server returned: %v", res.Status)
218 }
219 b := bufferPool.Get().(*bytes.Buffer)
220 b.Reset()
221 defer bufferPool.Put(b)
222 _, err = io.Copy(b, res.Body)
223 if err != nil {
224 return fmt.Errorf("reading response body: %v", err)
225 }
226 err = proto.Unmarshal(b.Bytes(), out)
227 if err != nil {
228 return fmt.Errorf("decoding response body: %v", err)
229 }
230 return nil
231 }
232
View as plain text