1 /* 2 3 Package listen is a self-contained Go program which uses the LISTEN / NOTIFY 4 mechanism to avoid polling the database while waiting for more work to arrive. 5 6 // 7 // You can see the program in action by defining a function similar to 8 // the following: 9 // 10 // CREATE OR REPLACE FUNCTION public.get_work() 11 // RETURNS bigint 12 // LANGUAGE sql 13 // AS $$ 14 // SELECT CASE WHEN random() >= 0.2 THEN int8 '1' END 15 // $$ 16 // ; 17 18 package main 19 20 import ( 21 "database/sql" 22 "fmt" 23 "time" 24 25 "github.com/lib/pq" 26 ) 27 28 func doWork(db *sql.DB, work int64) { 29 // work here 30 } 31 32 func getWork(db *sql.DB) { 33 for { 34 // get work from the database here 35 var work sql.NullInt64 36 err := db.QueryRow("SELECT get_work()").Scan(&work) 37 if err != nil { 38 fmt.Println("call to get_work() failed: ", err) 39 time.Sleep(10 * time.Second) 40 continue 41 } 42 if !work.Valid { 43 // no more work to do 44 fmt.Println("ran out of work") 45 return 46 } 47 48 fmt.Println("starting work on ", work.Int64) 49 go doWork(db, work.Int64) 50 } 51 } 52 53 func waitForNotification(l *pq.Listener) { 54 select { 55 case <-l.Notify: 56 fmt.Println("received notification, new work available") 57 case <-time.After(90 * time.Second): 58 go l.Ping() 59 // Check if there's more work available, just in case it takes 60 // a while for the Listener to notice connection loss and 61 // reconnect. 62 fmt.Println("received no work for 90 seconds, checking for new work") 63 } 64 } 65 66 func main() { 67 var conninfo string = "" 68 69 db, err := sql.Open("postgres", conninfo) 70 if err != nil { 71 panic(err) 72 } 73 74 reportProblem := func(ev pq.ListenerEventType, err error) { 75 if err != nil { 76 fmt.Println(err.Error()) 77 } 78 } 79 80 minReconn := 10 * time.Second 81 maxReconn := time.Minute 82 listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem) 83 err = listener.Listen("getwork") 84 if err != nil { 85 panic(err) 86 } 87 88 fmt.Println("entering main loop") 89 for { 90 // process all available work before waiting for notifications 91 getWork(db) 92 waitForNotification(listener) 93 } 94 } 95 96 97 */ 98 package listen 99