...

Source file src/github.com/containerd/ttrpc/channel_test.go

Documentation: github.com/containerd/ttrpc

     1  /*
     2     Copyright The containerd Authors.
     3  
     4     Licensed under the Apache License, Version 2.0 (the "License");
     5     you may not use this file except in compliance with the License.
     6     You may obtain a copy of the License at
     7  
     8         http://www.apache.org/licenses/LICENSE-2.0
     9  
    10     Unless required by applicable law or agreed to in writing, software
    11     distributed under the License is distributed on an "AS IS" BASIS,
    12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13     See the License for the specific language governing permissions and
    14     limitations under the License.
    15  */
    16  
    17  package ttrpc
    18  
    19  import (
    20  	"bytes"
    21  	"errors"
    22  	"io"
    23  	"net"
    24  	"reflect"
    25  	"testing"
    26  
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/status"
    29  )
    30  
    31  func TestReadWriteMessage(t *testing.T) {
    32  	var (
    33  		w, r     = net.Pipe()
    34  		ch       = newChannel(w)
    35  		rch      = newChannel(r)
    36  		messages = [][]byte{
    37  			[]byte("hello"),
    38  			[]byte("this is a test"),
    39  			[]byte("of message framing"),
    40  		}
    41  		received [][]byte
    42  		errs     = make(chan error, 1)
    43  	)
    44  
    45  	go func() {
    46  		for i, msg := range messages {
    47  			if err := ch.send(uint32(i), 1, 0, msg); err != nil {
    48  				errs <- err
    49  				return
    50  			}
    51  		}
    52  
    53  		w.Close()
    54  	}()
    55  
    56  	for {
    57  		_, p, err := rch.recv()
    58  		if err != nil {
    59  			if !errors.Is(err, io.EOF) {
    60  				t.Fatal(err)
    61  			}
    62  
    63  			break
    64  		}
    65  		received = append(received, p)
    66  
    67  		// make sure we don't have send errors
    68  		select {
    69  		case err := <-errs:
    70  			if err != nil {
    71  				t.Fatal(err)
    72  			}
    73  		default:
    74  		}
    75  	}
    76  
    77  	if !reflect.DeepEqual(received, messages) {
    78  		t.Fatalf("didn't received expected set of messages: %v != %v", received, messages)
    79  	}
    80  
    81  	select {
    82  	case err := <-errs:
    83  		if err != nil {
    84  			t.Fatal(err)
    85  		}
    86  	default:
    87  	}
    88  }
    89  
    90  func TestMessageOversize(t *testing.T) {
    91  	var (
    92  		w, r     = net.Pipe()
    93  		wch, rch = newChannel(w), newChannel(r)
    94  		msg      = bytes.Repeat([]byte("a message of massive length"), 512<<10)
    95  		errs     = make(chan error, 1)
    96  	)
    97  
    98  	go func() {
    99  		if err := wch.send(1, 1, 0, msg); err != nil {
   100  			errs <- err
   101  		}
   102  	}()
   103  
   104  	_, _, err := rch.recv()
   105  	if err == nil {
   106  		t.Fatalf("error expected reading with small buffer")
   107  	}
   108  
   109  	status, ok := status.FromError(err)
   110  	if !ok {
   111  		t.Fatalf("expected grpc status error: %v", err)
   112  	}
   113  
   114  	if status.Code() != codes.ResourceExhausted {
   115  		t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
   116  	}
   117  
   118  	select {
   119  	case err := <-errs:
   120  		if err != nil {
   121  			t.Fatal(err)
   122  		}
   123  	default:
   124  	}
   125  }
   126  

View as plain text