...
1
16
17 package ttrpc
18
19 import (
20 "context"
21 "sync"
22 )
23
24 type streamID uint32
25
26 type streamMessage struct {
27 header messageHeader
28 payload []byte
29 }
30
31 type stream struct {
32 id streamID
33 sender sender
34 recv chan *streamMessage
35
36 closeOnce sync.Once
37 recvErr error
38 recvClose chan struct{}
39 }
40
41 func newStream(id streamID, send sender) *stream {
42 return &stream{
43 id: id,
44 sender: send,
45 recv: make(chan *streamMessage, 1),
46 recvClose: make(chan struct{}),
47 }
48 }
49
50 func (s *stream) closeWithError(err error) error {
51 s.closeOnce.Do(func() {
52 if err != nil {
53 s.recvErr = err
54 } else {
55 s.recvErr = ErrClosed
56 }
57 close(s.recvClose)
58 })
59 return nil
60 }
61
62 func (s *stream) send(mt messageType, flags uint8, b []byte) error {
63 return s.sender.send(uint32(s.id), mt, flags, b)
64 }
65
66 func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
67 select {
68 case <-s.recvClose:
69 return s.recvErr
70 default:
71 }
72 select {
73 case <-s.recvClose:
74 return s.recvErr
75 case s.recv <- msg:
76 return nil
77 case <-ctx.Done():
78 return ctx.Err()
79 }
80 }
81
82 type sender interface {
83 send(uint32, messageType, uint8, []byte) error
84 }
85
View as plain text