...

Source file src/github.com/99designs/gqlgen/graphql/handler/transport/sse.go

Documentation: github.com/99designs/gqlgen/graphql/handler/transport

     1  package transport
     2  
     3  import (
     4  	"encoding/json"
     5  	"fmt"
     6  	"io"
     7  	"log"
     8  	"mime"
     9  	"net/http"
    10  	"strings"
    11  
    12  	"github.com/vektah/gqlparser/v2/gqlerror"
    13  
    14  	"github.com/99designs/gqlgen/graphql"
    15  )
    16  
// SSE is the transport that streams GraphQL responses to the client as
// Server-Sent Events (text/event-stream). It carries no configuration.
type SSE struct{}

// Compile-time assertion that SSE satisfies graphql.Transport.
var _ graphql.Transport = SSE{}
    20  
    21  func (t SSE) Supports(r *http.Request) bool {
    22  	if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
    23  		return false
    24  	}
    25  	mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
    26  	if err != nil {
    27  		return false
    28  	}
    29  	return r.Method == http.MethodPost && mediaType == "application/json"
    30  }
    31  
    32  func (t SSE) Do(w http.ResponseWriter, r *http.Request, exec graphql.GraphExecutor) {
    33  	ctx := r.Context()
    34  	flusher, ok := w.(http.Flusher)
    35  	if !ok {
    36  		SendErrorf(w, http.StatusInternalServerError, "streaming unsupported")
    37  		return
    38  	}
    39  	defer flusher.Flush()
    40  
    41  	w.Header().Set("Cache-Control", "no-cache")
    42  	w.Header().Set("Connection", "keep-alive")
    43  	w.Header().Set("Content-Type", "application/json")
    44  
    45  	params := &graphql.RawParams{}
    46  	start := graphql.Now()
    47  	params.Headers = r.Header
    48  	params.ReadTime = graphql.TraceTiming{
    49  		Start: start,
    50  		End:   graphql.Now(),
    51  	}
    52  
    53  	bodyString, err := getRequestBody(r)
    54  	if err != nil {
    55  		gqlErr := gqlerror.Errorf("could not get json request body: %+v", err)
    56  		resp := exec.DispatchError(ctx, gqlerror.List{gqlErr})
    57  		log.Printf("could not get json request body: %+v", err.Error())
    58  		writeJson(w, resp)
    59  		return
    60  	}
    61  
    62  	bodyReader := io.NopCloser(strings.NewReader(bodyString))
    63  	if err = jsonDecode(bodyReader, &params); err != nil {
    64  		w.WriteHeader(http.StatusBadRequest)
    65  		gqlErr := gqlerror.Errorf(
    66  			"json request body could not be decoded: %+v body:%s",
    67  			err,
    68  			bodyString,
    69  		)
    70  		resp := exec.DispatchError(ctx, gqlerror.List{gqlErr})
    71  		log.Printf("decoding error: %+v body:%s", err.Error(), bodyString)
    72  		writeJson(w, resp)
    73  		return
    74  	}
    75  
    76  	rc, opErr := exec.CreateOperationContext(ctx, params)
    77  	ctx = graphql.WithOperationContext(ctx, rc)
    78  
    79  	w.Header().Set("Content-Type", "text/event-stream")
    80  	fmt.Fprint(w, ":\n\n")
    81  	flusher.Flush()
    82  
    83  	if opErr != nil {
    84  		resp := exec.DispatchError(ctx, opErr)
    85  		writeJsonWithSSE(w, resp)
    86  	} else {
    87  		responses, ctx := exec.DispatchOperation(ctx, rc)
    88  		for {
    89  			response := responses(ctx)
    90  			if response == nil {
    91  				break
    92  			}
    93  			writeJsonWithSSE(w, response)
    94  			flusher.Flush()
    95  		}
    96  	}
    97  
    98  	fmt.Fprint(w, "event: complete\n\n")
    99  }
   100  
   101  func writeJsonWithSSE(w io.Writer, response *graphql.Response) {
   102  	b, err := json.Marshal(response)
   103  	if err != nil {
   104  		panic(err)
   105  	}
   106  	fmt.Fprintf(w, "event: next\ndata: %s\n\n", b)
   107  }
   108  

View as plain text