...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/gridfs_bucket_operation_execution.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"bytes"
    11  	"context"
    12  	"encoding/hex"
    13  	"fmt"
    14  	"io"
    15  	"time"
    16  
    17  	"go.mongodb.org/mongo-driver/bson"
    18  	"go.mongodb.org/mongo-driver/bson/bsontype"
    19  	"go.mongodb.org/mongo-driver/mongo/options"
    20  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    21  )
    22  
    23  func createBucketFindCursor(ctx context.Context, operation *operation) (*cursorResult, error) {
    24  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
    25  	if err != nil {
    26  		return nil, err
    27  	}
    28  
    29  	var filter bson.Raw
    30  	opts := options.GridFSFind()
    31  
    32  	elems, err := operation.Arguments.Elements()
    33  	if err != nil {
    34  		return nil, err
    35  	}
    36  	for _, elem := range elems {
    37  		key := elem.Key()
    38  		val := elem.Value()
    39  
    40  		switch key {
    41  		case "maxTimeMS":
    42  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
    43  		case "filter":
    44  			filter = val.Document()
    45  		default:
    46  			return nil, fmt.Errorf("unrecognized bucket find option %q", key)
    47  		}
    48  	}
    49  	if filter == nil {
    50  		return nil, newMissingArgumentError("filter")
    51  	}
    52  
    53  	cursor, err := bucket.FindContext(ctx, filter, opts)
    54  	res := &cursorResult{
    55  		cursor: cursor,
    56  		err:    err,
    57  	}
    58  	return res, nil
    59  }
    60  
    61  func executeBucketDelete(ctx context.Context, operation *operation) (*operationResult, error) {
    62  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
    63  	if err != nil {
    64  		return nil, err
    65  	}
    66  
    67  	var id *bson.RawValue
    68  
    69  	elems, err := operation.Arguments.Elements()
    70  	if err != nil {
    71  		return nil, err
    72  	}
    73  	for _, elem := range elems {
    74  		key := elem.Key()
    75  		val := elem.Value()
    76  
    77  		switch key {
    78  		case "id":
    79  			id = &val
    80  		default:
    81  			return nil, fmt.Errorf("unrecognized bucket delete option %q", key)
    82  		}
    83  	}
    84  	if id == nil {
    85  		return nil, newMissingArgumentError("id")
    86  	}
    87  
    88  	return newErrorResult(bucket.DeleteContext(ctx, *id)), nil
    89  }
    90  
    91  func executeBucketDownload(ctx context.Context, operation *operation) (*operationResult, error) {
    92  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
    93  	if err != nil {
    94  		return nil, err
    95  	}
    96  
    97  	var id *bson.RawValue
    98  	elems, err := operation.Arguments.Elements()
    99  	if err != nil {
   100  		return nil, err
   101  	}
   102  	for _, elem := range elems {
   103  		key := elem.Key()
   104  		val := elem.Value()
   105  
   106  		switch key {
   107  		case "id":
   108  			id = &val
   109  		default:
   110  			return nil, fmt.Errorf("unrecognized bucket download option %q", key)
   111  		}
   112  	}
   113  	if id == nil {
   114  		return nil, newMissingArgumentError("id")
   115  	}
   116  
   117  	stream, err := bucket.OpenDownloadStream(*id)
   118  	if err != nil {
   119  		return newErrorResult(err), nil
   120  	}
   121  
   122  	var buffer bytes.Buffer
   123  	if _, err := io.Copy(&buffer, stream); err != nil {
   124  		return newErrorResult(err), nil
   125  	}
   126  
   127  	return newValueResult(bsontype.Binary, bsoncore.AppendBinary(nil, 0, buffer.Bytes()), nil), nil
   128  }
   129  
   130  func executeBucketDownloadByName(ctx context.Context, operation *operation) (*operationResult, error) {
   131  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
   132  	if err != nil {
   133  		return nil, err
   134  	}
   135  
   136  	elems, err := operation.Arguments.Elements()
   137  	if err != nil {
   138  		return nil, err
   139  	}
   140  
   141  	var filename string
   142  	opts := options.GridFSName()
   143  	for _, elem := range elems {
   144  		key := elem.Key()
   145  		val := elem.Value()
   146  
   147  		switch key {
   148  		case "filename":
   149  			filename = val.StringValue()
   150  		case "revision":
   151  			opts.SetRevision(val.AsInt32())
   152  		default:
   153  			return nil, fmt.Errorf("unrecognized bucket download option %q", key)
   154  		}
   155  	}
   156  	if filename == "" {
   157  		return nil, newMissingArgumentError("filename")
   158  	}
   159  
   160  	var buf bytes.Buffer
   161  	_, err = bucket.DownloadToStreamByName(filename, &buf, opts)
   162  	if err != nil {
   163  		return newErrorResult(err), nil
   164  	}
   165  
   166  	return newValueResult(bsontype.Binary, bsoncore.AppendBinary(nil, 0, buf.Bytes()), nil), nil
   167  }
   168  
   169  func executeBucketDrop(ctx context.Context, operation *operation) (*operationResult, error) {
   170  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
   171  	if err != nil {
   172  		return nil, err
   173  	}
   174  
   175  	return newErrorResult(bucket.DropContext(ctx)), nil
   176  }
   177  
   178  func executeBucketRename(ctx context.Context, operation *operation) (*operationResult, error) {
   179  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
   180  	if err != nil {
   181  		return nil, err
   182  	}
   183  
   184  	var id *bson.RawValue
   185  	var newFilename string
   186  	elems, err := operation.Arguments.Elements()
   187  	if err != nil {
   188  		return nil, err
   189  	}
   190  	for _, elem := range elems {
   191  		key := elem.Key()
   192  		val := elem.Value()
   193  
   194  		switch key {
   195  		case "id":
   196  			id = &val
   197  		case "newFilename":
   198  			newFilename = val.StringValue()
   199  		default:
   200  			return nil, fmt.Errorf("unrecognized bucket rename option %q", key)
   201  		}
   202  	}
   203  	if id == nil {
   204  		return nil, newMissingArgumentError("id")
   205  	}
   206  
   207  	return newErrorResult(bucket.RenameContext(ctx, id, newFilename)), nil
   208  }
   209  
   210  func executeBucketUpload(ctx context.Context, operation *operation) (*operationResult, error) {
   211  	bucket, err := entities(ctx).gridFSBucket(operation.Object)
   212  	if err != nil {
   213  		return nil, err
   214  	}
   215  
   216  	var filename string
   217  	var fileBytes []byte
   218  	opts := options.GridFSUpload()
   219  
   220  	elems, err := operation.Arguments.Elements()
   221  	if err != nil {
   222  		return nil, err
   223  	}
   224  	for _, elem := range elems {
   225  		key := elem.Key()
   226  		val := elem.Value()
   227  
   228  		switch key {
   229  		case "chunkSizeBytes":
   230  			opts.SetChunkSizeBytes(val.Int32())
   231  		case "filename":
   232  			filename = val.StringValue()
   233  		case "metadata":
   234  			opts.SetMetadata(val.Document())
   235  		case "source":
   236  			fileBytes, err = hex.DecodeString(val.Document().Lookup("$$hexBytes").StringValue())
   237  			if err != nil {
   238  				return nil, fmt.Errorf("error converting source string to bytes: %w", err)
   239  			}
   240  		case "contentType":
   241  			return nil, newSkipTestError("the deprecated contentType file option is not supported")
   242  		case "disableMD5":
   243  			return nil, newSkipTestError("the deprecated disableMD5 file option is not supported")
   244  		default:
   245  			return nil, fmt.Errorf("unrecognized bucket upload option %q", key)
   246  		}
   247  	}
   248  	if filename == "" {
   249  		return nil, newMissingArgumentError("filename")
   250  	}
   251  	if fileBytes == nil {
   252  		return nil, newMissingArgumentError("source")
   253  	}
   254  
   255  	fileID, err := bucket.UploadFromStream(filename, bytes.NewReader(fileBytes), opts)
   256  	if err != nil {
   257  		return newErrorResult(err), nil
   258  	}
   259  
   260  	if operation.ResultEntityID != nil {
   261  		fileIDValue := bson.RawValue{
   262  			Type:  bsontype.ObjectID,
   263  			Value: fileID[:],
   264  		}
   265  		if err := entities(ctx).addBSONEntity(*operation.ResultEntityID, fileIDValue); err != nil {
   266  			return nil, fmt.Errorf("error storing result as BSON entity: %w", err)
   267  		}
   268  	}
   269  
   270  	return newValueResult(bsontype.ObjectID, fileID[:], nil), nil
   271  }
   272  

View as plain text