...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/operation/insert.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/operation

     1  // Copyright (C) MongoDB, Inc. 2019-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package operation
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"time"
    14  
    15  	"go.mongodb.org/mongo-driver/bson/bsontype"
    16  	"go.mongodb.org/mongo-driver/event"
    17  	"go.mongodb.org/mongo-driver/internal/driverutil"
    18  	"go.mongodb.org/mongo-driver/internal/logger"
    19  	"go.mongodb.org/mongo-driver/mongo/description"
    20  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    21  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    22  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    23  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    24  )
    25  
    26  // Insert performs an insert operation.
    27  type Insert struct {
    28  	bypassDocumentValidation *bool
    29  	comment                  bsoncore.Value
    30  	documents                []bsoncore.Document
    31  	ordered                  *bool
    32  	session                  *session.Client
    33  	clock                    *session.ClusterClock
    34  	collection               string
    35  	monitor                  *event.CommandMonitor
    36  	crypt                    driver.Crypt
    37  	database                 string
    38  	deployment               driver.Deployment
    39  	selector                 description.ServerSelector
    40  	writeConcern             *writeconcern.WriteConcern
    41  	retry                    *driver.RetryMode
    42  	result                   InsertResult
    43  	serverAPI                *driver.ServerAPIOptions
    44  	timeout                  *time.Duration
    45  	logger                   *logger.Logger
    46  }
    47  
