...
1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "context"
17 "io"
18 "net/http"
19 "sync"
20
21 "github.com/go-kivik/kivik/v4/driver"
22 internal "github.com/go-kivik/kivik/v4/int/errors"
23 )
24
25 type iterator interface {
26 Next(interface{}) error
27 Close() error
28 }
29
30
31 const (
32
33
34 stateReady = iota
35
36 stateResultSetReady
37
38
39 stateResultSetRowReady
40
41
42 stateEOQ
43
44
45 stateRowReady
46
47
48 stateClosed
49 )
50
51 type iter struct {
52 feed iterator
53 onClose func()
54
55 mu sync.Mutex
56 state int
57 err error
58 wg sync.WaitGroup
59
60 cancel func()
61
62 curVal interface{}
63 }
64
65
66
67 func (i *iter) isReady() error {
68 if i.state == stateClosed {
69 return &internal.Error{Status: http.StatusBadRequest, Message: "kivik: Iterator is closed"}
70 }
71 if !stateIsReady(i.state) {
72 return &internal.Error{Status: http.StatusBadRequest, Message: "kivik: Iterator access before calling Next"}
73 }
74 return nil
75 }
76
77 func stateIsReady(state int) bool {
78 switch state {
79 case stateRowReady, stateResultSetReady, stateResultSetRowReady, stateClosed:
80 return true
81 }
82 return false
83 }
84
85
86
87
88
89
90 func newIterator(ctx context.Context, onClose func(), feed iterator, zeroValue interface{}) *iter {
91 i := &iter{
92 onClose: onClose,
93 feed: feed,
94 curVal: zeroValue,
95 }
96 ctx, i.cancel = context.WithCancel(ctx)
97 go i.awaitDone(ctx)
98 return i
99 }
100
101
102
103 func errIterator(err error) *iter {
104 return &iter{
105 state: stateClosed,
106 err: err,
107 }
108 }
109
110
111
112 func (i *iter) awaitDone(ctx context.Context) {
113 <-ctx.Done()
114 i.mu.Lock()
115 _ = i.closeErr(ctx.Err())
116 i.mu.Unlock()
117 }
118
119
120
121
122 func (i *iter) Next() bool {
123 i.mu.Lock()
124 defer i.mu.Unlock()
125 return i.next()
126 }
127
128
129 func (i *iter) next() bool {
130 if i.state == stateClosed {
131 return false
132 }
133 for {
134 err := i.feed.Next(i.curVal)
135 if err == driver.EOQ {
136 if i.state == stateResultSetReady || i.state == stateResultSetRowReady {
137 i.state = stateEOQ
138 i.err = nil
139 return false
140 }
141 continue
142 }
143 switch i.state {
144 case stateResultSetReady, stateResultSetRowReady:
145 i.state = stateResultSetRowReady
146 default:
147 i.state = stateRowReady
148 }
149 i.err = err
150 if i.err != nil {
151 _ = i.closeErr(nil)
152 return false
153 }
154 return true
155 }
156 }
157
158
159
160
161
162
163 func (i *iter) Close() error {
164 i.mu.Lock()
165 defer i.mu.Unlock()
166 i.wg.Wait()
167 return i.closeErr(nil)
168 }
169
170 func (i *iter) closeErr(err error) error {
171 if i.state == stateClosed {
172 return nil
173 }
174 i.state = stateClosed
175
176 if i.err == nil {
177 i.err = err
178 }
179
180 err = i.feed.Close()
181
182 if i.cancel != nil {
183 i.cancel()
184 }
185
186 if i.onClose != nil {
187 i.onClose()
188 }
189
190 return err
191 }
192
193
194
195 func (i *iter) Err() error {
196 i.mu.Lock()
197 defer i.mu.Unlock()
198 if i.err == io.EOF {
199 return nil
200 }
201 return i.err
202 }
203
View as plain text