...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/operation/aggregate.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/operation

     1  // Copyright (C) MongoDB, Inc. 2019-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package operation
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/bson/bsontype"
    15  	"go.mongodb.org/mongo-driver/event"
    16  	"go.mongodb.org/mongo-driver/internal/driverutil"
    17  	"go.mongodb.org/mongo-driver/mongo/description"
    18  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    19  	"go.mongodb.org/mongo-driver/mongo/readpref"
    20  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    21  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    22  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    23  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    24  )
    25  
    26  // Aggregate represents an aggregate operation.
    27  type Aggregate struct {
    28  	allowDiskUse             *bool
    29  	batchSize                *int32
    30  	bypassDocumentValidation *bool
    31  	collation                bsoncore.Document
    32  	comment                  *string
    33  	hint                     bsoncore.Value
    34  	maxTime                  *time.Duration
    35  	pipeline                 bsoncore.Document
    36  	session                  *session.Client
    37  	clock                    *session.ClusterClock
    38  	collection               string
    39  	monitor                  *event.CommandMonitor
    40  	database                 string
    41  	deployment               driver.Deployment
    42  	readConcern              *readconcern.ReadConcern
    43  	readPreference           *readpref.ReadPref
    44  	retry                    *driver.RetryMode
    45  	selector                 description.ServerSelector
    46  	writeConcern             *writeconcern.WriteConcern
    47  	crypt                    driver.Crypt
    48  	serverAPI                *driver.ServerAPIOptions
    49  	let                      bsoncore.Document
    50  	hasOutputStage           bool
    51  	customOptions            map[string]bsoncore.Value
    52  	timeout                  *time.Duration
    53  	omitCSOTMaxTimeMS        bool
    54  
    55  	result driver.CursorResponse
    56  }
    57  
    58  // NewAggregate constructs and returns a new Aggregate.
    59  func NewAggregate(pipeline bsoncore.Document) *Aggregate {
    60  	return &Aggregate{
    61  		pipeline: pipeline,
    62  	}
    63  }
    64  
    65  // Result returns the result of executing this operation.
    66  func (a *Aggregate) Result(opts driver.CursorOptions) (*driver.BatchCursor, error) {
    67  
    68  	clientSession := a.session
    69  
    70  	clock := a.clock
    71  	opts.ServerAPI = a.serverAPI
    72  	return driver.NewBatchCursor(a.result, clientSession, clock, opts)
    73  }
    74  
    75  // ResultCursorResponse returns the underlying CursorResponse result of executing this
    76  // operation.
    77  func (a *Aggregate) ResultCursorResponse() driver.CursorResponse {
    78  	return a.result
    79  }
    80  
    81  func (a *Aggregate) processResponse(info driver.ResponseInfo) error {
    82  	var err error
    83  
    84  	a.result, err = driver.NewCursorResponse(info)
    85  	return err
    86  
    87  }
    88  
    89  // Execute runs this operations and returns an error if the operation did not execute successfully.
    90  func (a *Aggregate) Execute(ctx context.Context) error {
    91  	if a.deployment == nil {
    92  		return errors.New("the Aggregate operation must have a Deployment set before Execute can be called")
    93  	}
    94  
    95  	return driver.Operation{
    96  		CommandFn:         a.command,
    97  		ProcessResponseFn: a.processResponse,
    98  
    99  		Client:                         a.session,
   100  		Clock:                          a.clock,
   101  		CommandMonitor:                 a.monitor,
   102  		Database:                       a.database,
   103  		Deployment:                     a.deployment,
   104  		ReadConcern:                    a.readConcern,
   105  		ReadPreference:                 a.readPreference,
   106  		Type:                           driver.Read,
   107  		RetryMode:                      a.retry,
   108  		Selector:                       a.selector,
   109  		WriteConcern:                   a.writeConcern,
   110  		Crypt:                          a.crypt,
   111  		MinimumWriteConcernWireVersion: 5,
   112  		ServerAPI:                      a.serverAPI,
   113  		IsOutputAggregate:              a.hasOutputStage,
   114  		MaxTime:                        a.maxTime,
   115  		Timeout:                        a.timeout,
   116  		Name:                           driverutil.AggregateOp,
   117  		OmitCSOTMaxTimeMS:              a.omitCSOTMaxTimeMS,
   118  	}.Execute(ctx)
   119  
   120  }
   121  
   122  func (a *Aggregate) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
   123  	header := bsoncore.Value{Type: bsontype.String, Data: bsoncore.AppendString(nil, a.collection)}
   124  	if a.collection == "" {
   125  		header = bsoncore.Value{Type: bsontype.Int32, Data: []byte{0x01, 0x00, 0x00, 0x00}}
   126  	}
   127  	dst = bsoncore.AppendValueElement(dst, "aggregate", header)
   128  
   129  	cursorIdx, cursorDoc := bsoncore.AppendDocumentStart(nil)
   130  	if a.allowDiskUse != nil {
   131  
   132  		dst = bsoncore.AppendBooleanElement(dst, "allowDiskUse", *a.allowDiskUse)
   133  	}
   134  	if a.batchSize != nil {
   135  		cursorDoc = bsoncore.AppendInt32Element(cursorDoc, "batchSize", *a.batchSize)
   136  	}
   137  	if a.bypassDocumentValidation != nil {
   138  
   139  		dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *a.bypassDocumentValidation)
   140  	}
   141  	if a.collation != nil {
   142  
   143  		if desc.WireVersion == nil || !desc.WireVersion.Includes(5) {
   144  			return nil, errors.New("the 'collation' command parameter requires a minimum server wire version of 5")
   145  		}
   146  		dst = bsoncore.AppendDocumentElement(dst, "collation", a.collation)
   147  	}
   148  	if a.comment != nil {
   149  
   150  		dst = bsoncore.AppendStringElement(dst, "comment", *a.comment)
   151  	}
   152  	if a.hint.Type != bsontype.Type(0) {
   153  
   154  		dst = bsoncore.AppendValueElement(dst, "hint", a.hint)
   155  	}
   156  	if a.pipeline != nil {
   157  
   158  		dst = bsoncore.AppendArrayElement(dst, "pipeline", a.pipeline)
   159  	}
   160  	if a.let != nil {
   161  		dst = bsoncore.AppendDocumentElement(dst, "let", a.let)
   162  	}
   163  	for optionName, optionValue := range a.customOptions {
   164  		dst = bsoncore.AppendValueElement(dst, optionName, optionValue)
   165  	}
   166  	cursorDoc, _ = bsoncore.AppendDocumentEnd(cursorDoc, cursorIdx)
   167  	dst = bsoncore.AppendDocumentElement(dst, "cursor", cursorDoc)
   168  
   169  	return dst, nil
   170  }
   171  
   172  // AllowDiskUse enables writing to temporary files. When true, aggregation stages can write to the dbPath/_tmp directory.
   173  func (a *Aggregate) AllowDiskUse(allowDiskUse bool) *Aggregate {
   174  	if a == nil {
   175  		a = new(Aggregate)
   176  	}
   177  
   178  	a.allowDiskUse = &allowDiskUse
   179  	return a
   180  }
   181  
   182  // BatchSize specifies the number of documents to return in every batch.
   183  func (a *Aggregate) BatchSize(batchSize int32) *Aggregate {
   184  	if a == nil {
   185  		a = new(Aggregate)
   186  	}
   187  
   188  	a.batchSize = &batchSize
   189  	return a
   190  }
   191  
   192  // BypassDocumentValidation allows the write to opt-out of document level validation. This only applies when the $out stage is specified.
   193  func (a *Aggregate) BypassDocumentValidation(bypassDocumentValidation bool) *Aggregate {
   194  	if a == nil {
   195  		a = new(Aggregate)
   196  	}
   197  
   198  	a.bypassDocumentValidation = &bypassDocumentValidation
   199  	return a
   200  }
   201  
   202  // Collation specifies a collation. This option is only valid for server versions 3.4 and above.
   203  func (a *Aggregate) Collation(collation bsoncore.Document) *Aggregate {
   204  	if a == nil {
   205  		a = new(Aggregate)
   206  	}
   207  
   208  	a.collation = collation
   209  	return a
   210  }
   211  
   212  // Comment specifies an arbitrary string to help trace the operation through the database profiler, currentOp, and logs.
   213  func (a *Aggregate) Comment(comment string) *Aggregate {
   214  	if a == nil {
   215  		a = new(Aggregate)
   216  	}
   217  
   218  	a.comment = &comment
   219  	return a
   220  }
   221  
   222  // Hint specifies the index to use.
   223  func (a *Aggregate) Hint(hint bsoncore.Value) *Aggregate {
   224  	if a == nil {
   225  		a = new(Aggregate)
   226  	}
   227  
   228  	a.hint = hint
   229  	return a
   230  }
   231  
   232  // MaxTime specifies the maximum amount of time to allow the query to run on the server.
   233  func (a *Aggregate) MaxTime(maxTime *time.Duration) *Aggregate {
   234  	if a == nil {
   235  		a = new(Aggregate)
   236  	}
   237  
   238  	a.maxTime = maxTime
   239  	return a
   240  }
   241  
   242  // Pipeline determines how data is transformed for an aggregation.
   243  func (a *Aggregate) Pipeline(pipeline bsoncore.Document) *Aggregate {
   244  	if a == nil {
   245  		a = new(Aggregate)
   246  	}
   247  
   248  	a.pipeline = pipeline
   249  	return a
   250  }
   251  
   252  // Session sets the session for this operation.
   253  func (a *Aggregate) Session(session *session.Client) *Aggregate {
   254  	if a == nil {
   255  		a = new(Aggregate)
   256  	}
   257  
   258  	a.session = session
   259  	return a
   260  }
   261  
   262  // ClusterClock sets the cluster clock for this operation.
   263  func (a *Aggregate) ClusterClock(clock *session.ClusterClock) *Aggregate {
   264  	if a == nil {
   265  		a = new(Aggregate)
   266  	}
   267  
   268  	a.clock = clock
   269  	return a
   270  }
   271  
   272  // Collection sets the collection that this command will run against.
   273  func (a *Aggregate) Collection(collection string) *Aggregate {
   274  	if a == nil {
   275  		a = new(Aggregate)
   276  	}
   277  
   278  	a.collection = collection
   279  	return a
   280  }
   281  
   282  // CommandMonitor sets the monitor to use for APM events.
   283  func (a *Aggregate) CommandMonitor(monitor *event.CommandMonitor) *Aggregate {
   284  	if a == nil {
   285  		a = new(Aggregate)
   286  	}
   287  
   288  	a.monitor = monitor
   289  	return a
   290  }
   291  
   292  // Database sets the database to run this operation against.
   293  func (a *Aggregate) Database(database string) *Aggregate {
   294  	if a == nil {
   295  		a = new(Aggregate)
   296  	}
   297  
   298  	a.database = database
   299  	return a
   300  }
   301  
   302  // Deployment sets the deployment to use for this operation.
   303  func (a *Aggregate) Deployment(deployment driver.Deployment) *Aggregate {
   304  	if a == nil {
   305  		a = new(Aggregate)
   306  	}
   307  
   308  	a.deployment = deployment
   309  	return a
   310  }
   311  
   312  // ReadConcern specifies the read concern for this operation.
   313  func (a *Aggregate) ReadConcern(readConcern *readconcern.ReadConcern) *Aggregate {
   314  	if a == nil {
   315  		a = new(Aggregate)
   316  	}
   317  
   318  	a.readConcern = readConcern
   319  	return a
   320  }
   321  
   322  // ReadPreference set the read preference used with this operation.
   323  func (a *Aggregate) ReadPreference(readPreference *readpref.ReadPref) *Aggregate {
   324  	if a == nil {
   325  		a = new(Aggregate)
   326  	}
   327  
   328  	a.readPreference = readPreference
   329  	return a
   330  }
   331  
   332  // ServerSelector sets the selector used to retrieve a server.
   333  func (a *Aggregate) ServerSelector(selector description.ServerSelector) *Aggregate {
   334  	if a == nil {
   335  		a = new(Aggregate)
   336  	}
   337  
   338  	a.selector = selector
   339  	return a
   340  }
   341  
   342  // WriteConcern sets the write concern for this operation.
   343  func (a *Aggregate) WriteConcern(writeConcern *writeconcern.WriteConcern) *Aggregate {
   344  	if a == nil {
   345  		a = new(Aggregate)
   346  	}
   347  
   348  	a.writeConcern = writeConcern
   349  	return a
   350  }
   351  
   352  // Retry enables retryable writes for this operation. Retries are not handled automatically,
   353  // instead a boolean is returned from Execute and SelectAndExecute that indicates if the
   354  // operation can be retried. Retrying is handled by calling RetryExecute.
   355  func (a *Aggregate) Retry(retry driver.RetryMode) *Aggregate {
   356  	if a == nil {
   357  		a = new(Aggregate)
   358  	}
   359  
   360  	a.retry = &retry
   361  	return a
   362  }
   363  
   364  // Crypt sets the Crypt object to use for automatic encryption and decryption.
   365  func (a *Aggregate) Crypt(crypt driver.Crypt) *Aggregate {
   366  	if a == nil {
   367  		a = new(Aggregate)
   368  	}
   369  
   370  	a.crypt = crypt
   371  	return a
   372  }
   373  
   374  // ServerAPI sets the server API version for this operation.
   375  func (a *Aggregate) ServerAPI(serverAPI *driver.ServerAPIOptions) *Aggregate {
   376  	if a == nil {
   377  		a = new(Aggregate)
   378  	}
   379  
   380  	a.serverAPI = serverAPI
   381  	return a
   382  }
   383  
   384  // Let specifies the let document to use. This option is only valid for server versions 5.0 and above.
   385  func (a *Aggregate) Let(let bsoncore.Document) *Aggregate {
   386  	if a == nil {
   387  		a = new(Aggregate)
   388  	}
   389  
   390  	a.let = let
   391  	return a
   392  }
   393  
   394  // HasOutputStage specifies whether the aggregate contains an output stage. Used in determining when to
   395  // append read preference at the operation level.
   396  func (a *Aggregate) HasOutputStage(hos bool) *Aggregate {
   397  	if a == nil {
   398  		a = new(Aggregate)
   399  	}
   400  
   401  	a.hasOutputStage = hos
   402  	return a
   403  }
   404  
   405  // CustomOptions specifies extra options to use in the aggregate command.
   406  func (a *Aggregate) CustomOptions(co map[string]bsoncore.Value) *Aggregate {
   407  	if a == nil {
   408  		a = new(Aggregate)
   409  	}
   410  
   411  	a.customOptions = co
   412  	return a
   413  }
   414  
   415  // Timeout sets the timeout for this operation.
   416  func (a *Aggregate) Timeout(timeout *time.Duration) *Aggregate {
   417  	if a == nil {
   418  		a = new(Aggregate)
   419  	}
   420  
   421  	a.timeout = timeout
   422  	return a
   423  }
   424  
   425  // OmitCSOTMaxTimeMS omits the automatically-calculated "maxTimeMS" from the
   426  // command when CSOT is enabled. It does not effect "maxTimeMS" set by
   427  // [Aggregate.MaxTime].
   428  func (a *Aggregate) OmitCSOTMaxTimeMS(omit bool) *Aggregate {
   429  	if a == nil {
   430  		a = new(Aggregate)
   431  	}
   432  
   433  	a.omitCSOTMaxTimeMS = omit
   434  	return a
   435  }
   436  

View as plain text