1
2
3
4
5
6
7 package operation
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "time"
14
15 "go.mongodb.org/mongo-driver/bson"
16 "go.mongodb.org/mongo-driver/bson/bsontype"
17 "go.mongodb.org/mongo-driver/event"
18 "go.mongodb.org/mongo-driver/internal/driverutil"
19 "go.mongodb.org/mongo-driver/internal/logger"
20 "go.mongodb.org/mongo-driver/mongo/description"
21 "go.mongodb.org/mongo-driver/mongo/writeconcern"
22 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
23 "go.mongodb.org/mongo-driver/x/mongo/driver"
24 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
25 )
26
27
// Update holds the configuration for an update command together with the
// result accumulated while the command is executed. It is configured through
// the fluent setter methods and run with Execute.
type Update struct {
	// When non-nil, appended as "bypassDocumentValidation" by command, but
	// only if the selected server's wire version includes 4.
	bypassDocumentValidation *bool
	// Appended as "comment" by command unless its Type is the zero bsontype.Type.
	comment bsoncore.Value
	// When non-nil, appended as "ordered" by command and handed to
	// driver.Batches.Ordered by Execute.
	ordered *bool
	// Update statements; Execute passes them to driver.Batches under the
	// "updates" identifier.
	updates []bsoncore.Document
	// Passed to driver.Operation as Client.
	session *session.Client
	// Passed to driver.Operation as Clock.
	clock *session.ClusterClock
	// Value of the leading "update" element built by command.
	collection string
	// Passed to driver.Operation as CommandMonitor.
	monitor *event.CommandMonitor
	// Passed to driver.Operation as Database.
	database string
	// Must be non-nil before Execute is called; passed to driver.Operation.
	deployment driver.Deployment
	// Flag only: when true, command fails unless the wire version includes 5
	// and the write concern is acknowledged. command appends no "hint" element.
	hint *bool
	// Flag only: when true, command fails unless the wire version includes 6.
	// command appends no "arrayFilters" element.
	arrayFilters *bool
	// Passed to driver.Operation as Selector.
	selector description.ServerSelector
	// Passed to driver.Operation as WriteConcern; also consulted by command
	// when hint is set.
	writeConcern *writeconcern.WriteConcern
	// Passed to driver.Operation as RetryMode.
	retry *driver.RetryMode
	// Accumulated by processResponse and exposed through Result.
	result UpdateResult
	// Passed to driver.Operation as Crypt.
	crypt driver.Crypt
	// Passed to driver.Operation as ServerAPI.
	serverAPI *driver.ServerAPIOptions
	// When non-nil, appended as the "let" document element by command.
	let bsoncore.Document
	// Passed to driver.Operation as Timeout.
	timeout *time.Duration
	// Passed to driver.Operation as Logger.
	logger *logger.Logger
}
51
52
// Upsert is one entry of the "upserted" array in an update reply.
// buildUpdateResult unmarshals each document of that array into this type.
type Upsert struct {
	// Index is increased by processResponse by the response's CurrentIndex
	// when that value is greater than zero.
	// NOTE(review): presumably decoded from the "index" key through bson's
	// default field naming — confirm.
	Index int64
	// ID is decoded from the "_id" key of the upserted entry.
	ID interface{} `bson:"_id"`
}
57
58
// UpdateResult is the decoded outcome of an update command. processResponse
// accumulates one UpdateResult per server response into the Update's result.
type UpdateResult struct {
	// N is read from the reply's "n" field.
	// NOTE(review): presumably the number of documents matched — confirm
	// against the server's update command documentation.
	N int64
	// NModified is read from the reply's "nModified" field.
	NModified int64
	// Upserted holds the entries decoded from the reply's "upserted" array.
	Upserted []Upsert
}
67
68 func buildUpdateResult(response bsoncore.Document) (UpdateResult, error) {
69 elements, err := response.Elements()
70 if err != nil {
71 return UpdateResult{}, err
72 }
73 ur := UpdateResult{}
74 for _, element := range elements {
75 switch element.Key() {
76 case "nModified":
77 var ok bool
78 ur.NModified, ok = element.Value().AsInt64OK()
79 if !ok {
80 return ur, fmt.Errorf("response field 'nModified' is type int32 or int64, but received BSON type %s", element.Value().Type)
81 }
82 case "n":
83 var ok bool
84 ur.N, ok = element.Value().AsInt64OK()
85 if !ok {
86 return ur, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type)
87 }
88 case "upserted":
89 arr, ok := element.Value().ArrayOK()
90 if !ok {
91 return ur, fmt.Errorf("response field 'upserted' is type array, but received BSON type %s", element.Value().Type)
92 }
93
94 var values []bsoncore.Value
95 values, err = arr.Values()
96 if err != nil {
97 break
98 }
99
100 for _, val := range values {
101 valDoc, ok := val.DocumentOK()
102 if !ok {
103 return ur, fmt.Errorf("upserted value is type document, but received BSON type %s", val.Type)
104 }
105 var upsert Upsert
106 if err = bson.Unmarshal(valDoc, &upsert); err != nil {
107 return ur, err
108 }
109 ur.Upserted = append(ur.Upserted, upsert)
110 }
111 }
112 }
113 return ur, nil
114 }
115
116
117 func NewUpdate(updates ...bsoncore.Document) *Update {
118 return &Update{
119 updates: updates,
120 }
121 }
122
123
124 func (u *Update) Result() UpdateResult { return u.result }
125
126 func (u *Update) processResponse(info driver.ResponseInfo) error {
127 ur, err := buildUpdateResult(info.ServerResponse)
128
129 u.result.N += ur.N
130 u.result.NModified += ur.NModified
131 if info.CurrentIndex > 0 {
132 for ind := range ur.Upserted {
133 ur.Upserted[ind].Index += int64(info.CurrentIndex)
134 }
135 }
136 u.result.Upserted = append(u.result.Upserted, ur.Upserted...)
137 return err
138
139 }
140
141
142 func (u *Update) Execute(ctx context.Context) error {
143 if u.deployment == nil {
144 return errors.New("the Update operation must have a Deployment set before Execute can be called")
145 }
146 batches := &driver.Batches{
147 Identifier: "updates",
148 Documents: u.updates,
149 Ordered: u.ordered,
150 }
151
152 return driver.Operation{
153 CommandFn: u.command,
154 ProcessResponseFn: u.processResponse,
155 Batches: batches,
156 RetryMode: u.retry,
157 Type: driver.Write,
158 Client: u.session,
159 Clock: u.clock,
160 CommandMonitor: u.monitor,
161 Database: u.database,
162 Deployment: u.deployment,
163 Selector: u.selector,
164 WriteConcern: u.writeConcern,
165 Crypt: u.crypt,
166 ServerAPI: u.serverAPI,
167 Timeout: u.timeout,
168 Logger: u.logger,
169 Name: driverutil.UpdateOp,
170 }.Execute(ctx)
171
172 }
173
174 func (u *Update) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
175 dst = bsoncore.AppendStringElement(dst, "update", u.collection)
176 if u.bypassDocumentValidation != nil &&
177 (desc.WireVersion != nil && desc.WireVersion.Includes(4)) {
178
179 dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *u.bypassDocumentValidation)
180 }
181 if u.comment.Type != bsontype.Type(0) {
182 dst = bsoncore.AppendValueElement(dst, "comment", u.comment)
183 }
184 if u.ordered != nil {
185
186 dst = bsoncore.AppendBooleanElement(dst, "ordered", *u.ordered)
187 }
188 if u.hint != nil && *u.hint {
189
190 if desc.WireVersion == nil || !desc.WireVersion.Includes(5) {
191 return nil, errors.New("the 'hint' command parameter requires a minimum server wire version of 5")
192 }
193 if !u.writeConcern.Acknowledged() {
194 return nil, errUnacknowledgedHint
195 }
196 }
197 if u.arrayFilters != nil && *u.arrayFilters {
198 if desc.WireVersion == nil || !desc.WireVersion.Includes(6) {
199 return nil, errors.New("the 'arrayFilters' command parameter requires a minimum server wire version of 6")
200 }
201 }
202 if u.let != nil {
203 dst = bsoncore.AppendDocumentElement(dst, "let", u.let)
204 }
205
206 return dst, nil
207 }
208
209
210
211 func (u *Update) BypassDocumentValidation(bypassDocumentValidation bool) *Update {
212 if u == nil {
213 u = new(Update)
214 }
215
216 u.bypassDocumentValidation = &bypassDocumentValidation
217 return u
218 }
219
220
221
222
223 func (u *Update) Hint(hint bool) *Update {
224 if u == nil {
225 u = new(Update)
226 }
227
228 u.hint = &hint
229 return u
230 }
231
232
233
234 func (u *Update) ArrayFilters(arrayFilters bool) *Update {
235 if u == nil {
236 u = new(Update)
237 }
238
239 u.arrayFilters = &arrayFilters
240 return u
241 }
242
243
244
245 func (u *Update) Ordered(ordered bool) *Update {
246 if u == nil {
247 u = new(Update)
248 }
249
250 u.ordered = &ordered
251 return u
252 }
253
254
255
256
257 func (u *Update) Updates(updates ...bsoncore.Document) *Update {
258 if u == nil {
259 u = new(Update)
260 }
261
262 u.updates = updates
263 return u
264 }
265
266
267 func (u *Update) Session(session *session.Client) *Update {
268 if u == nil {
269 u = new(Update)
270 }
271
272 u.session = session
273 return u
274 }
275
276
277 func (u *Update) ClusterClock(clock *session.ClusterClock) *Update {
278 if u == nil {
279 u = new(Update)
280 }
281
282 u.clock = clock
283 return u
284 }
285
286
287 func (u *Update) Collection(collection string) *Update {
288 if u == nil {
289 u = new(Update)
290 }
291
292 u.collection = collection
293 return u
294 }
295
296
297 func (u *Update) CommandMonitor(monitor *event.CommandMonitor) *Update {
298 if u == nil {
299 u = new(Update)
300 }
301
302 u.monitor = monitor
303 return u
304 }
305
306
307 func (u *Update) Comment(comment bsoncore.Value) *Update {
308 if u == nil {
309 u = new(Update)
310 }
311
312 u.comment = comment
313 return u
314 }
315
316
317 func (u *Update) Database(database string) *Update {
318 if u == nil {
319 u = new(Update)
320 }
321
322 u.database = database
323 return u
324 }
325
326
327 func (u *Update) Deployment(deployment driver.Deployment) *Update {
328 if u == nil {
329 u = new(Update)
330 }
331
332 u.deployment = deployment
333 return u
334 }
335
336
337 func (u *Update) ServerSelector(selector description.ServerSelector) *Update {
338 if u == nil {
339 u = new(Update)
340 }
341
342 u.selector = selector
343 return u
344 }
345
346
347 func (u *Update) WriteConcern(writeConcern *writeconcern.WriteConcern) *Update {
348 if u == nil {
349 u = new(Update)
350 }
351
352 u.writeConcern = writeConcern
353 return u
354 }
355
356
357
358
359 func (u *Update) Retry(retry driver.RetryMode) *Update {
360 if u == nil {
361 u = new(Update)
362 }
363
364 u.retry = &retry
365 return u
366 }
367
368
369 func (u *Update) Crypt(crypt driver.Crypt) *Update {
370 if u == nil {
371 u = new(Update)
372 }
373
374 u.crypt = crypt
375 return u
376 }
377
378
379 func (u *Update) ServerAPI(serverAPI *driver.ServerAPIOptions) *Update {
380 if u == nil {
381 u = new(Update)
382 }
383
384 u.serverAPI = serverAPI
385 return u
386 }
387
388
389 func (u *Update) Let(let bsoncore.Document) *Update {
390 if u == nil {
391 u = new(Update)
392 }
393
394 u.let = let
395 return u
396 }
397
398
399 func (u *Update) Timeout(timeout *time.Duration) *Update {
400 if u == nil {
401 u = new(Update)
402 }
403
404 u.timeout = timeout
405 return u
406 }
407
408
409 func (u *Update) Logger(logger *logger.Logger) *Update {
410 if u == nil {
411 u = new(Update)
412 }
413
414 u.logger = logger
415 return u
416 }
417
View as plain text