...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/bulkwrite_helpers.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"fmt"
    11  
    12  	"go.mongodb.org/mongo-driver/bson"
    13  	"go.mongodb.org/mongo-driver/internal/bsonutil"
    14  	"go.mongodb.org/mongo-driver/mongo"
    15  	"go.mongodb.org/mongo-driver/mongo/options"
    16  )
    17  
    18  // This file provides helper functions to convert BSON documents to WriteModel instances.
    19  
    20  // createBulkWriteModels converts a bson.Raw that is internally an array to a slice of WriteModel. Each value in the
    21  // array must be a document in the form { requestType: { optionKey1: optionValue1, ... } }. For example, the document
    22  // { insertOne: { document: { x: 1 } } } would be translated to an InsertOneModel to insert the document { x: 1 }.
    23  func createBulkWriteModels(rawModels bson.Raw) ([]mongo.WriteModel, error) {
    24  	vals, _ := rawModels.Values()
    25  	models := make([]mongo.WriteModel, 0, len(vals))
    26  
    27  	for idx, val := range vals {
    28  		model, err := createBulkWriteModel(val.Document())
    29  		if err != nil {
    30  			return nil, fmt.Errorf("error creating model at index %d: %w", idx, err)
    31  		}
    32  		models = append(models, model)
    33  	}
    34  	return models, nil
    35  }
    36  
    37  // createBulkWriteModel converts the provided BSON document to a WriteModel.
    38  func createBulkWriteModel(rawModel bson.Raw) (mongo.WriteModel, error) {
    39  	firstElem := rawModel.Index(0)
    40  	requestType := firstElem.Key()
    41  	args := firstElem.Value().Document()
    42  
    43  	switch requestType {
    44  	case "insertOne":
    45  		var document bson.Raw
    46  		elems, _ := args.Elements()
    47  		for _, elem := range elems {
    48  			key := elem.Key()
    49  			val := elem.Value()
    50  
    51  			switch key {
    52  			case "document":
    53  				document = val.Document()
    54  			default:
    55  				return nil, fmt.Errorf("unrecognized insertOne option %q", key)
    56  			}
    57  		}
    58  		if document == nil {
    59  			return nil, newMissingArgumentError("document")
    60  		}
    61  
    62  		return mongo.NewInsertOneModel().SetDocument(document), nil
    63  	case "updateOne":
    64  		uom := mongo.NewUpdateOneModel()
    65  		var filter bson.Raw
    66  		var update interface{}
    67  		var err error
    68  
    69  		elems, _ := args.Elements()
    70  		for _, elem := range elems {
    71  			key := elem.Key()
    72  			val := elem.Value()
    73  
    74  			switch key {
    75  			case "arrayFilters":
    76  				uom.SetArrayFilters(options.ArrayFilters{
    77  					Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
    78  				})
    79  			case "collation":
    80  				collation, err := createCollation(val.Document())
    81  				if err != nil {
    82  					return nil, fmt.Errorf("error creating collation: %w", err)
    83  				}
    84  				uom.SetCollation(collation)
    85  			case "filter":
    86  				filter = val.Document()
    87  			case "hint":
    88  				hint, err := createHint(val)
    89  				if err != nil {
    90  					return nil, fmt.Errorf("error creating hint: %w", err)
    91  				}
    92  				uom.SetHint(hint)
    93  			case "update":
    94  				update, err = createUpdateValue(val)
    95  				if err != nil {
    96  					return nil, fmt.Errorf("error creating update: %w", err)
    97  				}
    98  			case "upsert":
    99  				uom.SetUpsert(val.Boolean())
   100  			default:
   101  				return nil, fmt.Errorf("unrecognized updateOne option %q", key)
   102  			}
   103  		}
   104  		if filter == nil {
   105  			return nil, newMissingArgumentError("filter")
   106  		}
   107  		if update == nil {
   108  			return nil, newMissingArgumentError("update")
   109  		}
   110  
   111  		return uom.SetFilter(filter).SetUpdate(update), nil
   112  	case "updateMany":
   113  		umm := mongo.NewUpdateManyModel()
   114  		var filter bson.Raw
   115  		var update interface{}
   116  		var err error
   117  
   118  		elems, _ := args.Elements()
   119  		for _, elem := range elems {
   120  			key := elem.Key()
   121  			val := elem.Value()
   122  
   123  			switch key {
   124  			case "arrayFilters":
   125  				umm.SetArrayFilters(options.ArrayFilters{
   126  					Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
   127  				})
   128  			case "collation":
   129  				collation, err := createCollation(val.Document())
   130  				if err != nil {
   131  					return nil, fmt.Errorf("error creating collation: %w", err)
   132  				}
   133  				umm.SetCollation(collation)
   134  			case "filter":
   135  				filter = val.Document()
   136  			case "hint":
   137  				hint, err := createHint(val)
   138  				if err != nil {
   139  					return nil, fmt.Errorf("error creating hint: %w", err)
   140  				}
   141  				umm.SetHint(hint)
   142  			case "update":
   143  				update, err = createUpdateValue(val)
   144  				if err != nil {
   145  					return nil, fmt.Errorf("error creating update: %w", err)
   146  				}
   147  			case "upsert":
   148  				umm.SetUpsert(val.Boolean())
   149  			default:
   150  				return nil, fmt.Errorf("unrecognized updateMany option %q", key)
   151  			}
   152  		}
   153  		if filter == nil {
   154  			return nil, newMissingArgumentError("filter")
   155  		}
   156  		if update == nil {
   157  			return nil, newMissingArgumentError("update")
   158  		}
   159  
   160  		return umm.SetFilter(filter).SetUpdate(update), nil
   161  	case "deleteOne":
   162  		dom := mongo.NewDeleteOneModel()
   163  		var filter bson.Raw
   164  
   165  		elems, _ := args.Elements()
   166  		for _, elem := range elems {
   167  			key := elem.Key()
   168  			val := elem.Value()
   169  
   170  			switch key {
   171  			case "filter":
   172  				filter = val.Document()
   173  			case "hint":
   174  				hint, err := createHint(val)
   175  				if err != nil {
   176  					return nil, fmt.Errorf("error creating hint: %w", err)
   177  				}
   178  				dom.SetHint(hint)
   179  			default:
   180  				return nil, fmt.Errorf("unrecognized deleteOne option %q", key)
   181  			}
   182  		}
   183  		if filter == nil {
   184  			return nil, newMissingArgumentError("filter")
   185  		}
   186  
   187  		return dom.SetFilter(filter), nil
   188  	case "deleteMany":
   189  		dmm := mongo.NewDeleteManyModel()
   190  		var filter bson.Raw
   191  
   192  		elems, _ := args.Elements()
   193  		for _, elem := range elems {
   194  			key := elem.Key()
   195  			val := elem.Value()
   196  
   197  			switch key {
   198  			case "collation":
   199  				collation, err := createCollation(val.Document())
   200  				if err != nil {
   201  					return nil, fmt.Errorf("error creating collation: %w", err)
   202  				}
   203  				dmm.SetCollation(collation)
   204  			case "filter":
   205  				filter = val.Document()
   206  			case "hint":
   207  				hint, err := createHint(val)
   208  				if err != nil {
   209  					return nil, fmt.Errorf("error creating hint: %w", err)
   210  				}
   211  				dmm.SetHint(hint)
   212  			default:
   213  				return nil, fmt.Errorf("unrecognized deleteMany option %q", key)
   214  			}
   215  		}
   216  		if filter == nil {
   217  			return nil, newMissingArgumentError("filter")
   218  		}
   219  
   220  		return dmm.SetFilter(filter), nil
   221  	case "replaceOne":
   222  		rom := mongo.NewReplaceOneModel()
   223  		var filter, replacement bson.Raw
   224  
   225  		elems, _ := args.Elements()
   226  		for _, elem := range elems {
   227  			key := elem.Key()
   228  			val := elem.Value()
   229  
   230  			switch key {
   231  			case "collation":
   232  				collation, err := createCollation(val.Document())
   233  				if err != nil {
   234  					return nil, fmt.Errorf("error creating collation: %w", err)
   235  				}
   236  				rom.SetCollation(collation)
   237  			case "filter":
   238  				filter = val.Document()
   239  			case "hint":
   240  				hint, err := createHint(val)
   241  				if err != nil {
   242  					return nil, fmt.Errorf("error creating hint: %w", err)
   243  				}
   244  				rom.SetHint(hint)
   245  			case "replacement":
   246  				replacement = val.Document()
   247  			case "upsert":
   248  				rom.SetUpsert(val.Boolean())
   249  			default:
   250  				return nil, fmt.Errorf("unrecognized replaceOne option %q", key)
   251  			}
   252  		}
   253  		if filter == nil {
   254  			return nil, newMissingArgumentError("filter")
   255  		}
   256  		if replacement == nil {
   257  			return nil, newMissingArgumentError("replacement")
   258  		}
   259  
   260  		return rom.SetFilter(filter).SetReplacement(replacement), nil
   261  	default:
   262  		return nil, fmt.Errorf("unrecognized request type: %v", requestType)
   263  	}
   264  }
   265  
   266  // createUpdateValue converts the provided RawValue to a value that can be passed to UpdateOne/UpdateMany functions.
   267  // This helper handles both document and pipeline-style updates.
   268  func createUpdateValue(updateVal bson.RawValue) (interface{}, error) {
   269  	switch updateVal.Type {
   270  	case bson.TypeEmbeddedDocument:
   271  		return updateVal.Document(), nil
   272  	case bson.TypeArray:
   273  		var updateDocs []bson.Raw
   274  		docs, _ := updateVal.Array().Values()
   275  		for _, doc := range docs {
   276  			updateDocs = append(updateDocs, doc.Document())
   277  		}
   278  
   279  		return updateDocs, nil
   280  	default:
   281  		return nil, fmt.Errorf("unrecognized update type: %s", updateVal.Type)
   282  	}
   283  }
   284  

View as plain text