...

Source file src/github.com/golang/groupcache/http.go

Documentation: github.com/golang/groupcache

     1  /*
     2  Copyright 2013 Google Inc.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8       http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package groupcache
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"net/url"
    26  	"strings"
    27  	"sync"
    28  
    29  	"github.com/golang/groupcache/consistenthash"
    30  	pb "github.com/golang/groupcache/groupcachepb"
    31  	"github.com/golang/protobuf/proto"
    32  )
    33  
    34  const defaultBasePath = "/_groupcache/"
    35  
    36  const defaultReplicas = 50
    37  
    38  // HTTPPool implements PeerPicker for a pool of HTTP peers.
    39  type HTTPPool struct {
    40  	// Context optionally specifies a context for the server to use when it
    41  	// receives a request.
    42  	// If nil, the server uses the request's context
    43  	Context func(*http.Request) context.Context
    44  
    45  	// Transport optionally specifies an http.RoundTripper for the client
    46  	// to use when it makes a request.
    47  	// If nil, the client uses http.DefaultTransport.
    48  	Transport func(context.Context) http.RoundTripper
    49  
    50  	// this peer's base URL, e.g. "https://example.net:8000"
    51  	self string
    52  
    53  	// opts specifies the options.
    54  	opts HTTPPoolOptions
    55  
    56  	mu          sync.Mutex // guards peers and httpGetters
    57  	peers       *consistenthash.Map
    58  	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
    59  }
    60  
    61  // HTTPPoolOptions are the configurations of a HTTPPool.
    62  type HTTPPoolOptions struct {
    63  	// BasePath specifies the HTTP path that will serve groupcache requests.
    64  	// If blank, it defaults to "/_groupcache/".
    65  	BasePath string
    66  
    67  	// Replicas specifies the number of key replicas on the consistent hash.
    68  	// If blank, it defaults to 50.
    69  	Replicas int
    70  
    71  	// HashFn specifies the hash function of the consistent hash.
    72  	// If blank, it defaults to crc32.ChecksumIEEE.
    73  	HashFn consistenthash.Hash
    74  }
    75  
    76  // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
    77  // For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
    78  // The self argument should be a valid base URL that points to the current server,
    79  // for example "http://example.net:8000".
    80  func NewHTTPPool(self string) *HTTPPool {
    81  	p := NewHTTPPoolOpts(self, nil)
    82  	http.Handle(p.opts.BasePath, p)
    83  	return p
    84  }
    85  
    86  var httpPoolMade bool
    87  
    88  // NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
    89  // Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
    90  // The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
    91  func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
    92  	if httpPoolMade {
    93  		panic("groupcache: NewHTTPPool must be called only once")
    94  	}
    95  	httpPoolMade = true
    96  
    97  	p := &HTTPPool{
    98  		self:        self,
    99  		httpGetters: make(map[string]*httpGetter),
   100  	}
   101  	if o != nil {
   102  		p.opts = *o
   103  	}
   104  	if p.opts.BasePath == "" {
   105  		p.opts.BasePath = defaultBasePath
   106  	}
   107  	if p.opts.Replicas == 0 {
   108  		p.opts.Replicas = defaultReplicas
   109  	}
   110  	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
   111  
   112  	RegisterPeerPicker(func() PeerPicker { return p })
   113  	return p
   114  }
   115  
   116  // Set updates the pool's list of peers.
   117  // Each peer value should be a valid base URL,
   118  // for example "http://example.net:8000".
   119  func (p *HTTPPool) Set(peers ...string) {
   120  	p.mu.Lock()
   121  	defer p.mu.Unlock()
   122  	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
   123  	p.peers.Add(peers...)
   124  	p.httpGetters = make(map[string]*httpGetter, len(peers))
   125  	for _, peer := range peers {
   126  		p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
   127  	}
   128  }
   129  
   130  func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
   131  	p.mu.Lock()
   132  	defer p.mu.Unlock()
   133  	if p.peers.IsEmpty() {
   134  		return nil, false
   135  	}
   136  	if peer := p.peers.Get(key); peer != p.self {
   137  		return p.httpGetters[peer], true
   138  	}
   139  	return nil, false
   140  }
   141  
   142  func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   143  	// Parse request.
   144  	if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
   145  		panic("HTTPPool serving unexpected path: " + r.URL.Path)
   146  	}
   147  	parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
   148  	if len(parts) != 2 {
   149  		http.Error(w, "bad request", http.StatusBadRequest)
   150  		return
   151  	}
   152  	groupName := parts[0]
   153  	key := parts[1]
   154  
   155  	// Fetch the value for this group/key.
   156  	group := GetGroup(groupName)
   157  	if group == nil {
   158  		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
   159  		return
   160  	}
   161  	var ctx context.Context
   162  	if p.Context != nil {
   163  		ctx = p.Context(r)
   164  	} else {
   165  		ctx = r.Context()
   166  	}
   167  
   168  	group.Stats.ServerRequests.Add(1)
   169  	var value []byte
   170  	err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
   171  	if err != nil {
   172  		http.Error(w, err.Error(), http.StatusInternalServerError)
   173  		return
   174  	}
   175  
   176  	// Write the value to the response body as a proto message.
   177  	body, err := proto.Marshal(&pb.GetResponse{Value: value})
   178  	if err != nil {
   179  		http.Error(w, err.Error(), http.StatusInternalServerError)
   180  		return
   181  	}
   182  	w.Header().Set("Content-Type", "application/x-protobuf")
   183  	w.Write(body)
   184  }
   185  
   186  type httpGetter struct {
   187  	transport func(context.Context) http.RoundTripper
   188  	baseURL   string
   189  }
   190  
   191  var bufferPool = sync.Pool{
   192  	New: func() interface{} { return new(bytes.Buffer) },
   193  }
   194  
   195  func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
   196  	u := fmt.Sprintf(
   197  		"%v%v/%v",
   198  		h.baseURL,
   199  		url.QueryEscape(in.GetGroup()),
   200  		url.QueryEscape(in.GetKey()),
   201  	)
   202  	req, err := http.NewRequest("GET", u, nil)
   203  	if err != nil {
   204  		return err
   205  	}
   206  	req = req.WithContext(ctx)
   207  	tr := http.DefaultTransport
   208  	if h.transport != nil {
   209  		tr = h.transport(ctx)
   210  	}
   211  	res, err := tr.RoundTrip(req)
   212  	if err != nil {
   213  		return err
   214  	}
   215  	defer res.Body.Close()
   216  	if res.StatusCode != http.StatusOK {
   217  		return fmt.Errorf("server returned: %v", res.Status)
   218  	}
   219  	b := bufferPool.Get().(*bytes.Buffer)
   220  	b.Reset()
   221  	defer bufferPool.Put(b)
   222  	_, err = io.Copy(b, res.Body)
   223  	if err != nil {
   224  		return fmt.Errorf("reading response body: %v", err)
   225  	}
   226  	err = proto.Unmarshal(b.Bytes(), out)
   227  	if err != nil {
   228  		return fmt.Errorf("decoding response body: %v", err)
   229  	}
   230  	return nil
   231  }
   232  

View as plain text