// InsertResult represents an insert result returned by the server.
type InsertResult struct {
	// N is the number of documents successfully inserted. buildInsertResult reads it from
	// the "n" field of a server response, and Insert.processResponse sums it across
	// responses.
	N int64
}
    53  
    54  func buildInsertResult(response bsoncore.Document) (InsertResult, error) {
    55  	elements, err := response.Elements()
    56  	if err != nil {
    57  		return InsertResult{}, err
    58  	}
    59  	ir := InsertResult{}
    60  	for _, element := range elements {
    61  		switch element.Key() {
    62  		case "n":
    63  			var ok bool
    64  			ir.N, ok = element.Value().AsInt64OK()
    65  			if !ok {
    66  				return ir, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type)
    67  			}
    68  		}
    69  	}
    70  	return ir, nil
    71  }
    72  
    73  // NewInsert constructs and returns a new Insert.
    74  func NewInsert(documents ...bsoncore.Document) *Insert {
    75  	return &Insert{
    76  		documents: documents,
    77  	}
    78  }
    79  
    80  // Result returns the result of executing this operation.
    81  func (i *Insert) Result() InsertResult { return i.result }
    82  
    83  func (i *Insert) processResponse(info driver.ResponseInfo) error {
    84  	ir, err := buildInsertResult(info.ServerResponse)
    85  	i.result.N += ir.N
    86  	return err
    87  }
    88  
    89  // Execute runs this operations and returns an error if the operation did not execute successfully.
    90  func (i *Insert) Execute(ctx context.Context) error {
    91  	if i.deployment == nil {
    92  		return errors.New("the Insert operation must have a Deployment set before Execute can be called")
    93  	}
    94  	batches := &driver.Batches{
    95  		Identifier: "documents",
    96  		Documents:  i.documents,
    97  		Ordered:    i.ordered,
    98  	}
    99  
   100  	return driver.Operation{
   101  		CommandFn:         i.command,
   102  		ProcessResponseFn: i.processResponse,
   103  		Batches:           batches,
   104  		RetryMode:         i.retry,
   105  		Type:              driver.Write,
   106  		Client:            i.session,
   107  		Clock:             i.clock,
   108  		CommandMonitor:    i.monitor,
   109  		Crypt:             i.crypt,
   110  		Database:          i.database,
   111  		Deployment:        i.deployment,
   112  		Selector:          i.selector,
   113  		WriteConcern:      i.writeConcern,
   114  		ServerAPI:         i.serverAPI,
   115  		Timeout:           i.timeout,
   116  		Logger:            i.logger,
   117  		Name:              driverutil.InsertOp,
   118  	}.Execute(ctx)
   119  
   120  }
   121  
   122  func (i *Insert) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
   123  	dst = bsoncore.AppendStringElement(dst, "insert", i.collection)
   124  	if i.bypassDocumentValidation != nil && (desc.WireVersion != nil && desc.WireVersion.Includes(4)) {
   125  		dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *i.bypassDocumentValidation)
   126  	}
   127  	if i.comment.Type != bsontype.Type(0) {
   128  		dst = bsoncore.AppendValueElement(dst, "comment", i.comment)
   129  	}
   130  	if i.ordered != nil {
   131  		dst = bsoncore.AppendBooleanElement(dst, "ordered", *i.ordered)
   132  	}
   133  	return dst, nil
   134  }
   135  
   136  // BypassDocumentValidation allows the operation to opt-out of document level validation. Valid
   137  // for server versions >= 3.2. For servers < 3.2, this setting is ignored.
   138  func (i *Insert) BypassDocumentValidation(bypassDocumentValidation bool) *Insert {
   139  	if i == nil {
   140  		i = new(Insert)
   141  	}
   142  
   143  	i.bypassDocumentValidation = &bypassDocumentValidation
   144  	return i
   145  }
   146  
   147  // Comment sets a value to help trace an operation.
   148  func (i *Insert) Comment(comment bsoncore.Value) *Insert {
   149  	if i == nil {
   150  		i = new(Insert)
   151  	}
   152  
   153  	i.comment = comment
   154  	return i
   155  }
   156  
   157  // Documents adds documents to this operation that will be inserted when this operation is
   158  // executed.
   159  func (i *Insert) Documents(documents ...bsoncore.Document) *Insert {
   160  	if i == nil {
   161  		i = new(Insert)
   162  	}
   163  
   164  	i.documents = documents
   165  	return i
   166  }
   167  
   168  // Ordered sets ordered. If true, when a write fails, the operation will return the error, when
   169  // false write failures do not stop execution of the operation.
   170  func (i *Insert) Ordered(ordered bool) *Insert {
   171  	if i == nil {
   172  		i = new(Insert)
   173  	}
   174  
   175  	i.ordered = &ordered
   176  	return i
   177  }
   178  
   179  // Session sets the session for this operation.
   180  func (i *Insert) Session(session *session.Client) *Insert {
   181  	if i == nil {
   182  		i = new(Insert)
   183  	}
   184  
   185  	i.session = session
   186  	return i
   187  }
   188  
   189  // ClusterClock sets the cluster clock for this operation.
   190  func (i *Insert) ClusterClock(clock *session.ClusterClock) *Insert {
   191  	if i == nil {
   192  		i = new(Insert)
   193  	}
   194  
   195  	i.clock = clock
   196  	return i
   197  }
   198  
   199  // Collection sets the collection that this command will run against.
   200  func (i *Insert) Collection(collection string) *Insert {
   201  	if i == nil {
   202  		i = new(Insert)
   203  	}
   204  
   205  	i.collection = collection
   206  	return i
   207  }
   208  
   209  // CommandMonitor sets the monitor to use for APM events.
   210  func (i *Insert) CommandMonitor(monitor *event.CommandMonitor) *Insert {
   211  	if i == nil {
   212  		i = new(Insert)
   213  	}
   214  
   215  	i.monitor = monitor
   216  	return i
   217  }
   218  
   219  // Crypt sets the Crypt object to use for automatic encryption and decryption.
   220  func (i *Insert) Crypt(crypt driver.Crypt) *Insert {
   221  	if i == nil {
   222  		i = new(Insert)
   223  	}
   224  
   225  	i.crypt = crypt
   226  	return i
   227  }
   228  
   229  // Database sets the database to run this operation against.
   230  func (i *Insert) Database(database string) *Insert {
   231  	if i == nil {
   232  		i = new(Insert)
   233  	}
   234  
   235  	i.database = database
   236  	return i
   237  }
   238  
   239  // Deployment sets the deployment to use for this operation.
   240  func (i *Insert) Deployment(deployment driver.Deployment) *Insert {
   241  	if i == nil {
   242  		i = new(Insert)
   243  	}
   244  
   245  	i.deployment = deployment
   246  	return i
   247  }
   248  
   249  // ServerSelector sets the selector used to retrieve a server.
   250  func (i *Insert) ServerSelector(selector description.ServerSelector) *Insert {
   251  	if i == nil {
   252  		i = new(Insert)
   253  	}
   254  
   255  	i.selector = selector
   256  	return i
   257  }
   258  
   259  // WriteConcern sets the write concern for this operation.
   260  func (i *Insert) WriteConcern(writeConcern *writeconcern.WriteConcern) *Insert {
   261  	if i == nil {
   262  		i = new(Insert)
   263  	}
   264  
   265  	i.writeConcern = writeConcern
   266  	return i
   267  }
   268  
   269  // Retry enables retryable mode for this operation. Retries are handled automatically in driver.Operation.Execute based
   270  // on how the operation is set.
   271  func (i *Insert) Retry(retry driver.RetryMode) *Insert {
   272  	if i == nil {
   273  		i = new(Insert)
   274  	}
   275  
   276  	i.retry = &retry
   277  	return i
   278  }
   279  
   280  // ServerAPI sets the server API version for this operation.
   281  func (i *Insert) ServerAPI(serverAPI *driver.ServerAPIOptions) *Insert {
   282  	if i == nil {
   283  		i = new(Insert)
   284  	}
   285  
   286  	i.serverAPI = serverAPI
   287  	return i
   288  }
   289  
   290  // Timeout sets the timeout for this operation.
   291  func (i *Insert) Timeout(timeout *time.Duration) *Insert {
   292  	if i == nil {
   293  		i = new(Insert)
   294  	}
   295  
   296  	i.timeout = timeout
   297  	return i
   298  }
   299  
   300  // Logger sets the logger for this operation.
   301  func (i *Insert) Logger(logger *logger.Logger) *Insert {
   302  	if i == nil {
   303  		i = new(Insert)
   304  	}
   305  
   306  	i.logger = logger
   307  	return i
   308  }
   309  

View as plain text