...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/operation/command.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/operation

     1  // Copyright (C) MongoDB, Inc. 2021-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package operation
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/event"
    15  	"go.mongodb.org/mongo-driver/internal/logger"
    16  	"go.mongodb.org/mongo-driver/mongo/description"
    17  	"go.mongodb.org/mongo-driver/mongo/readpref"
    18  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    19  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    20  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    21  )
    22  
    23  // Command is used to run a generic operation.
    24  type Command struct {
    25  	command        bsoncore.Document
    26  	database       string
    27  	deployment     driver.Deployment
    28  	selector       description.ServerSelector
    29  	readPreference *readpref.ReadPref
    30  	clock          *session.ClusterClock
    31  	session        *session.Client
    32  	monitor        *event.CommandMonitor
    33  	resultResponse bsoncore.Document
    34  	resultCursor   *driver.BatchCursor
    35  	crypt          driver.Crypt
    36  	serverAPI      *driver.ServerAPIOptions
    37  	createCursor   bool
    38  	cursorOpts     driver.CursorOptions
    39  	timeout        *time.Duration
    40  	logger         *logger.Logger
    41  }
    42  
    43  // NewCommand constructs and returns a new Command. Once the operation is executed, the result may only be accessed via
    44  // the Result() function.
    45  func NewCommand(command bsoncore.Document) *Command {
    46  	return &Command{
    47  		command: command,
    48  	}
    49  }
    50  
    51  // NewCursorCommand constructs a new Command. Once the operation is executed, the server response will be used to
    52  // construct a cursor, which can be accessed via the ResultCursor() function.
    53  func NewCursorCommand(command bsoncore.Document, cursorOpts driver.CursorOptions) *Command {
    54  	return &Command{
    55  		command:      command,
    56  		cursorOpts:   cursorOpts,
    57  		createCursor: true,
    58  	}
    59  }
    60  
    61  // Result returns the result of executing this operation.
    62  func (c *Command) Result() bsoncore.Document { return c.resultResponse }
    63  
    64  // ResultCursor returns the BatchCursor that was constructed using the command response. If the operation was not
    65  // configured to create a cursor (i.e. it was created using NewCommand rather than NewCursorCommand), this function
    66  // will return nil and an error.
    67  func (c *Command) ResultCursor() (*driver.BatchCursor, error) {
    68  	if !c.createCursor {
    69  		return nil, errors.New("command operation was not configured to create a cursor, but a result cursor was requested")
    70  	}
    71  	return c.resultCursor, nil
    72  }
    73  
    74  // Execute runs this operations and returns an error if the operation did not execute successfully.
    75  func (c *Command) Execute(ctx context.Context) error {
    76  	if c.deployment == nil {
    77  		return errors.New("the Command operation must have a Deployment set before Execute can be called")
    78  	}
    79  
    80  	return driver.Operation{
    81  		CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
    82  			return append(dst, c.command[4:len(c.command)-1]...), nil
    83  		},
    84  		ProcessResponseFn: func(info driver.ResponseInfo) error {
    85  			c.resultResponse = info.ServerResponse
    86  
    87  			if c.createCursor {
    88  				cursorRes, err := driver.NewCursorResponse(info)
    89  				if err != nil {
    90  					return err
    91  				}
    92  
    93  				c.resultCursor, err = driver.NewBatchCursor(cursorRes, c.session, c.clock, c.cursorOpts)
    94  				return err
    95  			}
    96  
    97  			return nil
    98  		},
    99  		Client:         c.session,
   100  		Clock:          c.clock,
   101  		CommandMonitor: c.monitor,
   102  		Database:       c.database,
   103  		Deployment:     c.deployment,
   104  		ReadPreference: c.readPreference,
   105  		Selector:       c.selector,
   106  		Crypt:          c.crypt,
   107  		ServerAPI:      c.serverAPI,
   108  		Timeout:        c.timeout,
   109  		Logger:         c.logger,
   110  	}.Execute(ctx)
   111  }
   112  
   113  // Session sets the session for this operation.
   114  func (c *Command) Session(session *session.Client) *Command {
   115  	if c == nil {
   116  		c = new(Command)
   117  	}
   118  
   119  	c.session = session
   120  	return c
   121  }
   122  
   123  // ClusterClock sets the cluster clock for this operation.
   124  func (c *Command) ClusterClock(clock *session.ClusterClock) *Command {
   125  	if c == nil {
   126  		c = new(Command)
   127  	}
   128  
   129  	c.clock = clock
   130  	return c
   131  }
   132  
   133  // CommandMonitor sets the monitor to use for APM events.
   134  func (c *Command) CommandMonitor(monitor *event.CommandMonitor) *Command {
   135  	if c == nil {
   136  		c = new(Command)
   137  	}
   138  
   139  	c.monitor = monitor
   140  	return c
   141  }
   142  
   143  // Database sets the database to run this operation against.
   144  func (c *Command) Database(database string) *Command {
   145  	if c == nil {
   146  		c = new(Command)
   147  	}
   148  
   149  	c.database = database
   150  	return c
   151  }
   152  
   153  // Deployment sets the deployment to use for this operation.
   154  func (c *Command) Deployment(deployment driver.Deployment) *Command {
   155  	if c == nil {
   156  		c = new(Command)
   157  	}
   158  
   159  	c.deployment = deployment
   160  	return c
   161  }
   162  
   163  // ReadPreference set the read preference used with this operation.
   164  func (c *Command) ReadPreference(readPreference *readpref.ReadPref) *Command {
   165  	if c == nil {
   166  		c = new(Command)
   167  	}
   168  
   169  	c.readPreference = readPreference
   170  	return c
   171  }
   172  
   173  // ServerSelector sets the selector used to retrieve a server.
   174  func (c *Command) ServerSelector(selector description.ServerSelector) *Command {
   175  	if c == nil {
   176  		c = new(Command)
   177  	}
   178  
   179  	c.selector = selector
   180  	return c
   181  }
   182  
   183  // Crypt sets the Crypt object to use for automatic encryption and decryption.
   184  func (c *Command) Crypt(crypt driver.Crypt) *Command {
   185  	if c == nil {
   186  		c = new(Command)
   187  	}
   188  
   189  	c.crypt = crypt
   190  	return c
   191  }
   192  
   193  // ServerAPI sets the server API version for this operation.
   194  func (c *Command) ServerAPI(serverAPI *driver.ServerAPIOptions) *Command {
   195  	if c == nil {
   196  		c = new(Command)
   197  	}
   198  
   199  	c.serverAPI = serverAPI
   200  	return c
   201  }
   202  
   203  // Timeout sets the timeout for this operation.
   204  func (c *Command) Timeout(timeout *time.Duration) *Command {
   205  	if c == nil {
   206  		c = new(Command)
   207  	}
   208  
   209  	c.timeout = timeout
   210  	return c
   211  }
   212  
   213  // Logger sets the logger for this operation.
   214  func (c *Command) Logger(logger *logger.Logger) *Command {
   215  	if c == nil {
   216  		c = new(Command)
   217  	}
   218  
   219  	c.logger = logger
   220  	return c
   221  }
   222  

View as plain text