...

Source file src/github.com/gomodule/redigo/redisx/connmux.go

Documentation: github.com/gomodule/redigo/redisx

     1  // Copyright 2014 Gary Burd
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"): you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    11  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    12  // License for the specific language governing permissions and limitations
    13  // under the License.
    14  
    15  package redisx
    16  
    17  import (
    18  	"errors"
    19  	"sync"
    20  
    21  	"github.com/gomodule/redigo/internal"
    22  	"github.com/gomodule/redigo/redis"
    23  )
    24  
    25  // ConnMux multiplexes one or more connections to a single underlying
    26  // connection. The ConnMux connections do not support concurrency, commands
    27  // that associate server side state with the connection or commands that put
    28  // the connection in a special mode.
    29  type ConnMux struct {
    30  	c redis.Conn
    31  
    32  	sendMu sync.Mutex
    33  	sendID uint
    34  
    35  	recvMu   sync.Mutex
    36  	recvID   uint
    37  	recvWait map[uint]chan struct{}
    38  }
    39  
    40  func NewConnMux(c redis.Conn) *ConnMux {
    41  	return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})}
    42  }
    43  
    44  // Get gets a connection. The application must close the returned connection.
    45  func (p *ConnMux) Get() redis.Conn {
    46  	c := &muxConn{p: p}
    47  	c.ids = c.buf[:0]
    48  	return c
    49  }
    50  
    51  // Close closes the underlying connection.
    52  func (p *ConnMux) Close() error {
    53  	return p.c.Close()
    54  }
    55  
    56  type muxConn struct {
    57  	p   *ConnMux
    58  	ids []uint
    59  	buf [8]uint
    60  }
    61  
    62  func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error {
    63  	if internal.LookupCommandInfo(cmd).Set != 0 {
    64  		return errors.New("command not supported by mux pool")
    65  	}
    66  	p := c.p
    67  	p.sendMu.Lock()
    68  	id := p.sendID
    69  	c.ids = append(c.ids, id)
    70  	p.sendID++
    71  	err := p.c.Send(cmd, args...)
    72  	if flush {
    73  		err = p.c.Flush()
    74  	}
    75  	p.sendMu.Unlock()
    76  	return err
    77  }
    78  
    79  func (c *muxConn) Send(cmd string, args ...interface{}) error {
    80  	return c.send(false, cmd, args...)
    81  }
    82  
    83  func (c *muxConn) Flush() error {
    84  	p := c.p
    85  	p.sendMu.Lock()
    86  	err := p.c.Flush()
    87  	p.sendMu.Unlock()
    88  	return err
    89  }
    90  
    91  func (c *muxConn) Receive() (interface{}, error) {
    92  	if len(c.ids) == 0 {
    93  		return nil, errors.New("mux pool underflow")
    94  	}
    95  
    96  	id := c.ids[0]
    97  	c.ids = c.ids[1:]
    98  	if len(c.ids) == 0 {
    99  		c.ids = c.buf[:0]
   100  	}
   101  
   102  	p := c.p
   103  	p.recvMu.Lock()
   104  	if p.recvID != id {
   105  		ch := make(chan struct{})
   106  		p.recvWait[id] = ch
   107  		p.recvMu.Unlock()
   108  		<-ch
   109  		p.recvMu.Lock()
   110  		if p.recvID != id {
   111  			panic("out of sync")
   112  		}
   113  	}
   114  
   115  	v, err := p.c.Receive()
   116  
   117  	id++
   118  	p.recvID = id
   119  	ch, ok := p.recvWait[id]
   120  	if ok {
   121  		delete(p.recvWait, id)
   122  	}
   123  	p.recvMu.Unlock()
   124  	if ok {
   125  		ch <- struct{}{}
   126  	}
   127  
   128  	return v, err
   129  }
   130  
   131  func (c *muxConn) Close() error {
   132  	var err error
   133  	if len(c.ids) == 0 {
   134  		return nil
   135  	}
   136  	c.Flush()
   137  	for _ = range c.ids {
   138  		_, err = c.Receive()
   139  	}
   140  	return err
   141  }
   142  
   143  func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) {
   144  	if err := c.send(true, cmd, args...); err != nil {
   145  		return nil, err
   146  	}
   147  	return c.Receive()
   148  }
   149  
   150  func (c *muxConn) Err() error {
   151  	return c.p.c.Err()
   152  }
   153  

View as plain text