...
1
2
3 package diode
4
5 import (
6 "context"
7 "io"
8 "sync"
9 "time"
10
11 "github.com/rs/zerolog/diode/internal/diodes"
12 )
13
14 var bufPool = &sync.Pool{
15 New: func() interface{} {
16 return make([]byte, 0, 500)
17 },
18 }
19
20 type Alerter func(missed int)
21
22 type diodeFetcher interface {
23 diodes.Diode
24 Next() diodes.GenericDataType
25 }
26
27
28
29 type Writer struct {
30 w io.Writer
31 d diodeFetcher
32 c context.CancelFunc
33 done chan struct{}
34 }
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 func NewWriter(w io.Writer, size int, pollInterval time.Duration, f Alerter) Writer {
52 ctx, cancel := context.WithCancel(context.Background())
53 dw := Writer{
54 w: w,
55 c: cancel,
56 done: make(chan struct{}),
57 }
58 if f == nil {
59 f = func(int) {}
60 }
61 d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
62 if pollInterval > 0 {
63 dw.d = diodes.NewPoller(d,
64 diodes.WithPollingInterval(pollInterval),
65 diodes.WithPollingContext(ctx))
66 } else {
67 dw.d = diodes.NewWaiter(d,
68 diodes.WithWaiterContext(ctx))
69 }
70 go dw.poll()
71 return dw
72 }
73
74 func (dw Writer) Write(p []byte) (n int, err error) {
75
76
77 p = append(bufPool.Get().([]byte), p...)
78 dw.d.Set(diodes.GenericDataType(&p))
79 return len(p), nil
80 }
81
82
83
84 func (dw Writer) Close() error {
85 dw.c()
86 <-dw.done
87 if w, ok := dw.w.(io.Closer); ok {
88 return w.Close()
89 }
90 return nil
91 }
92
93 func (dw Writer) poll() {
94 defer close(dw.done)
95 for {
96 d := dw.d.Next()
97 if d == nil {
98 return
99 }
100 p := *(*[]byte)(d)
101 dw.w.Write(p)
102
103
104
105
106
107
108
109 const maxSize = 1 << 16
110 if cap(p) <= maxSize {
111 bufPool.Put(p[:0])
112 }
113 }
114 }
115
View as plain text