...

Source file src/github.com/containerd/ttrpc/stream_test.go

Documentation: github.com/containerd/ttrpc

     1  /*
     2     Copyright The containerd Authors.
     3  
     4     Licensed under the Apache License, Version 2.0 (the "License");
     5     you may not use this file except in compliance with the License.
     6     You may obtain a copy of the License at
     7  
     8         http://www.apache.org/licenses/LICENSE-2.0
     9  
    10     Unless required by applicable law or agreed to in writing, software
    11     distributed under the License is distributed on an "AS IS" BASIS,
    12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13     See the License for the specific language governing permissions and
    14     limitations under the License.
    15  */
    16  
    17  package ttrpc
    18  
    19  import (
    20  	"context"
    21  	"io"
    22  	"testing"
    23  
    24  	"github.com/containerd/ttrpc/internal"
    25  )
    26  
    27  func TestStreamClient(t *testing.T) {
    28  	var (
    29  		ctx             = context.Background()
    30  		server          = mustServer(t)(NewServer())
    31  		addr, listener  = newTestListener(t)
    32  		client, cleanup = newTestClient(t, addr)
    33  		serviceName     = "streamService"
    34  	)
    35  
    36  	defer listener.Close()
    37  	defer cleanup()
    38  
    39  	desc := &ServiceDesc{
    40  		Methods: map[string]Method{
    41  			"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
    42  				var req internal.EchoPayload
    43  				if err := unmarshal(&req); err != nil {
    44  					return nil, err
    45  				}
    46  				req.Seq++
    47  				return &req, nil
    48  			},
    49  		},
    50  		Streams: map[string]Stream{
    51  			"EchoStream": {
    52  				Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
    53  					for {
    54  						var req internal.EchoPayload
    55  						if err := ss.RecvMsg(&req); err != nil {
    56  							if err == io.EOF {
    57  								err = nil
    58  							}
    59  							return nil, err
    60  						}
    61  						req.Seq++
    62  						if err := ss.SendMsg(&req); err != nil {
    63  							return nil, err
    64  						}
    65  					}
    66  
    67  				},
    68  				StreamingClient: true,
    69  				StreamingServer: true,
    70  			},
    71  		},
    72  	}
    73  	server.RegisterService(serviceName, desc)
    74  
    75  	go server.Serve(ctx, listener)
    76  	defer server.Shutdown(ctx)
    77  
    78  	//func (c *Client) NewStream(ctx context.Context, desc *StreamDesc, service, method string) (ClientStream, error) {
    79  	var req, resp internal.EchoPayload
    80  	if err := client.Call(ctx, serviceName, "Echo", &req, &resp); err != nil {
    81  		t.Fatal(err)
    82  	}
    83  
    84  	stream, err := client.NewStream(ctx, &StreamDesc{true, true}, serviceName, "EchoStream", nil)
    85  	if err != nil {
    86  		t.Fatal(err)
    87  	}
    88  	for i := 1; i <= 100; i++ {
    89  		req := internal.EchoPayload{
    90  			Seq: int64(i),
    91  			Msg: "should be returned",
    92  		}
    93  		if err := stream.SendMsg(&req); err != nil {
    94  			t.Fatalf("%d: %v", i, err)
    95  		}
    96  		var resp internal.EchoPayload
    97  		if err := stream.RecvMsg(&resp); err != nil {
    98  			t.Fatalf("%d: %v", i, err)
    99  		}
   100  		if resp.Seq != int64(i)+1 {
   101  			t.Fatalf("%d: unexpected sequence value: %d, expected %d", i, resp.Seq, i+1)
   102  		}
   103  		if resp.Msg != req.Msg {
   104  			t.Fatalf("%d: unexpected message: %q, expected %q", i, resp.Msg, req.Msg)
   105  		}
   106  	}
   107  	if err := stream.CloseSend(); err != nil {
   108  		t.Fatal(err)
   109  	}
   110  
   111  	err = stream.RecvMsg(&resp)
   112  	if err == nil {
   113  		t.Fatal("expected io.EOF after close send")
   114  	}
   115  	if err != io.EOF {
   116  		t.Fatalf("expected io.EOF after close send, got %v", err)
   117  	}
   118  }
   119  

View as plain text