...
1 package syncs
2
3 import (
4 "context"
5 "reflect"
6 "sync"
7 )
8
9
10
// MergeChans watches all of the given channels and returns a channel that is
// closed as soon as the first of the following happens:
//
//   - any channel in chans becomes ready to receive (a value arrives or the
//     channel is closed), or
//   - the returned cancel function is called.
//
// The returned channel never delivers values; whatever was received from the
// input channel that fired is discarded. Only a single event is observed: the
// background goroutine exits right after the first one.
//
// The returned cancel function may be called any number of times. Each call
// blocks until the returned channel has been closed.
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
	merged := make(chan T)
	stop := make(chan T)

	// One receive case per input channel, with the stop channel as the
	// final case so that cancellation wakes the select as well.
	selectCases := make([]reflect.SelectCase, 0, len(chans)+1)
	for _, ch := range chans {
		selectCases = append(selectCases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}
	selectCases = append(selectCases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(stop),
	})

	go func() {
		defer close(merged)
		// Which case fired and what it carried is irrelevant; returning from
		// Select is the signal to close merged.
		reflect.Select(selectCases)
	}()

	var stopOnce sync.Once
	return merged, func() {
		// Closing stop twice would panic, hence the Once guard.
		stopOnce.Do(func() { close(stop) })
		// Wait for the goroutine to finish; merged is only ever closed,
		// never sent on, so this returns once the close has happened.
		<-merged
	}
}
39
View as plain text