...

Source file src/github.com/gomodule/redigo/redis/pubsub.go

Documentation: github.com/gomodule/redigo/redis

     1  // Copyright 2012 Gary Burd
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"): you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    11  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    12  // License for the specific language governing permissions and limitations
    13  // under the License.
    14  
    15  package redis
    16  
    17  import (
    18  	"errors"
    19  	"time"
    20  )
    21  
    22  // Subscription represents a subscribe or unsubscribe notification.
    23  type Subscription struct {
    24  	// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
    25  	Kind string
    26  
    27  	// The channel that was changed.
    28  	Channel string
    29  
    30  	// The current number of subscriptions for connection.
    31  	Count int
    32  }
    33  
    34  // Message represents a message notification.
    35  type Message struct {
    36  	// The originating channel.
    37  	Channel string
    38  
    39  	// The matched pattern, if any
    40  	Pattern string
    41  
    42  	// The message data.
    43  	Data []byte
    44  }
    45  
    46  // Pong represents a pubsub pong notification.
    47  type Pong struct {
    48  	Data string
    49  }
    50  
    51  // PubSubConn wraps a Conn with convenience methods for subscribers.
    52  type PubSubConn struct {
    53  	Conn Conn
    54  }
    55  
    56  // Close closes the connection.
    57  func (c PubSubConn) Close() error {
    58  	return c.Conn.Close()
    59  }
    60  
    61  // Subscribe subscribes the connection to the specified channels.
    62  func (c PubSubConn) Subscribe(channel ...interface{}) error {
    63  	c.Conn.Send("SUBSCRIBE", channel...)
    64  	return c.Conn.Flush()
    65  }
    66  
    67  // PSubscribe subscribes the connection to the given patterns.
    68  func (c PubSubConn) PSubscribe(channel ...interface{}) error {
    69  	c.Conn.Send("PSUBSCRIBE", channel...)
    70  	return c.Conn.Flush()
    71  }
    72  
    73  // Unsubscribe unsubscribes the connection from the given channels, or from all
    74  // of them if none is given.
    75  func (c PubSubConn) Unsubscribe(channel ...interface{}) error {
    76  	c.Conn.Send("UNSUBSCRIBE", channel...)
    77  	return c.Conn.Flush()
    78  }
    79  
    80  // PUnsubscribe unsubscribes the connection from the given patterns, or from all
    81  // of them if none is given.
    82  func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
    83  	c.Conn.Send("PUNSUBSCRIBE", channel...)
    84  	return c.Conn.Flush()
    85  }
    86  
    87  // Ping sends a PING to the server with the specified data.
    88  //
    89  // The connection must be subscribed to at least one channel or pattern when
    90  // calling this method.
    91  func (c PubSubConn) Ping(data string) error {
    92  	c.Conn.Send("PING", data)
    93  	return c.Conn.Flush()
    94  }
    95  
    96  // Receive returns a pushed message as a Subscription, Message, Pong or error.
    97  // The return value is intended to be used directly in a type switch as
    98  // illustrated in the PubSubConn example.
    99  func (c PubSubConn) Receive() interface{} {
   100  	return c.receiveInternal(c.Conn.Receive())
   101  }
   102  
   103  // ReceiveWithTimeout is like Receive, but it allows the application to
   104  // override the connection's default timeout.
   105  func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{} {
   106  	return c.receiveInternal(ReceiveWithTimeout(c.Conn, timeout))
   107  }
   108  
   109  func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
   110  	reply, err := Values(replyArg, errArg)
   111  	if err != nil {
   112  		return err
   113  	}
   114  
   115  	var kind string
   116  	reply, err = Scan(reply, &kind)
   117  	if err != nil {
   118  		return err
   119  	}
   120  
   121  	switch kind {
   122  	case "message":
   123  		var m Message
   124  		if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
   125  			return err
   126  		}
   127  		return m
   128  	case "pmessage":
   129  		var m Message
   130  		if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
   131  			return err
   132  		}
   133  		return m
   134  	case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
   135  		s := Subscription{Kind: kind}
   136  		if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
   137  			return err
   138  		}
   139  		return s
   140  	case "pong":
   141  		var p Pong
   142  		if _, err := Scan(reply, &p.Data); err != nil {
   143  			return err
   144  		}
   145  		return p
   146  	}
   147  	return errors.New("redigo: unknown pubsub notification")
   148  }
   149  

View as plain text