...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/operation/update.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/operation

     1  // Copyright (C) MongoDB, Inc. 2019-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package operation
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"time"
    14  
    15  	"go.mongodb.org/mongo-driver/bson"
    16  	"go.mongodb.org/mongo-driver/bson/bsontype"
    17  	"go.mongodb.org/mongo-driver/event"
    18  	"go.mongodb.org/mongo-driver/internal/driverutil"
    19  	"go.mongodb.org/mongo-driver/internal/logger"
    20  	"go.mongodb.org/mongo-driver/mongo/description"
    21  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    22  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    23  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    24  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    25  )
    26  
    27  // Update performs an update operation.
    28  type Update struct {
    29  	bypassDocumentValidation *bool
    30  	comment                  bsoncore.Value
    31  	ordered                  *bool
    32  	updates                  []bsoncore.Document
    33  	session                  *session.Client
    34  	clock                    *session.ClusterClock
    35  	collection               string
    36  	monitor                  *event.CommandMonitor
    37  	database                 string
    38  	deployment               driver.Deployment
    39  	hint                     *bool
    40  	arrayFilters             *bool
    41  	selector                 description.ServerSelector
    42  	writeConcern             *writeconcern.WriteConcern
    43  	retry                    *driver.RetryMode
    44  	result                   UpdateResult
    45  	crypt                    driver.Crypt
    46  	serverAPI                *driver.ServerAPIOptions
    47  	let                      bsoncore.Document
    48  	timeout                  *time.Duration
    49  	logger                   *logger.Logger
    50  }
    51  
// Upsert contains the information for an upsert in an Update operation.
//
// Values are decoded with bson.Unmarshal from the documents of the server
// reply's "upserted" array. ID holds the "_id" of the entry. Index is the
// position reported for the entry; processResponse adds the current batch
// index to it when that index is greater than zero.
type Upsert struct {
	Index int64
	ID    interface{} `bson:"_id"`
}
    57  
