...

Source file src/github.com/lib/pq/notify_test.go

Documentation: github.com/lib/pq

     1  package pq
     2  
     3  import (
     4  	"database/sql"
     5  	"database/sql/driver"
     6  	"errors"
     7  	"fmt"
     8  	"io"
     9  	"os"
    10  	"runtime"
    11  	"sync"
    12  	"testing"
    13  	"time"
    14  )
    15  
    16  var errNilNotification = errors.New("nil notification")
    17  
    18  func expectNotification(t *testing.T, ch <-chan *Notification, relname string, extra string) error {
    19  	select {
    20  	case n := <-ch:
    21  		if n == nil {
    22  			return errNilNotification
    23  		}
    24  		if n.Channel != relname || n.Extra != extra {
    25  			return fmt.Errorf("unexpected notification %v", n)
    26  		}
    27  		return nil
    28  	case <-time.After(1500 * time.Millisecond):
    29  		return fmt.Errorf("timeout")
    30  	}
    31  }
    32  
    33  func expectNoNotification(t *testing.T, ch <-chan *Notification) error {
    34  	select {
    35  	case n := <-ch:
    36  		return fmt.Errorf("unexpected notification %v", n)
    37  	case <-time.After(100 * time.Millisecond):
    38  		return nil
    39  	}
    40  }
    41  
    42  func expectEvent(t *testing.T, eventch <-chan ListenerEventType, et ListenerEventType) error {
    43  	select {
    44  	case e := <-eventch:
    45  		if e != et {
    46  			return fmt.Errorf("unexpected event %v", e)
    47  		}
    48  		return nil
    49  	case <-time.After(1500 * time.Millisecond):
    50  		panic("expectEvent timeout")
    51  	}
    52  }
    53  
    54  func expectNoEvent(t *testing.T, eventch <-chan ListenerEventType) error {
    55  	select {
    56  	case e := <-eventch:
    57  		return fmt.Errorf("unexpected event %v", e)
    58  	case <-time.After(100 * time.Millisecond):
    59  		return nil
    60  	}
    61  }
    62  
    63  func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) {
    64  	datname := os.Getenv("PGDATABASE")
    65  	sslmode := os.Getenv("PGSSLMODE")
    66  
    67  	if datname == "" {
    68  		os.Setenv("PGDATABASE", "pqgotest")
    69  	}
    70  
    71  	if sslmode == "" {
    72  		os.Setenv("PGSSLMODE", "disable")
    73  	}
    74  
    75  	notificationChan := make(chan *Notification)
    76  	l, err := NewListenerConn("", notificationChan)
    77  	if err != nil {
    78  		t.Fatal(err)
    79  	}
    80  
    81  	return l, notificationChan
    82  }
    83  
    84  func TestNewListenerConn(t *testing.T) {
    85  	l, _ := newTestListenerConn(t)
    86  
    87  	defer l.Close()
    88  }
    89  
    90  func TestConnListen(t *testing.T) {
    91  	l, channel := newTestListenerConn(t)
    92  
    93  	defer l.Close()
    94  
    95  	db := openTestConn(t)
    96  	defer db.Close()
    97  
    98  	ok, err := l.Listen("notify_test")
    99  	if !ok || err != nil {
   100  		t.Fatal(err)
   101  	}
   102  
   103  	_, err = db.Exec("NOTIFY notify_test")
   104  	if err != nil {
   105  		t.Fatal(err)
   106  	}
   107  
   108  	err = expectNotification(t, channel, "notify_test", "")
   109  	if err != nil {
   110  		t.Fatal(err)
   111  	}
   112  }
   113  
   114  func TestConnUnlisten(t *testing.T) {
   115  	l, channel := newTestListenerConn(t)
   116  
   117  	defer l.Close()
   118  
   119  	db := openTestConn(t)
   120  	defer db.Close()
   121  
   122  	ok, err := l.Listen("notify_test")
   123  	if !ok || err != nil {
   124  		t.Fatal(err)
   125  	}
   126  
   127  	_, err = db.Exec("NOTIFY notify_test")
   128  	if err != nil {
   129  		t.Fatal(err)
   130  	}
   131  
   132  	err = expectNotification(t, channel, "notify_test", "")
   133  	if err != nil {
   134  		t.Fatal(err)
   135  	}
   136  
   137  	ok, err = l.Unlisten("notify_test")
   138  	if !ok || err != nil {
   139  		t.Fatal(err)
   140  	}
   141  
   142  	_, err = db.Exec("NOTIFY notify_test")
   143  	if err != nil {
   144  		t.Fatal(err)
   145  	}
   146  
   147  	err = expectNoNotification(t, channel)
   148  	if err != nil {
   149  		t.Fatal(err)
   150  	}
   151  }
   152  
   153  func TestConnUnlistenAll(t *testing.T) {
   154  	l, channel := newTestListenerConn(t)
   155  
   156  	defer l.Close()
   157  
   158  	db := openTestConn(t)
   159  	defer db.Close()
   160  
   161  	ok, err := l.Listen("notify_test")
   162  	if !ok || err != nil {
   163  		t.Fatal(err)
   164  	}
   165  
   166  	_, err = db.Exec("NOTIFY notify_test")
   167  	if err != nil {
   168  		t.Fatal(err)
   169  	}
   170  
   171  	err = expectNotification(t, channel, "notify_test", "")
   172  	if err != nil {
   173  		t.Fatal(err)
   174  	}
   175  
   176  	ok, err = l.UnlistenAll()
   177  	if !ok || err != nil {
   178  		t.Fatal(err)
   179  	}
   180  
   181  	_, err = db.Exec("NOTIFY notify_test")
   182  	if err != nil {
   183  		t.Fatal(err)
   184  	}
   185  
   186  	err = expectNoNotification(t, channel)
   187  	if err != nil {
   188  		t.Fatal(err)
   189  	}
   190  }
   191  
   192  func TestConnClose(t *testing.T) {
   193  	l, _ := newTestListenerConn(t)
   194  	defer l.Close()
   195  
   196  	err := l.Close()
   197  	if err != nil {
   198  		t.Fatal(err)
   199  	}
   200  	err = l.Close()
   201  	if err != errListenerConnClosed {
   202  		t.Fatalf("expected errListenerConnClosed; got %v", err)
   203  	}
   204  }
   205  
   206  func TestConnPing(t *testing.T) {
   207  	l, _ := newTestListenerConn(t)
   208  	defer l.Close()
   209  	err := l.Ping()
   210  	if err != nil {
   211  		t.Fatal(err)
   212  	}
   213  	err = l.Close()
   214  	if err != nil {
   215  		t.Fatal(err)
   216  	}
   217  	err = l.Ping()
   218  	if err != errListenerConnClosed {
   219  		t.Fatalf("expected errListenerConnClosed; got %v", err)
   220  	}
   221  }
   222  
   223  // Test for deadlock where a query fails while another one is queued
   224  func TestConnExecDeadlock(t *testing.T) {
   225  	l, _ := newTestListenerConn(t)
   226  	defer l.Close()
   227  
   228  	var wg sync.WaitGroup
   229  	wg.Add(2)
   230  
   231  	go func() {
   232  		l.ExecSimpleQuery("SELECT pg_sleep(60)")
   233  		wg.Done()
   234  	}()
   235  	runtime.Gosched()
   236  	go func() {
   237  		l.ExecSimpleQuery("SELECT 1")
   238  		wg.Done()
   239  	}()
   240  	// give the two goroutines some time to get into position
   241  	runtime.Gosched()
   242  	// calls Close on the net.Conn; equivalent to a network failure
   243  	l.Close()
   244  
   245  	defer time.AfterFunc(10*time.Second, func() {
   246  		panic("timed out")
   247  	}).Stop()
   248  	wg.Wait()
   249  }
   250  
   251  // Test for ListenerConn being closed while a slow query is executing
   252  func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) {
   253  	l, _ := newTestListenerConn(t)
   254  	defer l.Close()
   255  
   256  	var wg sync.WaitGroup
   257  	wg.Add(1)
   258  
   259  	go func() {
   260  		sent, err := l.ExecSimpleQuery("SELECT pg_sleep(60)")
   261  		if sent {
   262  			panic("expected sent=false")
   263  		}
   264  		// could be any of a number of errors
   265  		if err == nil {
   266  			panic("expected error")
   267  		}
   268  		wg.Done()
   269  	}()
   270  	// give the above goroutine some time to get into position
   271  	runtime.Gosched()
   272  	err := l.Close()
   273  	if err != nil {
   274  		t.Fatal(err)
   275  	}
   276  
   277  	defer time.AfterFunc(10*time.Second, func() {
   278  		panic("timed out")
   279  	}).Stop()
   280  	wg.Wait()
   281  }
   282  
   283  func TestNotifyExtra(t *testing.T) {
   284  	db := openTestConn(t)
   285  	defer db.Close()
   286  
   287  	if getServerVersion(t, db) < 90000 {
   288  		t.Skip("skipping NOTIFY payload test since the server does not appear to support it")
   289  	}
   290  
   291  	l, channel := newTestListenerConn(t)
   292  	defer l.Close()
   293  
   294  	ok, err := l.Listen("notify_test")
   295  	if !ok || err != nil {
   296  		t.Fatal(err)
   297  	}
   298  
   299  	_, err = db.Exec("NOTIFY notify_test, 'something'")
   300  	if err != nil {
   301  		t.Fatal(err)
   302  	}
   303  
   304  	err = expectNotification(t, channel, "notify_test", "something")
   305  	if err != nil {
   306  		t.Fatal(err)
   307  	}
   308  }
   309  
   310  // create a new test listener and also set the timeouts
   311  func newTestListenerTimeout(t *testing.T, min time.Duration, max time.Duration) (*Listener, <-chan ListenerEventType) {
   312  	datname := os.Getenv("PGDATABASE")
   313  	sslmode := os.Getenv("PGSSLMODE")
   314  
   315  	if datname == "" {
   316  		os.Setenv("PGDATABASE", "pqgotest")
   317  	}
   318  
   319  	if sslmode == "" {
   320  		os.Setenv("PGSSLMODE", "disable")
   321  	}
   322  
   323  	eventch := make(chan ListenerEventType, 16)
   324  	l := NewListener("", min, max, func(t ListenerEventType, err error) { eventch <- t })
   325  	err := expectEvent(t, eventch, ListenerEventConnected)
   326  	if err != nil {
   327  		t.Fatal(err)
   328  	}
   329  	return l, eventch
   330  }
   331  
   332  func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) {
   333  	return newTestListenerTimeout(t, time.Hour, time.Hour)
   334  }
   335  
   336  func TestListenerListen(t *testing.T) {
   337  	l, _ := newTestListener(t)
   338  	defer l.Close()
   339  
   340  	db := openTestConn(t)
   341  	defer db.Close()
   342  
   343  	err := l.Listen("notify_listen_test")
   344  	if err != nil {
   345  		t.Fatal(err)
   346  	}
   347  
   348  	_, err = db.Exec("NOTIFY notify_listen_test")
   349  	if err != nil {
   350  		t.Fatal(err)
   351  	}
   352  
   353  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   354  	if err != nil {
   355  		t.Fatal(err)
   356  	}
   357  }
   358  
   359  func TestListenerUnlisten(t *testing.T) {
   360  	l, _ := newTestListener(t)
   361  	defer l.Close()
   362  
   363  	db := openTestConn(t)
   364  	defer db.Close()
   365  
   366  	err := l.Listen("notify_listen_test")
   367  	if err != nil {
   368  		t.Fatal(err)
   369  	}
   370  
   371  	_, err = db.Exec("NOTIFY notify_listen_test")
   372  	if err != nil {
   373  		t.Fatal(err)
   374  	}
   375  
   376  	err = l.Unlisten("notify_listen_test")
   377  	if err != nil {
   378  		t.Fatal(err)
   379  	}
   380  
   381  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   382  	if err != nil {
   383  		t.Fatal(err)
   384  	}
   385  
   386  	_, err = db.Exec("NOTIFY notify_listen_test")
   387  	if err != nil {
   388  		t.Fatal(err)
   389  	}
   390  
   391  	err = expectNoNotification(t, l.Notify)
   392  	if err != nil {
   393  		t.Fatal(err)
   394  	}
   395  }
   396  
   397  func TestListenerUnlistenAll(t *testing.T) {
   398  	l, _ := newTestListener(t)
   399  	defer l.Close()
   400  
   401  	db := openTestConn(t)
   402  	defer db.Close()
   403  
   404  	err := l.Listen("notify_listen_test")
   405  	if err != nil {
   406  		t.Fatal(err)
   407  	}
   408  
   409  	_, err = db.Exec("NOTIFY notify_listen_test")
   410  	if err != nil {
   411  		t.Fatal(err)
   412  	}
   413  
   414  	err = l.UnlistenAll()
   415  	if err != nil {
   416  		t.Fatal(err)
   417  	}
   418  
   419  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   420  	if err != nil {
   421  		t.Fatal(err)
   422  	}
   423  
   424  	_, err = db.Exec("NOTIFY notify_listen_test")
   425  	if err != nil {
   426  		t.Fatal(err)
   427  	}
   428  
   429  	err = expectNoNotification(t, l.Notify)
   430  	if err != nil {
   431  		t.Fatal(err)
   432  	}
   433  }
   434  
   435  func TestListenerFailedQuery(t *testing.T) {
   436  	l, eventch := newTestListener(t)
   437  	defer l.Close()
   438  
   439  	db := openTestConn(t)
   440  	defer db.Close()
   441  
   442  	err := l.Listen("notify_listen_test")
   443  	if err != nil {
   444  		t.Fatal(err)
   445  	}
   446  
   447  	_, err = db.Exec("NOTIFY notify_listen_test")
   448  	if err != nil {
   449  		t.Fatal(err)
   450  	}
   451  
   452  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   453  	if err != nil {
   454  		t.Fatal(err)
   455  	}
   456  
   457  	// shouldn't cause a disconnect
   458  	ok, err := l.cn.ExecSimpleQuery("SELECT error")
   459  	if !ok {
   460  		t.Fatalf("could not send query to server: %v", err)
   461  	}
   462  	_, ok = err.(PGError)
   463  	if !ok {
   464  		t.Fatalf("unexpected error %v", err)
   465  	}
   466  	err = expectNoEvent(t, eventch)
   467  	if err != nil {
   468  		t.Fatal(err)
   469  	}
   470  
   471  	// should still work
   472  	_, err = db.Exec("NOTIFY notify_listen_test")
   473  	if err != nil {
   474  		t.Fatal(err)
   475  	}
   476  
   477  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   478  	if err != nil {
   479  		t.Fatal(err)
   480  	}
   481  }
   482  
   483  func TestListenerReconnect(t *testing.T) {
   484  	l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
   485  	defer l.Close()
   486  
   487  	db := openTestConn(t)
   488  	defer db.Close()
   489  
   490  	err := l.Listen("notify_listen_test")
   491  	if err != nil {
   492  		t.Fatal(err)
   493  	}
   494  
   495  	_, err = db.Exec("NOTIFY notify_listen_test")
   496  	if err != nil {
   497  		t.Fatal(err)
   498  	}
   499  
   500  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   501  	if err != nil {
   502  		t.Fatal(err)
   503  	}
   504  
   505  	// kill the connection and make sure it comes back up
   506  	ok, err := l.cn.ExecSimpleQuery("SELECT pg_terminate_backend(pg_backend_pid())")
   507  	if ok {
   508  		t.Fatalf("could not kill the connection: %v", err)
   509  	}
   510  	if err != io.EOF {
   511  		t.Fatalf("unexpected error %v", err)
   512  	}
   513  	err = expectEvent(t, eventch, ListenerEventDisconnected)
   514  	if err != nil {
   515  		t.Fatal(err)
   516  	}
   517  	err = expectEvent(t, eventch, ListenerEventReconnected)
   518  	if err != nil {
   519  		t.Fatal(err)
   520  	}
   521  
   522  	// should still work
   523  	_, err = db.Exec("NOTIFY notify_listen_test")
   524  	if err != nil {
   525  		t.Fatal(err)
   526  	}
   527  
   528  	// should get nil after Reconnected
   529  	err = expectNotification(t, l.Notify, "", "")
   530  	if err != errNilNotification {
   531  		t.Fatal(err)
   532  	}
   533  
   534  	err = expectNotification(t, l.Notify, "notify_listen_test", "")
   535  	if err != nil {
   536  		t.Fatal(err)
   537  	}
   538  }
   539  
   540  func TestListenerClose(t *testing.T) {
   541  	l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
   542  	defer l.Close()
   543  
   544  	err := l.Close()
   545  	if err != nil {
   546  		t.Fatal(err)
   547  	}
   548  	err = l.Close()
   549  	if err != errListenerClosed {
   550  		t.Fatalf("expected errListenerClosed; got %v", err)
   551  	}
   552  }
   553  
   554  func TestListenerPing(t *testing.T) {
   555  	l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
   556  	defer l.Close()
   557  
   558  	err := l.Ping()
   559  	if err != nil {
   560  		t.Fatal(err)
   561  	}
   562  
   563  	err = l.Close()
   564  	if err != nil {
   565  		t.Fatal(err)
   566  	}
   567  
   568  	err = l.Ping()
   569  	if err != errListenerClosed {
   570  		t.Fatalf("expected errListenerClosed; got %v", err)
   571  	}
   572  }
   573  
   574  func TestConnectorWithNotificationHandler_Simple(t *testing.T) {
   575  	b, err := NewConnector("")
   576  	if err != nil {
   577  		t.Fatal(err)
   578  	}
   579  	var notification *Notification
   580  	// Make connector w/ handler to set the local var
   581  	c := ConnectorWithNotificationHandler(b, func(n *Notification) { notification = n })
   582  	sendNotification(c, t, "Test notification #1")
   583  	if notification == nil || notification.Extra != "Test notification #1" {
   584  		t.Fatalf("Expected notification w/ message, got %v", notification)
   585  	}
   586  	// Unset the handler on the same connector
   587  	prevC := c
   588  	if c = ConnectorWithNotificationHandler(c, nil); c != prevC {
   589  		t.Fatalf("Expected to not create new connector but did")
   590  	}
   591  	sendNotification(c, t, "Test notification #2")
   592  	if notification == nil || notification.Extra != "Test notification #1" {
   593  		t.Fatalf("Expected notification to not change, got %v", notification)
   594  	}
   595  	// Set it back on the same connector
   596  	if c = ConnectorWithNotificationHandler(c, func(n *Notification) { notification = n }); c != prevC {
   597  		t.Fatal("Expected to not create new connector but did")
   598  	}
   599  	sendNotification(c, t, "Test notification #3")
   600  	if notification == nil || notification.Extra != "Test notification #3" {
   601  		t.Fatalf("Expected notification w/ message, got %v", notification)
   602  	}
   603  }
   604  
   605  func sendNotification(c driver.Connector, t *testing.T, escapedNotification string) {
   606  	db := sql.OpenDB(c)
   607  	defer db.Close()
   608  	sql := fmt.Sprintf("LISTEN foo; NOTIFY foo, '%s';", escapedNotification)
   609  	if _, err := db.Exec(sql); err != nil {
   610  		t.Fatal(err)
   611  	}
   612  }
   613  

View as plain text