1 /* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package testutils 19 20 import ( 21 "context" 22 ) 23 24 // DefaultChanBufferSize is the default buffer size of the underlying channel. 25 const DefaultChanBufferSize = 1 26 27 // Channel wraps a generic channel and provides a timed receive operation. 28 type Channel struct { 29 // C is the underlying channel on which values sent using the SendXxx() methods are delivered. 30 // Tests which cannot use ReceiveXxx() for whatever reasons can use C to read the values. 31 C chan any 32 } 33 34 // Send sends value on the underlying channel. 35 func (c *Channel) Send(value any) { 36 c.C <- value 37 } 38 39 // SendContext sends value on the underlying channel, or returns an error if 40 // the context expires. 41 func (c *Channel) SendContext(ctx context.Context, value any) error { 42 select { 43 case c.C <- value: 44 return nil 45 case <-ctx.Done(): 46 return ctx.Err() 47 } 48 } 49 50 // SendOrFail attempts to send value on the underlying channel. Returns true 51 // if successful or false if the channel was full. 52 func (c *Channel) SendOrFail(value any) bool { 53 select { 54 case c.C <- value: 55 return true 56 default: 57 return false 58 } 59 } 60 61 // ReceiveOrFail returns the value on the underlying channel and true, or nil 62 // and false if the channel was empty. 63 func (c *Channel) ReceiveOrFail() (any, bool) { 64 select { 65 case got := <-c.C: 66 return got, true 67 default: 68 return nil, false 69 } 70 } 71 72 // Receive returns the value received on the underlying channel, or the error 73 // returned by ctx if it is closed or cancelled. 74 func (c *Channel) Receive(ctx context.Context) (any, error) { 75 select { 76 case <-ctx.Done(): 77 return nil, ctx.Err() 78 case got := <-c.C: 79 return got, nil 80 } 81 } 82 83 // Replace clears the value on the underlying channel, and sends the new value. 84 // 85 // It's expected to be used with a size-1 channel, to only keep the most 86 // up-to-date item. This method is inherently racy when invoked concurrently 87 // from multiple goroutines. 88 func (c *Channel) Replace(value any) { 89 for { 90 select { 91 case c.C <- value: 92 return 93 case <-c.C: 94 } 95 } 96 } 97 98 // NewChannel returns a new Channel. 99 func NewChannel() *Channel { 100 return NewChannelWithSize(DefaultChanBufferSize) 101 } 102 103 // NewChannelWithSize returns a new Channel with a buffer of bufSize. 104 func NewChannelWithSize(bufSize int) *Channel { 105 return &Channel{C: make(chan any, bufSize)} 106 } 107