...

Source file src/go.mongodb.org/mongo-driver/mongo/options/changestreamoptions.go

Documentation: go.mongodb.org/mongo-driver/mongo/options

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package options
     8  
     9  import (
    10  	"time"
    11  
    12  	"go.mongodb.org/mongo-driver/bson"
    13  	"go.mongodb.org/mongo-driver/bson/primitive"
    14  )
    15  
    16  // ChangeStreamOptions represents options that can be used to configure a Watch operation.
    17  type ChangeStreamOptions struct {
    18  	// The maximum number of documents to be included in each batch returned by the server.
    19  	BatchSize *int32
    20  
    21  	// Specifies a collation to use for string comparisons during the operation. This option is only valid for MongoDB
    22  	// versions >= 3.4. For previous server versions, the driver will return an error if this option is used. The
    23  	// default value is nil, which means the default collation of the collection will be used.
    24  	Collation *Collation
    25  
    26  	// A string that will be included in server logs, profiling logs, and currentOp queries to help trace the operation.
    27  	// The default is nil, which means that no comment will be included in the logs.
    28  	Comment *string
    29  
    30  	// Specifies how the updated document should be returned in change notifications for update operations. The default
    31  	// is options.Default, which means that only partial update deltas will be included in the change notification.
    32  	FullDocument *FullDocument
    33  
    34  	// Specifies how the pre-update document should be returned in change notifications for update operations. The default
    35  	// is options.Off, which means that the pre-update document will not be included in the change notification.
    36  	FullDocumentBeforeChange *FullDocument
    37  
    38  	// The maximum amount of time that the server should wait for new documents to satisfy a tailable cursor query.
    39  	MaxAwaitTime *time.Duration
    40  
    41  	// A document specifying the logical starting point for the change stream. Only changes corresponding to an oplog
    42  	// entry immediately after the resume token will be returned. If this is specified, StartAtOperationTime and
    43  	// StartAfter must not be set.
    44  	ResumeAfter interface{}
    45  
    46  	// ShowExpandedEvents specifies whether the server will return an expanded list of change stream events. Additional
    47  	// events include: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection and
    48  	// refineCollectionShardKey. This option is only valid for MongoDB versions >= 6.0.
    49  	ShowExpandedEvents *bool
    50  
    51  	// If specified, the change stream will only return changes that occurred at or after the given timestamp. This
    52  	// option is only valid for MongoDB versions >= 4.0. If this is specified, ResumeAfter and StartAfter must not be
    53  	// set.
    54  	StartAtOperationTime *primitive.Timestamp
    55  
    56  	// A document specifying the logical starting point for the change stream. This is similar to the ResumeAfter
    57  	// option, but allows a resume token from an "invalidate" notification to be used. This allows a change stream on a
    58  	// collection to be resumed after the collection has been dropped and recreated or renamed. Only changes
    59  	// corresponding to an oplog entry immediately after the specified token will be returned. If this is specified,
    60  	// ResumeAfter and StartAtOperationTime must not be set. This option is only valid for MongoDB versions >= 4.1.1.
    61  	StartAfter interface{}
    62  
    63  	// Custom options to be added to the initial aggregate for the change stream. Key-value pairs of the BSON map should
    64  	// correlate with desired option names and values. Values must be Marshalable. Custom options may conflict with
    65  	// non-custom options, and custom options bypass client-side validation. Prefer using non-custom options where possible.
    66  	Custom bson.M
    67  
    68  	// Custom options to be added to the $changeStream stage in the initial aggregate. Key-value pairs of the BSON map should
    69  	// correlate with desired option names and values. Values must be Marshalable. Custom pipeline options bypass client-side
    70  	// validation. Prefer using non-custom options where possible.
    71  	CustomPipeline bson.M
    72  }
    73  
    74  // ChangeStream creates a new ChangeStreamOptions instance.
    75  func ChangeStream() *ChangeStreamOptions {
    76  	cso := &ChangeStreamOptions{}
    77  	return cso
    78  }
    79  
    80  // SetBatchSize sets the value for the BatchSize field.
    81  func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions {
    82  	cso.BatchSize = &i
    83  	return cso
    84  }
    85  
    86  // SetCollation sets the value for the Collation field.
    87  func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions {
    88  	cso.Collation = &c
    89  	return cso
    90  }
    91  
    92  // SetComment sets the value for the Comment field.
    93  func (cso *ChangeStreamOptions) SetComment(comment string) *ChangeStreamOptions {
    94  	cso.Comment = &comment
    95  	return cso
    96  }
    97  
    98  // SetFullDocument sets the value for the FullDocument field.
    99  func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions {
   100  	cso.FullDocument = &fd
   101  	return cso
   102  }
   103  
   104  // SetFullDocumentBeforeChange sets the value for the FullDocumentBeforeChange field.
   105  func (cso *ChangeStreamOptions) SetFullDocumentBeforeChange(fdbc FullDocument) *ChangeStreamOptions {
   106  	cso.FullDocumentBeforeChange = &fdbc
   107  	return cso
   108  }
   109  
   110  // SetMaxAwaitTime sets the value for the MaxAwaitTime field.
   111  func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions {
   112  	cso.MaxAwaitTime = &d
   113  	return cso
   114  }
   115  
   116  // SetResumeAfter sets the value for the ResumeAfter field.
   117  func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions {
   118  	cso.ResumeAfter = rt
   119  	return cso
   120  }
   121  
   122  // SetShowExpandedEvents sets the value for the ShowExpandedEvents field.
   123  func (cso *ChangeStreamOptions) SetShowExpandedEvents(see bool) *ChangeStreamOptions {
   124  	cso.ShowExpandedEvents = &see
   125  	return cso
   126  }
   127  
   128  // SetStartAtOperationTime sets the value for the StartAtOperationTime field.
   129  func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions {
   130  	cso.StartAtOperationTime = t
   131  	return cso
   132  }
   133  
   134  // SetStartAfter sets the value for the StartAfter field.
   135  func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptions {
   136  	cso.StartAfter = sa
   137  	return cso
   138  }
   139  
   140  // SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
   141  // with desired option names and values. Values must be Marshalable. Custom options may conflict
   142  // with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
   143  // options where possible.
   144  func (cso *ChangeStreamOptions) SetCustom(c bson.M) *ChangeStreamOptions {
   145  	cso.Custom = c
   146  	return cso
   147  }
   148  
   149  // SetCustomPipeline sets the value for the CustomPipeline field. Key-value pairs of the BSON map
   150  // should correlate with desired option names and values. Values must be Marshalable. Custom pipeline
   151  // options bypass client-side validation. Prefer using non-custom options where possible.
   152  func (cso *ChangeStreamOptions) SetCustomPipeline(cp bson.M) *ChangeStreamOptions {
   153  	cso.CustomPipeline = cp
   154  	return cso
   155  }
   156  
   157  // MergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a
   158  // last-one-wins fashion.
   159  //
   160  // Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a
   161  // single options struct instead.
   162  func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions {
   163  	csOpts := ChangeStream()
   164  	for _, cso := range opts {
   165  		if cso == nil {
   166  			continue
   167  		}
   168  		if cso.BatchSize != nil {
   169  			csOpts.BatchSize = cso.BatchSize
   170  		}
   171  		if cso.Collation != nil {
   172  			csOpts.Collation = cso.Collation
   173  		}
   174  		if cso.Comment != nil {
   175  			csOpts.Comment = cso.Comment
   176  		}
   177  		if cso.FullDocument != nil {
   178  			csOpts.FullDocument = cso.FullDocument
   179  		}
   180  		if cso.FullDocumentBeforeChange != nil {
   181  			csOpts.FullDocumentBeforeChange = cso.FullDocumentBeforeChange
   182  		}
   183  		if cso.MaxAwaitTime != nil {
   184  			csOpts.MaxAwaitTime = cso.MaxAwaitTime
   185  		}
   186  		if cso.ResumeAfter != nil {
   187  			csOpts.ResumeAfter = cso.ResumeAfter
   188  		}
   189  		if cso.ShowExpandedEvents != nil {
   190  			csOpts.ShowExpandedEvents = cso.ShowExpandedEvents
   191  		}
   192  		if cso.StartAtOperationTime != nil {
   193  			csOpts.StartAtOperationTime = cso.StartAtOperationTime
   194  		}
   195  		if cso.StartAfter != nil {
   196  			csOpts.StartAfter = cso.StartAfter
   197  		}
   198  		if cso.Custom != nil {
   199  			csOpts.Custom = cso.Custom
   200  		}
   201  		if cso.CustomPipeline != nil {
   202  			csOpts.CustomPipeline = cso.CustomPipeline
   203  		}
   204  	}
   205  
   206  	return csOpts
   207  }
   208  

View as plain text