...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/client_operation_execution.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/bson/primitive"
    15  	"go.mongodb.org/mongo-driver/internal/bsonutil"
    16  	"go.mongodb.org/mongo-driver/mongo"
    17  	"go.mongodb.org/mongo-driver/mongo/options"
    18  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    19  )
    20  
    21  // This file contains helpers to execute client operations.
    22  
    23  func executeCreateChangeStream(ctx context.Context, operation *operation) (*operationResult, error) {
    24  	var watcher interface {
    25  		Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
    26  	}
    27  	var err error
    28  
    29  	watcher, err = entities(ctx).client(operation.Object)
    30  	if err != nil {
    31  		watcher, err = entities(ctx).database(operation.Object)
    32  	}
    33  	if err != nil {
    34  		watcher, err = entities(ctx).collection(operation.Object)
    35  	}
    36  	if err != nil {
    37  		return nil, fmt.Errorf("no client, database, or collection entity found with ID %q", operation.Object)
    38  	}
    39  
    40  	var pipeline []interface{}
    41  	opts := options.ChangeStream()
    42  
    43  	elems, _ := operation.Arguments.Elements()
    44  	for _, elem := range elems {
    45  		key := elem.Key()
    46  		val := elem.Value()
    47  
    48  		switch key {
    49  		case "batchSize":
    50  			opts.SetBatchSize(val.Int32())
    51  		case "collation":
    52  			collation, err := createCollation(val.Document())
    53  			if err != nil {
    54  				return nil, fmt.Errorf("error creating collation: %w", err)
    55  			}
    56  			opts.SetCollation(*collation)
    57  		case "comment":
    58  			commentString, err := createCommentString(val)
    59  			if err != nil {
    60  				return nil, fmt.Errorf("error creating comment: %w", err)
    61  			}
    62  			opts.SetComment(commentString)
    63  		case "fullDocument":
    64  			switch fd := val.StringValue(); fd {
    65  			case "default":
    66  				opts.SetFullDocument(options.Default)
    67  			case "required":
    68  				opts.SetFullDocument(options.Required)
    69  			case "updateLookup":
    70  				opts.SetFullDocument(options.UpdateLookup)
    71  			case "whenAvailable":
    72  				opts.SetFullDocument(options.WhenAvailable)
    73  			default:
    74  				return nil, fmt.Errorf("unrecognized fullDocument value %q", fd)
    75  			}
    76  		case "fullDocumentBeforeChange":
    77  			switch fdbc := val.StringValue(); fdbc {
    78  			case "off":
    79  				opts.SetFullDocumentBeforeChange(options.Off)
    80  			case "required":
    81  				opts.SetFullDocumentBeforeChange(options.Required)
    82  			case "whenAvailable":
    83  				opts.SetFullDocumentBeforeChange(options.WhenAvailable)
    84  			}
    85  		case "maxAwaitTimeMS":
    86  			opts.SetMaxAwaitTime(time.Duration(val.Int32()) * time.Millisecond)
    87  		case "pipeline":
    88  			pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
    89  		case "resumeAfter":
    90  			opts.SetResumeAfter(val.Document())
    91  		case "showExpandedEvents":
    92  			opts.SetShowExpandedEvents(val.Boolean())
    93  		case "startAfter":
    94  			opts.SetStartAfter(val.Document())
    95  		case "startAtOperationTime":
    96  			t, i := val.Timestamp()
    97  			opts.SetStartAtOperationTime(&primitive.Timestamp{T: t, I: i})
    98  		default:
    99  			return nil, fmt.Errorf("unrecognized createChangeStream option %q", key)
   100  		}
   101  	}
   102  	if pipeline == nil {
   103  		return nil, newMissingArgumentError("pipeline")
   104  	}
   105  
   106  	stream, err := watcher.Watch(ctx, pipeline, opts)
   107  	if err != nil {
   108  		return newErrorResult(err), nil
   109  	}
   110  
   111  	// createChangeStream is sometimes used with no corresponding saveResultAsEntity field. Return an
   112  	// empty result in this case.
   113  	if operation.ResultEntityID != nil {
   114  		if err := entities(ctx).addCursorEntity(*operation.ResultEntityID, stream); err != nil {
   115  			return nil, fmt.Errorf("error storing result as cursor entity: %w", err)
   116  		}
   117  	}
   118  	return newEmptyResult(), nil
   119  }
   120  
   121  func executeListDatabases(ctx context.Context, operation *operation, nameOnly bool) (*operationResult, error) {
   122  	client, err := entities(ctx).client(operation.Object)
   123  	if err != nil {
   124  		return nil, err
   125  	}
   126  
   127  	// We set a default filter rather than erroring if the Arguments doc doesn't have a "filter" field because the
   128  	// spec says drivers should support this field, not must.
   129  	filter := emptyDocument
   130  	opts := options.ListDatabases().SetNameOnly(nameOnly)
   131  
   132  	elems, _ := operation.Arguments.Elements()
   133  	for _, elem := range elems {
   134  		key := elem.Key()
   135  		val := elem.Value()
   136  
   137  		switch key {
   138  		case "authorizedDatabases":
   139  			opts.SetAuthorizedDatabases(val.Boolean())
   140  		case "filter":
   141  			filter = val.Document()
   142  		case "nameOnly":
   143  			opts.SetNameOnly(val.Boolean())
   144  		default:
   145  			return nil, fmt.Errorf("unrecognized listDatabases option %q", key)
   146  		}
   147  	}
   148  
   149  	res, err := client.ListDatabases(ctx, filter, opts)
   150  	if err != nil {
   151  		return newErrorResult(err), nil
   152  	}
   153  
   154  	specsArray := bsoncore.NewArrayBuilder()
   155  	for _, spec := range res.Databases {
   156  		rawSpec := bsoncore.NewDocumentBuilder().
   157  			AppendString("name", spec.Name).
   158  			AppendInt64("sizeOnDisk", spec.SizeOnDisk).
   159  			AppendBoolean("empty", spec.Empty).
   160  			Build()
   161  
   162  		specsArray.AppendDocument(rawSpec)
   163  	}
   164  	raw := bsoncore.NewDocumentBuilder().
   165  		AppendArray("databases", specsArray.Build()).
   166  		AppendInt64("totalSize", res.TotalSize).
   167  		Build()
   168  	return newDocumentResult(raw, nil), nil
   169  }
   170  

View as plain text