1 package pq
2
3 import (
4 "database/sql"
5 "database/sql/driver"
6 "errors"
7 "fmt"
8 "io"
9 "os"
10 "runtime"
11 "sync"
12 "testing"
13 "time"
14 )
15
// errNilNotification is the sentinel error returned by expectNotification
// when the value received from the notification channel is nil.
var errNilNotification = errors.New("nil notification")
17
18 func expectNotification(t *testing.T, ch <-chan *Notification, relname string, extra string) error {
19 select {
20 case n := <-ch:
21 if n == nil {
22 return errNilNotification
23 }
24 if n.Channel != relname || n.Extra != extra {
25 return fmt.Errorf("unexpected notification %v", n)
26 }
27 return nil
28 case <-time.After(1500 * time.Millisecond):
29 return fmt.Errorf("timeout")
30 }
31 }
32
33 func expectNoNotification(t *testing.T, ch <-chan *Notification) error {
34 select {
35 case n := <-ch:
36 return fmt.Errorf("unexpected notification %v", n)
37 case <-time.After(100 * time.Millisecond):
38 return nil
39 }
40 }
41
42 func expectEvent(t *testing.T, eventch <-chan ListenerEventType, et ListenerEventType) error {
43 select {
44 case e := <-eventch:
45 if e != et {
46 return fmt.Errorf("unexpected event %v", e)
47 }
48 return nil
49 case <-time.After(1500 * time.Millisecond):
50 panic("expectEvent timeout")
51 }
52 }
53
54 func expectNoEvent(t *testing.T, eventch <-chan ListenerEventType) error {
55 select {
56 case e := <-eventch:
57 return fmt.Errorf("unexpected event %v", e)
58 case <-time.After(100 * time.Millisecond):
59 return nil
60 }
61 }
62
63 func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) {
64 datname := os.Getenv("PGDATABASE")
65 sslmode := os.Getenv("PGSSLMODE")
66
67 if datname == "" {
68 os.Setenv("PGDATABASE", "pqgotest")
69 }
70
71 if sslmode == "" {
72 os.Setenv("PGSSLMODE", "disable")
73 }
74
75 notificationChan := make(chan *Notification)
76 l, err := NewListenerConn("", notificationChan)
77 if err != nil {
78 t.Fatal(err)
79 }
80
81 return l, notificationChan
82 }
83
84 func TestNewListenerConn(t *testing.T) {
85 l, _ := newTestListenerConn(t)
86
87 defer l.Close()
88 }
89
90 func TestConnListen(t *testing.T) {
91 l, channel := newTestListenerConn(t)
92
93 defer l.Close()
94
95 db := openTestConn(t)
96 defer db.Close()
97
98 ok, err := l.Listen("notify_test")
99 if !ok || err != nil {
100 t.Fatal(err)
101 }
102
103 _, err = db.Exec("NOTIFY notify_test")
104 if err != nil {
105 t.Fatal(err)
106 }
107
108 err = expectNotification(t, channel, "notify_test", "")
109 if err != nil {
110 t.Fatal(err)
111 }
112 }
113
114 func TestConnUnlisten(t *testing.T) {
115 l, channel := newTestListenerConn(t)
116
117 defer l.Close()
118
119 db := openTestConn(t)
120 defer db.Close()
121
122 ok, err := l.Listen("notify_test")
123 if !ok || err != nil {
124 t.Fatal(err)
125 }
126
127 _, err = db.Exec("NOTIFY notify_test")
128 if err != nil {
129 t.Fatal(err)
130 }
131
132 err = expectNotification(t, channel, "notify_test", "")
133 if err != nil {
134 t.Fatal(err)
135 }
136
137 ok, err = l.Unlisten("notify_test")
138 if !ok || err != nil {
139 t.Fatal(err)
140 }
141
142 _, err = db.Exec("NOTIFY notify_test")
143 if err != nil {
144 t.Fatal(err)
145 }
146
147 err = expectNoNotification(t, channel)
148 if err != nil {
149 t.Fatal(err)
150 }
151 }
152
153 func TestConnUnlistenAll(t *testing.T) {
154 l, channel := newTestListenerConn(t)
155
156 defer l.Close()
157
158 db := openTestConn(t)
159 defer db.Close()
160
161 ok, err := l.Listen("notify_test")
162 if !ok || err != nil {
163 t.Fatal(err)
164 }
165
166 _, err = db.Exec("NOTIFY notify_test")
167 if err != nil {
168 t.Fatal(err)
169 }
170
171 err = expectNotification(t, channel, "notify_test", "")
172 if err != nil {
173 t.Fatal(err)
174 }
175
176 ok, err = l.UnlistenAll()
177 if !ok || err != nil {
178 t.Fatal(err)
179 }
180
181 _, err = db.Exec("NOTIFY notify_test")
182 if err != nil {
183 t.Fatal(err)
184 }
185
186 err = expectNoNotification(t, channel)
187 if err != nil {
188 t.Fatal(err)
189 }
190 }
191
192 func TestConnClose(t *testing.T) {
193 l, _ := newTestListenerConn(t)
194 defer l.Close()
195
196 err := l.Close()
197 if err != nil {
198 t.Fatal(err)
199 }
200 err = l.Close()
201 if err != errListenerConnClosed {
202 t.Fatalf("expected errListenerConnClosed; got %v", err)
203 }
204 }
205
206 func TestConnPing(t *testing.T) {
207 l, _ := newTestListenerConn(t)
208 defer l.Close()
209 err := l.Ping()
210 if err != nil {
211 t.Fatal(err)
212 }
213 err = l.Close()
214 if err != nil {
215 t.Fatal(err)
216 }
217 err = l.Ping()
218 if err != errListenerConnClosed {
219 t.Fatalf("expected errListenerConnClosed; got %v", err)
220 }
221 }
222
223
224 func TestConnExecDeadlock(t *testing.T) {
225 l, _ := newTestListenerConn(t)
226 defer l.Close()
227
228 var wg sync.WaitGroup
229 wg.Add(2)
230
231 go func() {
232 l.ExecSimpleQuery("SELECT pg_sleep(60)")
233 wg.Done()
234 }()
235 runtime.Gosched()
236 go func() {
237 l.ExecSimpleQuery("SELECT 1")
238 wg.Done()
239 }()
240
241 runtime.Gosched()
242
243 l.Close()
244
245 defer time.AfterFunc(10*time.Second, func() {
246 panic("timed out")
247 }).Stop()
248 wg.Wait()
249 }
250
251
252 func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) {
253 l, _ := newTestListenerConn(t)
254 defer l.Close()
255
256 var wg sync.WaitGroup
257 wg.Add(1)
258
259 go func() {
260 sent, err := l.ExecSimpleQuery("SELECT pg_sleep(60)")
261 if sent {
262 panic("expected sent=false")
263 }
264
265 if err == nil {
266 panic("expected error")
267 }
268 wg.Done()
269 }()
270
271 runtime.Gosched()
272 err := l.Close()
273 if err != nil {
274 t.Fatal(err)
275 }
276
277 defer time.AfterFunc(10*time.Second, func() {
278 panic("timed out")
279 }).Stop()
280 wg.Wait()
281 }
282
283 func TestNotifyExtra(t *testing.T) {
284 db := openTestConn(t)
285 defer db.Close()
286
287 if getServerVersion(t, db) < 90000 {
288 t.Skip("skipping NOTIFY payload test since the server does not appear to support it")
289 }
290
291 l, channel := newTestListenerConn(t)
292 defer l.Close()
293
294 ok, err := l.Listen("notify_test")
295 if !ok || err != nil {
296 t.Fatal(err)
297 }
298
299 _, err = db.Exec("NOTIFY notify_test, 'something'")
300 if err != nil {
301 t.Fatal(err)
302 }
303
304 err = expectNotification(t, channel, "notify_test", "something")
305 if err != nil {
306 t.Fatal(err)
307 }
308 }
309
310
311 func newTestListenerTimeout(t *testing.T, min time.Duration, max time.Duration) (*Listener, <-chan ListenerEventType) {
312 datname := os.Getenv("PGDATABASE")
313 sslmode := os.Getenv("PGSSLMODE")
314
315 if datname == "" {
316 os.Setenv("PGDATABASE", "pqgotest")
317 }
318
319 if sslmode == "" {
320 os.Setenv("PGSSLMODE", "disable")
321 }
322
323 eventch := make(chan ListenerEventType, 16)
324 l := NewListener("", min, max, func(t ListenerEventType, err error) { eventch <- t })
325 err := expectEvent(t, eventch, ListenerEventConnected)
326 if err != nil {
327 t.Fatal(err)
328 }
329 return l, eventch
330 }
331
332 func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) {
333 return newTestListenerTimeout(t, time.Hour, time.Hour)
334 }
335
336 func TestListenerListen(t *testing.T) {
337 l, _ := newTestListener(t)
338 defer l.Close()
339
340 db := openTestConn(t)
341 defer db.Close()
342
343 err := l.Listen("notify_listen_test")
344 if err != nil {
345 t.Fatal(err)
346 }
347
348 _, err = db.Exec("NOTIFY notify_listen_test")
349 if err != nil {
350 t.Fatal(err)
351 }
352
353 err = expectNotification(t, l.Notify, "notify_listen_test", "")
354 if err != nil {
355 t.Fatal(err)
356 }
357 }
358
359 func TestListenerUnlisten(t *testing.T) {
360 l, _ := newTestListener(t)
361 defer l.Close()
362
363 db := openTestConn(t)
364 defer db.Close()
365
366 err := l.Listen("notify_listen_test")
367 if err != nil {
368 t.Fatal(err)
369 }
370
371 _, err = db.Exec("NOTIFY notify_listen_test")
372 if err != nil {
373 t.Fatal(err)
374 }
375
376 err = l.Unlisten("notify_listen_test")
377 if err != nil {
378 t.Fatal(err)
379 }
380
381 err = expectNotification(t, l.Notify, "notify_listen_test", "")
382 if err != nil {
383 t.Fatal(err)
384 }
385
386 _, err = db.Exec("NOTIFY notify_listen_test")
387 if err != nil {
388 t.Fatal(err)
389 }
390
391 err = expectNoNotification(t, l.Notify)
392 if err != nil {
393 t.Fatal(err)
394 }
395 }
396
397 func TestListenerUnlistenAll(t *testing.T) {
398 l, _ := newTestListener(t)
399 defer l.Close()
400
401 db := openTestConn(t)
402 defer db.Close()
403
404 err := l.Listen("notify_listen_test")
405 if err != nil {
406 t.Fatal(err)
407 }
408
409 _, err = db.Exec("NOTIFY notify_listen_test")
410 if err != nil {
411 t.Fatal(err)
412 }
413
414 err = l.UnlistenAll()
415 if err != nil {
416 t.Fatal(err)
417 }
418
419 err = expectNotification(t, l.Notify, "notify_listen_test", "")
420 if err != nil {
421 t.Fatal(err)
422 }
423
424 _, err = db.Exec("NOTIFY notify_listen_test")
425 if err != nil {
426 t.Fatal(err)
427 }
428
429 err = expectNoNotification(t, l.Notify)
430 if err != nil {
431 t.Fatal(err)
432 }
433 }
434
435 func TestListenerFailedQuery(t *testing.T) {
436 l, eventch := newTestListener(t)
437 defer l.Close()
438
439 db := openTestConn(t)
440 defer db.Close()
441
442 err := l.Listen("notify_listen_test")
443 if err != nil {
444 t.Fatal(err)
445 }
446
447 _, err = db.Exec("NOTIFY notify_listen_test")
448 if err != nil {
449 t.Fatal(err)
450 }
451
452 err = expectNotification(t, l.Notify, "notify_listen_test", "")
453 if err != nil {
454 t.Fatal(err)
455 }
456
457
458 ok, err := l.cn.ExecSimpleQuery("SELECT error")
459 if !ok {
460 t.Fatalf("could not send query to server: %v", err)
461 }
462 _, ok = err.(PGError)
463 if !ok {
464 t.Fatalf("unexpected error %v", err)
465 }
466 err = expectNoEvent(t, eventch)
467 if err != nil {
468 t.Fatal(err)
469 }
470
471
472 _, err = db.Exec("NOTIFY notify_listen_test")
473 if err != nil {
474 t.Fatal(err)
475 }
476
477 err = expectNotification(t, l.Notify, "notify_listen_test", "")
478 if err != nil {
479 t.Fatal(err)
480 }
481 }
482
483 func TestListenerReconnect(t *testing.T) {
484 l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
485 defer l.Close()
486
487 db := openTestConn(t)
488 defer db.Close()
489
490 err := l.Listen("notify_listen_test")
491 if err != nil {
492 t.Fatal(err)
493 }
494
495 _, err = db.Exec("NOTIFY notify_listen_test")
496 if err != nil {
497 t.Fatal(err)
498 }
499
500 err = expectNotification(t, l.Notify, "notify_listen_test", "")
501 if err != nil {
502 t.Fatal(err)
503 }
504
505
506 ok, err := l.cn.ExecSimpleQuery("SELECT pg_terminate_backend(pg_backend_pid())")
507 if ok {
508 t.Fatalf("could not kill the connection: %v", err)
509 }
510 if err != io.EOF {
511 t.Fatalf("unexpected error %v", err)
512 }
513 err = expectEvent(t, eventch, ListenerEventDisconnected)
514 if err != nil {
515 t.Fatal(err)
516 }
517 err = expectEvent(t, eventch, ListenerEventReconnected)
518 if err != nil {
519 t.Fatal(err)
520 }
521
522
523 _, err = db.Exec("NOTIFY notify_listen_test")
524 if err != nil {
525 t.Fatal(err)
526 }
527
528
529 err = expectNotification(t, l.Notify, "", "")
530 if err != errNilNotification {
531 t.Fatal(err)
532 }
533
534 err = expectNotification(t, l.Notify, "notify_listen_test", "")
535 if err != nil {
536 t.Fatal(err)
537 }
538 }
539
540 func TestListenerClose(t *testing.T) {
541 l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
542 defer l.Close()
543
544 err := l.Close()
545 if err != nil {
546 t.Fatal(err)
547 }
548 err = l.Close()
549 if err != errListenerClosed {
550 t.Fatalf("expected errListenerClosed; got %v", err)
551 }
552 }
553
554 func TestListenerPing(t *testing.T) {
555 l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
556 defer l.Close()
557
558 err := l.Ping()
559 if err != nil {
560 t.Fatal(err)
561 }
562
563 err = l.Close()
564 if err != nil {
565 t.Fatal(err)
566 }
567
568 err = l.Ping()
569 if err != errListenerClosed {
570 t.Fatalf("expected errListenerClosed; got %v", err)
571 }
572 }
573
574 func TestConnectorWithNotificationHandler_Simple(t *testing.T) {
575 b, err := NewConnector("")
576 if err != nil {
577 t.Fatal(err)
578 }
579 var notification *Notification
580
581 c := ConnectorWithNotificationHandler(b, func(n *Notification) { notification = n })
582 sendNotification(c, t, "Test notification #1")
583 if notification == nil || notification.Extra != "Test notification #1" {
584 t.Fatalf("Expected notification w/ message, got %v", notification)
585 }
586
587 prevC := c
588 if c = ConnectorWithNotificationHandler(c, nil); c != prevC {
589 t.Fatalf("Expected to not create new connector but did")
590 }
591 sendNotification(c, t, "Test notification #2")
592 if notification == nil || notification.Extra != "Test notification #1" {
593 t.Fatalf("Expected notification to not change, got %v", notification)
594 }
595
596 if c = ConnectorWithNotificationHandler(c, func(n *Notification) { notification = n }); c != prevC {
597 t.Fatal("Expected to not create new connector but did")
598 }
599 sendNotification(c, t, "Test notification #3")
600 if notification == nil || notification.Extra != "Test notification #3" {
601 t.Fatalf("Expected notification w/ message, got %v", notification)
602 }
603 }
604
// sendNotification opens a database through connector c, then issues LISTEN
// and NOTIFY on channel "foo" in a single Exec call, using
// escapedNotification as the payload. The database handle is closed before
// returning, and the test is aborted via t.Fatal if the Exec fails.
//
// escapedNotification is interpolated verbatim between single quotes in the
// SQL text, so the caller must pass a value that is already safe to embed
// in a SQL string literal.
func sendNotification(c driver.Connector, t *testing.T, escapedNotification string) {
	db := sql.OpenDB(c)
	defer db.Close()

	query := fmt.Sprintf("LISTEN foo; NOTIFY foo, '%s';", escapedNotification)
	_, err := db.Exec(query)
	if err != nil {
		t.Fatal(err)
	}
}
613
View as plain text