1
2
3
4
5
6
7 package operation
8
9 import (
10 "context"
11 "errors"
12 "time"
13
14 "go.mongodb.org/mongo-driver/bson/bsontype"
15 "go.mongodb.org/mongo-driver/event"
16 "go.mongodb.org/mongo-driver/internal/driverutil"
17 "go.mongodb.org/mongo-driver/mongo/description"
18 "go.mongodb.org/mongo-driver/mongo/readconcern"
19 "go.mongodb.org/mongo-driver/mongo/readpref"
20 "go.mongodb.org/mongo-driver/mongo/writeconcern"
21 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
22 "go.mongodb.org/mongo-driver/x/mongo/driver"
23 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
24 )
25
26
27 type Aggregate struct {
28 allowDiskUse *bool
29 batchSize *int32
30 bypassDocumentValidation *bool
31 collation bsoncore.Document
32 comment *string
33 hint bsoncore.Value
34 maxTime *time.Duration
35 pipeline bsoncore.Document
36 session *session.Client
37 clock *session.ClusterClock
38 collection string
39 monitor *event.CommandMonitor
40 database string
41 deployment driver.Deployment
42 readConcern *readconcern.ReadConcern
43 readPreference *readpref.ReadPref
44 retry *driver.RetryMode
45 selector description.ServerSelector
46 writeConcern *writeconcern.WriteConcern
47 crypt driver.Crypt
48 serverAPI *driver.ServerAPIOptions
49 let bsoncore.Document
50 hasOutputStage bool
51 customOptions map[string]bsoncore.Value
52 timeout *time.Duration
53 omitCSOTMaxTimeMS bool
54
55 result driver.CursorResponse
56 }
57
58
59 func NewAggregate(pipeline bsoncore.Document) *Aggregate {
60 return &Aggregate{
61 pipeline: pipeline,
62 }
63 }
64
65
66 func (a *Aggregate) Result(opts driver.CursorOptions) (*driver.BatchCursor, error) {
67
68 clientSession := a.session
69
70 clock := a.clock
71 opts.ServerAPI = a.serverAPI
72 return driver.NewBatchCursor(a.result, clientSession, clock, opts)
73 }
74
75
76
77 func (a *Aggregate) ResultCursorResponse() driver.CursorResponse {
78 return a.result
79 }
80
81 func (a *Aggregate) processResponse(info driver.ResponseInfo) error {
82 var err error
83
84 a.result, err = driver.NewCursorResponse(info)
85 return err
86
87 }
88
89
90 func (a *Aggregate) Execute(ctx context.Context) error {
91 if a.deployment == nil {
92 return errors.New("the Aggregate operation must have a Deployment set before Execute can be called")
93 }
94
95 return driver.Operation{
96 CommandFn: a.command,
97 ProcessResponseFn: a.processResponse,
98
99 Client: a.session,
100 Clock: a.clock,
101 CommandMonitor: a.monitor,
102 Database: a.database,
103 Deployment: a.deployment,
104 ReadConcern: a.readConcern,
105 ReadPreference: a.readPreference,
106 Type: driver.Read,
107 RetryMode: a.retry,
108 Selector: a.selector,
109 WriteConcern: a.writeConcern,
110 Crypt: a.crypt,
111 MinimumWriteConcernWireVersion: 5,
112 ServerAPI: a.serverAPI,
113 IsOutputAggregate: a.hasOutputStage,
114 MaxTime: a.maxTime,
115 Timeout: a.timeout,
116 Name: driverutil.AggregateOp,
117 OmitCSOTMaxTimeMS: a.omitCSOTMaxTimeMS,
118 }.Execute(ctx)
119
120 }
121
122 func (a *Aggregate) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
123 header := bsoncore.Value{Type: bsontype.String, Data: bsoncore.AppendString(nil, a.collection)}
124 if a.collection == "" {
125 header = bsoncore.Value{Type: bsontype.Int32, Data: []byte{0x01, 0x00, 0x00, 0x00}}
126 }
127 dst = bsoncore.AppendValueElement(dst, "aggregate", header)
128
129 cursorIdx, cursorDoc := bsoncore.AppendDocumentStart(nil)
130 if a.allowDiskUse != nil {
131
132 dst = bsoncore.AppendBooleanElement(dst, "allowDiskUse", *a.allowDiskUse)
133 }
134 if a.batchSize != nil {
135 cursorDoc = bsoncore.AppendInt32Element(cursorDoc, "batchSize", *a.batchSize)
136 }
137 if a.bypassDocumentValidation != nil {
138
139 dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *a.bypassDocumentValidation)
140 }
141 if a.collation != nil {
142
143 if desc.WireVersion == nil || !desc.WireVersion.Includes(5) {
144 return nil, errors.New("the 'collation' command parameter requires a minimum server wire version of 5")
145 }
146 dst = bsoncore.AppendDocumentElement(dst, "collation", a.collation)
147 }
148 if a.comment != nil {
149
150 dst = bsoncore.AppendStringElement(dst, "comment", *a.comment)
151 }
152 if a.hint.Type != bsontype.Type(0) {
153
154 dst = bsoncore.AppendValueElement(dst, "hint", a.hint)
155 }
156 if a.pipeline != nil {
157
158 dst = bsoncore.AppendArrayElement(dst, "pipeline", a.pipeline)
159 }
160 if a.let != nil {
161 dst = bsoncore.AppendDocumentElement(dst, "let", a.let)
162 }
163 for optionName, optionValue := range a.customOptions {
164 dst = bsoncore.AppendValueElement(dst, optionName, optionValue)
165 }
166 cursorDoc, _ = bsoncore.AppendDocumentEnd(cursorDoc, cursorIdx)
167 dst = bsoncore.AppendDocumentElement(dst, "cursor", cursorDoc)
168
169 return dst, nil
170 }
171
172
173 func (a *Aggregate) AllowDiskUse(allowDiskUse bool) *Aggregate {
174 if a == nil {
175 a = new(Aggregate)
176 }
177
178 a.allowDiskUse = &allowDiskUse
179 return a
180 }
181
182
183 func (a *Aggregate) BatchSize(batchSize int32) *Aggregate {
184 if a == nil {
185 a = new(Aggregate)
186 }
187
188 a.batchSize = &batchSize
189 return a
190 }
191
192
193 func (a *Aggregate) BypassDocumentValidation(bypassDocumentValidation bool) *Aggregate {
194 if a == nil {
195 a = new(Aggregate)
196 }
197
198 a.bypassDocumentValidation = &bypassDocumentValidation
199 return a
200 }
201
202
203 func (a *Aggregate) Collation(collation bsoncore.Document) *Aggregate {
204 if a == nil {
205 a = new(Aggregate)
206 }
207
208 a.collation = collation
209 return a
210 }
211
212
213 func (a *Aggregate) Comment(comment string) *Aggregate {
214 if a == nil {
215 a = new(Aggregate)
216 }
217
218 a.comment = &comment
219 return a
220 }
221
222
223 func (a *Aggregate) Hint(hint bsoncore.Value) *Aggregate {
224 if a == nil {
225 a = new(Aggregate)
226 }
227
228 a.hint = hint
229 return a
230 }
231
232
233 func (a *Aggregate) MaxTime(maxTime *time.Duration) *Aggregate {
234 if a == nil {
235 a = new(Aggregate)
236 }
237
238 a.maxTime = maxTime
239 return a
240 }
241
242
243 func (a *Aggregate) Pipeline(pipeline bsoncore.Document) *Aggregate {
244 if a == nil {
245 a = new(Aggregate)
246 }
247
248 a.pipeline = pipeline
249 return a
250 }
251
252
253 func (a *Aggregate) Session(session *session.Client) *Aggregate {
254 if a == nil {
255 a = new(Aggregate)
256 }
257
258 a.session = session
259 return a
260 }
261
262
263 func (a *Aggregate) ClusterClock(clock *session.ClusterClock) *Aggregate {
264 if a == nil {
265 a = new(Aggregate)
266 }
267
268 a.clock = clock
269 return a
270 }
271
272
273 func (a *Aggregate) Collection(collection string) *Aggregate {
274 if a == nil {
275 a = new(Aggregate)
276 }
277
278 a.collection = collection
279 return a
280 }
281
282
283 func (a *Aggregate) CommandMonitor(monitor *event.CommandMonitor) *Aggregate {
284 if a == nil {
285 a = new(Aggregate)
286 }
287
288 a.monitor = monitor
289 return a
290 }
291
292
293 func (a *Aggregate) Database(database string) *Aggregate {
294 if a == nil {
295 a = new(Aggregate)
296 }
297
298 a.database = database
299 return a
300 }
301
302
303 func (a *Aggregate) Deployment(deployment driver.Deployment) *Aggregate {
304 if a == nil {
305 a = new(Aggregate)
306 }
307
308 a.deployment = deployment
309 return a
310 }
311
312
313 func (a *Aggregate) ReadConcern(readConcern *readconcern.ReadConcern) *Aggregate {
314 if a == nil {
315 a = new(Aggregate)
316 }
317
318 a.readConcern = readConcern
319 return a
320 }
321
322
323 func (a *Aggregate) ReadPreference(readPreference *readpref.ReadPref) *Aggregate {
324 if a == nil {
325 a = new(Aggregate)
326 }
327
328 a.readPreference = readPreference
329 return a
330 }
331
332
333 func (a *Aggregate) ServerSelector(selector description.ServerSelector) *Aggregate {
334 if a == nil {
335 a = new(Aggregate)
336 }
337
338 a.selector = selector
339 return a
340 }
341
342
343 func (a *Aggregate) WriteConcern(writeConcern *writeconcern.WriteConcern) *Aggregate {
344 if a == nil {
345 a = new(Aggregate)
346 }
347
348 a.writeConcern = writeConcern
349 return a
350 }
351
352
353
354
355 func (a *Aggregate) Retry(retry driver.RetryMode) *Aggregate {
356 if a == nil {
357 a = new(Aggregate)
358 }
359
360 a.retry = &retry
361 return a
362 }
363
364
365 func (a *Aggregate) Crypt(crypt driver.Crypt) *Aggregate {
366 if a == nil {
367 a = new(Aggregate)
368 }
369
370 a.crypt = crypt
371 return a
372 }
373
374
375 func (a *Aggregate) ServerAPI(serverAPI *driver.ServerAPIOptions) *Aggregate {
376 if a == nil {
377 a = new(Aggregate)
378 }
379
380 a.serverAPI = serverAPI
381 return a
382 }
383
384
385 func (a *Aggregate) Let(let bsoncore.Document) *Aggregate {
386 if a == nil {
387 a = new(Aggregate)
388 }
389
390 a.let = let
391 return a
392 }
393
394
395
396 func (a *Aggregate) HasOutputStage(hos bool) *Aggregate {
397 if a == nil {
398 a = new(Aggregate)
399 }
400
401 a.hasOutputStage = hos
402 return a
403 }
404
405
406 func (a *Aggregate) CustomOptions(co map[string]bsoncore.Value) *Aggregate {
407 if a == nil {
408 a = new(Aggregate)
409 }
410
411 a.customOptions = co
412 return a
413 }
414
415
416 func (a *Aggregate) Timeout(timeout *time.Duration) *Aggregate {
417 if a == nil {
418 a = new(Aggregate)
419 }
420
421 a.timeout = timeout
422 return a
423 }
424
425
426
427
428 func (a *Aggregate) OmitCSOTMaxTimeMS(omit bool) *Aggregate {
429 if a == nil {
430 a = new(Aggregate)
431 }
432
433 a.omitCSOTMaxTimeMS = omit
434 return a
435 }
436
View as plain text