...
1 package transport
2
3 import (
4 "encoding/json"
5 "fmt"
6 "io"
7 "log"
8 "mime"
9 "net/http"
10 "strings"
11
12 "github.com/vektah/gqlparser/v2/gqlerror"
13
14 "github.com/99designs/gqlgen/graphql"
15 )
16
17 type SSE struct{}
18
19 var _ graphql.Transport = SSE{}
20
21 func (t SSE) Supports(r *http.Request) bool {
22 if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
23 return false
24 }
25 mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
26 if err != nil {
27 return false
28 }
29 return r.Method == http.MethodPost && mediaType == "application/json"
30 }
31
32 func (t SSE) Do(w http.ResponseWriter, r *http.Request, exec graphql.GraphExecutor) {
33 ctx := r.Context()
34 flusher, ok := w.(http.Flusher)
35 if !ok {
36 SendErrorf(w, http.StatusInternalServerError, "streaming unsupported")
37 return
38 }
39 defer flusher.Flush()
40
41 w.Header().Set("Cache-Control", "no-cache")
42 w.Header().Set("Connection", "keep-alive")
43 w.Header().Set("Content-Type", "application/json")
44
45 params := &graphql.RawParams{}
46 start := graphql.Now()
47 params.Headers = r.Header
48 params.ReadTime = graphql.TraceTiming{
49 Start: start,
50 End: graphql.Now(),
51 }
52
53 bodyString, err := getRequestBody(r)
54 if err != nil {
55 gqlErr := gqlerror.Errorf("could not get json request body: %+v", err)
56 resp := exec.DispatchError(ctx, gqlerror.List{gqlErr})
57 log.Printf("could not get json request body: %+v", err.Error())
58 writeJson(w, resp)
59 return
60 }
61
62 bodyReader := io.NopCloser(strings.NewReader(bodyString))
63 if err = jsonDecode(bodyReader, ¶ms); err != nil {
64 w.WriteHeader(http.StatusBadRequest)
65 gqlErr := gqlerror.Errorf(
66 "json request body could not be decoded: %+v body:%s",
67 err,
68 bodyString,
69 )
70 resp := exec.DispatchError(ctx, gqlerror.List{gqlErr})
71 log.Printf("decoding error: %+v body:%s", err.Error(), bodyString)
72 writeJson(w, resp)
73 return
74 }
75
76 rc, opErr := exec.CreateOperationContext(ctx, params)
77 ctx = graphql.WithOperationContext(ctx, rc)
78
79 w.Header().Set("Content-Type", "text/event-stream")
80 fmt.Fprint(w, ":\n\n")
81 flusher.Flush()
82
83 if opErr != nil {
84 resp := exec.DispatchError(ctx, opErr)
85 writeJsonWithSSE(w, resp)
86 } else {
87 responses, ctx := exec.DispatchOperation(ctx, rc)
88 for {
89 response := responses(ctx)
90 if response == nil {
91 break
92 }
93 writeJsonWithSSE(w, response)
94 flusher.Flush()
95 }
96 }
97
98 fmt.Fprint(w, "event: complete\n\n")
99 }
100
101 func writeJsonWithSSE(w io.Writer, response *graphql.Response) {
102 b, err := json.Marshal(response)
103 if err != nil {
104 panic(err)
105 }
106 fmt.Fprintf(w, "event: next\ndata: %s\n\n", b)
107 }
108
View as plain text