// UpdateResult contains information for the result of an Update operation.
// Each response handled by processResponse adds to the counts and appends to
// Upserted.
type UpdateResult struct {
	// Number of documents matched; read from the "n" field of the server reply.
	N int64
	// Number of documents modified; read from the "nModified" field of the server reply.
	NModified int64
	// Information about upserted documents; read from the "upserted" array of the server reply.
	Upserted []Upsert
}
    67  
    68  func buildUpdateResult(response bsoncore.Document) (UpdateResult, error) {
    69  	elements, err := response.Elements()
    70  	if err != nil {
    71  		return UpdateResult{}, err
    72  	}
    73  	ur := UpdateResult{}
    74  	for _, element := range elements {
    75  		switch element.Key() {
    76  		case "nModified":
    77  			var ok bool
    78  			ur.NModified, ok = element.Value().AsInt64OK()
    79  			if !ok {
    80  				return ur, fmt.Errorf("response field 'nModified' is type int32 or int64, but received BSON type %s", element.Value().Type)
    81  			}
    82  		case "n":
    83  			var ok bool
    84  			ur.N, ok = element.Value().AsInt64OK()
    85  			if !ok {
    86  				return ur, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type)
    87  			}
    88  		case "upserted":
    89  			arr, ok := element.Value().ArrayOK()
    90  			if !ok {
    91  				return ur, fmt.Errorf("response field 'upserted' is type array, but received BSON type %s", element.Value().Type)
    92  			}
    93  
    94  			var values []bsoncore.Value
    95  			values, err = arr.Values()
    96  			if err != nil {
    97  				break
    98  			}
    99  
   100  			for _, val := range values {
   101  				valDoc, ok := val.DocumentOK()
   102  				if !ok {
   103  					return ur, fmt.Errorf("upserted value is type document, but received BSON type %s", val.Type)
   104  				}
   105  				var upsert Upsert
   106  				if err = bson.Unmarshal(valDoc, &upsert); err != nil {
   107  					return ur, err
   108  				}
   109  				ur.Upserted = append(ur.Upserted, upsert)
   110  			}
   111  		}
   112  	}
   113  	return ur, nil
   114  }
   115  
   116  // NewUpdate constructs and returns a new Update.
   117  func NewUpdate(updates ...bsoncore.Document) *Update {
   118  	return &Update{
   119  		updates: updates,
   120  	}
   121  }
   122  
   123  // Result returns the result of executing this operation.
   124  func (u *Update) Result() UpdateResult { return u.result }
   125  
   126  func (u *Update) processResponse(info driver.ResponseInfo) error {
   127  	ur, err := buildUpdateResult(info.ServerResponse)
   128  
   129  	u.result.N += ur.N
   130  	u.result.NModified += ur.NModified
   131  	if info.CurrentIndex > 0 {
   132  		for ind := range ur.Upserted {
   133  			ur.Upserted[ind].Index += int64(info.CurrentIndex)
   134  		}
   135  	}
   136  	u.result.Upserted = append(u.result.Upserted, ur.Upserted...)
   137  	return err
   138  
   139  }
   140  
   141  // Execute runs this operations and returns an error if the operation did not execute successfully.
   142  func (u *Update) Execute(ctx context.Context) error {
   143  	if u.deployment == nil {
   144  		return errors.New("the Update operation must have a Deployment set before Execute can be called")
   145  	}
   146  	batches := &driver.Batches{
   147  		Identifier: "updates",
   148  		Documents:  u.updates,
   149  		Ordered:    u.ordered,
   150  	}
   151  
   152  	return driver.Operation{
   153  		CommandFn:         u.command,
   154  		ProcessResponseFn: u.processResponse,
   155  		Batches:           batches,
   156  		RetryMode:         u.retry,
   157  		Type:              driver.Write,
   158  		Client:            u.session,
   159  		Clock:             u.clock,
   160  		CommandMonitor:    u.monitor,
   161  		Database:          u.database,
   162  		Deployment:        u.deployment,
   163  		Selector:          u.selector,
   164  		WriteConcern:      u.writeConcern,
   165  		Crypt:             u.crypt,
   166  		ServerAPI:         u.serverAPI,
   167  		Timeout:           u.timeout,
   168  		Logger:            u.logger,
   169  		Name:              driverutil.UpdateOp,
   170  	}.Execute(ctx)
   171  
   172  }
   173  
   174  func (u *Update) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
   175  	dst = bsoncore.AppendStringElement(dst, "update", u.collection)
   176  	if u.bypassDocumentValidation != nil &&
   177  		(desc.WireVersion != nil && desc.WireVersion.Includes(4)) {
   178  
   179  		dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *u.bypassDocumentValidation)
   180  	}
   181  	if u.comment.Type != bsontype.Type(0) {
   182  		dst = bsoncore.AppendValueElement(dst, "comment", u.comment)
   183  	}
   184  	if u.ordered != nil {
   185  
   186  		dst = bsoncore.AppendBooleanElement(dst, "ordered", *u.ordered)
   187  	}
   188  	if u.hint != nil && *u.hint {
   189  
   190  		if desc.WireVersion == nil || !desc.WireVersion.Includes(5) {
   191  			return nil, errors.New("the 'hint' command parameter requires a minimum server wire version of 5")
   192  		}
   193  		if !u.writeConcern.Acknowledged() {
   194  			return nil, errUnacknowledgedHint
   195  		}
   196  	}
   197  	if u.arrayFilters != nil && *u.arrayFilters {
   198  		if desc.WireVersion == nil || !desc.WireVersion.Includes(6) {
   199  			return nil, errors.New("the 'arrayFilters' command parameter requires a minimum server wire version of 6")
   200  		}
   201  	}
   202  	if u.let != nil {
   203  		dst = bsoncore.AppendDocumentElement(dst, "let", u.let)
   204  	}
   205  
   206  	return dst, nil
   207  }
   208  
   209  // BypassDocumentValidation allows the operation to opt-out of document level validation. Valid
   210  // for server versions >= 3.2. For servers < 3.2, this setting is ignored.
   211  func (u *Update) BypassDocumentValidation(bypassDocumentValidation bool) *Update {
   212  	if u == nil {
   213  		u = new(Update)
   214  	}
   215  
   216  	u.bypassDocumentValidation = &bypassDocumentValidation
   217  	return u
   218  }
   219  
   220  // Hint is a flag to indicate that the update document contains a hint. Hint is only supported by
   221  // servers >= 4.2. Older servers >= 3.4 will report an error for using the hint option. For servers <
   222  // 3.4, the driver will return an error if the hint option is used.
   223  func (u *Update) Hint(hint bool) *Update {
   224  	if u == nil {
   225  		u = new(Update)
   226  	}
   227  
   228  	u.hint = &hint
   229  	return u
   230  }
   231  
   232  // ArrayFilters is a flag to indicate that the update document contains an arrayFilters field. This option is only
   233  // supported on server versions 3.6 and higher. For servers < 3.6, the driver will return an error.
   234  func (u *Update) ArrayFilters(arrayFilters bool) *Update {
   235  	if u == nil {
   236  		u = new(Update)
   237  	}
   238  
   239  	u.arrayFilters = &arrayFilters
   240  	return u
   241  }
   242  
   243  // Ordered sets ordered. If true, when a write fails, the operation will return the error, when
   244  // false write failures do not stop execution of the operation.
   245  func (u *Update) Ordered(ordered bool) *Update {
   246  	if u == nil {
   247  		u = new(Update)
   248  	}
   249  
   250  	u.ordered = &ordered
   251  	return u
   252  }
   253  
   254  // Updates specifies an array of update statements to perform when this operation is executed.
   255  // Each update document must have the following structure:
   256  // {q: <query>, u: <update>, multi: <boolean>, collation: Optional<Document>, arrayFitlers: Optional<Array>, hint: Optional<string/Document>}.
   257  func (u *Update) Updates(updates ...bsoncore.Document) *Update {
   258  	if u == nil {
   259  		u = new(Update)
   260  	}
   261  
   262  	u.updates = updates
   263  	return u
   264  }
   265  
   266  // Session sets the session for this operation.
   267  func (u *Update) Session(session *session.Client) *Update {
   268  	if u == nil {
   269  		u = new(Update)
   270  	}
   271  
   272  	u.session = session
   273  	return u
   274  }
   275  
   276  // ClusterClock sets the cluster clock for this operation.
   277  func (u *Update) ClusterClock(clock *session.ClusterClock) *Update {
   278  	if u == nil {
   279  		u = new(Update)
   280  	}
   281  
   282  	u.clock = clock
   283  	return u
   284  }
   285  
   286  // Collection sets the collection that this command will run against.
   287  func (u *Update) Collection(collection string) *Update {
   288  	if u == nil {
   289  		u = new(Update)
   290  	}
   291  
   292  	u.collection = collection
   293  	return u
   294  }
   295  
   296  // CommandMonitor sets the monitor to use for APM events.
   297  func (u *Update) CommandMonitor(monitor *event.CommandMonitor) *Update {
   298  	if u == nil {
   299  		u = new(Update)
   300  	}
   301  
   302  	u.monitor = monitor
   303  	return u
   304  }
   305  
   306  // Comment sets a value to help trace an operation.
   307  func (u *Update) Comment(comment bsoncore.Value) *Update {
   308  	if u == nil {
   309  		u = new(Update)
   310  	}
   311  
   312  	u.comment = comment
   313  	return u
   314  }
   315  
   316  // Database sets the database to run this operation against.
   317  func (u *Update) Database(database string) *Update {
   318  	if u == nil {
   319  		u = new(Update)
   320  	}
   321  
   322  	u.database = database
   323  	return u
   324  }
   325  
   326  // Deployment sets the deployment to use for this operation.
   327  func (u *Update) Deployment(deployment driver.Deployment) *Update {
   328  	if u == nil {
   329  		u = new(Update)
   330  	}
   331  
   332  	u.deployment = deployment
   333  	return u
   334  }
   335  
   336  // ServerSelector sets the selector used to retrieve a server.
   337  func (u *Update) ServerSelector(selector description.ServerSelector) *Update {
   338  	if u == nil {
   339  		u = new(Update)
   340  	}
   341  
   342  	u.selector = selector
   343  	return u
   344  }
   345  
   346  // WriteConcern sets the write concern for this operation.
   347  func (u *Update) WriteConcern(writeConcern *writeconcern.WriteConcern) *Update {
   348  	if u == nil {
   349  		u = new(Update)
   350  	}
   351  
   352  	u.writeConcern = writeConcern
   353  	return u
   354  }
   355  
   356  // Retry enables retryable writes for this operation. Retries are not handled automatically,
   357  // instead a boolean is returned from Execute and SelectAndExecute that indicates if the
   358  // operation can be retried. Retrying is handled by calling RetryExecute.
   359  func (u *Update) Retry(retry driver.RetryMode) *Update {
   360  	if u == nil {
   361  		u = new(Update)
   362  	}
   363  
   364  	u.retry = &retry
   365  	return u
   366  }
   367  
   368  // Crypt sets the Crypt object to use for automatic encryption and decryption.
   369  func (u *Update) Crypt(crypt driver.Crypt) *Update {
   370  	if u == nil {
   371  		u = new(Update)
   372  	}
   373  
   374  	u.crypt = crypt
   375  	return u
   376  }
   377  
   378  // ServerAPI sets the server API version for this operation.
   379  func (u *Update) ServerAPI(serverAPI *driver.ServerAPIOptions) *Update {
   380  	if u == nil {
   381  		u = new(Update)
   382  	}
   383  
   384  	u.serverAPI = serverAPI
   385  	return u
   386  }
   387  
   388  // Let specifies the let document to use. This option is only valid for server versions 5.0 and above.
   389  func (u *Update) Let(let bsoncore.Document) *Update {
   390  	if u == nil {
   391  		u = new(Update)
   392  	}
   393  
   394  	u.let = let
   395  	return u
   396  }
   397  
   398  // Timeout sets the timeout for this operation.
   399  func (u *Update) Timeout(timeout *time.Duration) *Update {
   400  	if u == nil {
   401  		u = new(Update)
   402  	}
   403  
   404  	u.timeout = timeout
   405  	return u
   406  }
   407  
   408  // Logger sets the logger for this operation.
   409  func (u *Update) Logger(logger *logger.Logger) *Update {
   410  	if u == nil {
   411  		u = new(Update)
   412  	}
   413  
   414  	u.logger = logger
   415  	return u
   416  }
   417  

View as plain text