1 // Copyright (C) MongoDB, Inc. 2017-present. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may 4 // not use this file except in compliance with the License. You may obtain 5 // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7 package options 8 9 import ( 10 "time" 11 12 "go.mongodb.org/mongo-driver/bson" 13 "go.mongodb.org/mongo-driver/bson/primitive" 14 ) 15 16 // ChangeStreamOptions represents options that can be used to configure a Watch operation. 17 type ChangeStreamOptions struct { 18 // The maximum number of documents to be included in each batch returned by the server. 19 BatchSize *int32 20 21 // Specifies a collation to use for string comparisons during the operation. This option is only valid for MongoDB 22 // versions >= 3.4. For previous server versions, the driver will return an error if this option is used. The 23 // default value is nil, which means the default collation of the collection will be used. 24 Collation *Collation 25 26 // A string that will be included in server logs, profiling logs, and currentOp queries to help trace the operation. 27 // The default is nil, which means that no comment will be included in the logs. 28 Comment *string 29 30 // Specifies how the updated document should be returned in change notifications for update operations. The default 31 // is options.Default, which means that only partial update deltas will be included in the change notification. 32 FullDocument *FullDocument 33 34 // Specifies how the pre-update document should be returned in change notifications for update operations. The default 35 // is options.Off, which means that the pre-update document will not be included in the change notification. 36 FullDocumentBeforeChange *FullDocument 37 38 // The maximum amount of time that the server should wait for new documents to satisfy a tailable cursor query. 39 MaxAwaitTime *time.Duration 40 41 // A document specifying the logical starting point for the change stream. Only changes corresponding to an oplog 42 // entry immediately after the resume token will be returned. If this is specified, StartAtOperationTime and 43 // StartAfter must not be set. 44 ResumeAfter interface{} 45 46 // ShowExpandedEvents specifies whether the server will return an expanded list of change stream events. Additional 47 // events include: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection and 48 // refineCollectionShardKey. This option is only valid for MongoDB versions >= 6.0. 49 ShowExpandedEvents *bool 50 51 // If specified, the change stream will only return changes that occurred at or after the given timestamp. This 52 // option is only valid for MongoDB versions >= 4.0. If this is specified, ResumeAfter and StartAfter must not be 53 // set. 54 StartAtOperationTime *primitive.Timestamp 55 56 // A document specifying the logical starting point for the change stream. This is similar to the ResumeAfter 57 // option, but allows a resume token from an "invalidate" notification to be used. This allows a change stream on a 58 // collection to be resumed after the collection has been dropped and recreated or renamed. Only changes 59 // corresponding to an oplog entry immediately after the specified token will be returned. If this is specified, 60 // ResumeAfter and StartAtOperationTime must not be set. This option is only valid for MongoDB versions >= 4.1.1. 61 StartAfter interface{} 62 63 // Custom options to be added to the initial aggregate for the change stream. Key-value pairs of the BSON map should 64 // correlate with desired option names and values. Values must be Marshalable. Custom options may conflict with 65 // non-custom options, and custom options bypass client-side validation. Prefer using non-custom options where possible. 66 Custom bson.M 67 68 // Custom options to be added to the $changeStream stage in the initial aggregate. Key-value pairs of the BSON map should 69 // correlate with desired option names and values. Values must be Marshalable. Custom pipeline options bypass client-side 70 // validation. Prefer using non-custom options where possible. 71 CustomPipeline bson.M 72 } 73 74 // ChangeStream creates a new ChangeStreamOptions instance. 75 func ChangeStream() *ChangeStreamOptions { 76 cso := &ChangeStreamOptions{} 77 return cso 78 } 79 80 // SetBatchSize sets the value for the BatchSize field. 81 func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions { 82 cso.BatchSize = &i 83 return cso 84 } 85 86 // SetCollation sets the value for the Collation field. 87 func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions { 88 cso.Collation = &c 89 return cso 90 } 91 92 // SetComment sets the value for the Comment field. 93 func (cso *ChangeStreamOptions) SetComment(comment string) *ChangeStreamOptions { 94 cso.Comment = &comment 95 return cso 96 } 97 98 // SetFullDocument sets the value for the FullDocument field. 99 func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions { 100 cso.FullDocument = &fd 101 return cso 102 } 103 104 // SetFullDocumentBeforeChange sets the value for the FullDocumentBeforeChange field. 105 func (cso *ChangeStreamOptions) SetFullDocumentBeforeChange(fdbc FullDocument) *ChangeStreamOptions { 106 cso.FullDocumentBeforeChange = &fdbc 107 return cso 108 } 109 110 // SetMaxAwaitTime sets the value for the MaxAwaitTime field. 111 func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions { 112 cso.MaxAwaitTime = &d 113 return cso 114 } 115 116 // SetResumeAfter sets the value for the ResumeAfter field. 117 func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions { 118 cso.ResumeAfter = rt 119 return cso 120 } 121 122 // SetShowExpandedEvents sets the value for the ShowExpandedEvents field. 123 func (cso *ChangeStreamOptions) SetShowExpandedEvents(see bool) *ChangeStreamOptions { 124 cso.ShowExpandedEvents = &see 125 return cso 126 } 127 128 // SetStartAtOperationTime sets the value for the StartAtOperationTime field. 129 func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions { 130 cso.StartAtOperationTime = t 131 return cso 132 } 133 134 // SetStartAfter sets the value for the StartAfter field. 135 func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptions { 136 cso.StartAfter = sa 137 return cso 138 } 139 140 // SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate 141 // with desired option names and values. Values must be Marshalable. Custom options may conflict 142 // with non-custom options, and custom options bypass client-side validation. Prefer using non-custom 143 // options where possible. 144 func (cso *ChangeStreamOptions) SetCustom(c bson.M) *ChangeStreamOptions { 145 cso.Custom = c 146 return cso 147 } 148 149 // SetCustomPipeline sets the value for the CustomPipeline field. Key-value pairs of the BSON map 150 // should correlate with desired option names and values. Values must be Marshalable. Custom pipeline 151 // options bypass client-side validation. Prefer using non-custom options where possible. 152 func (cso *ChangeStreamOptions) SetCustomPipeline(cp bson.M) *ChangeStreamOptions { 153 cso.CustomPipeline = cp 154 return cso 155 } 156 157 // MergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a 158 // last-one-wins fashion. 159 // 160 // Deprecated: Merging options structs will not be supported in Go Driver 2.0. Users should create a 161 // single options struct instead. 162 func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions { 163 csOpts := ChangeStream() 164 for _, cso := range opts { 165 if cso == nil { 166 continue 167 } 168 if cso.BatchSize != nil { 169 csOpts.BatchSize = cso.BatchSize 170 } 171 if cso.Collation != nil { 172 csOpts.Collation = cso.Collation 173 } 174 if cso.Comment != nil { 175 csOpts.Comment = cso.Comment 176 } 177 if cso.FullDocument != nil { 178 csOpts.FullDocument = cso.FullDocument 179 } 180 if cso.FullDocumentBeforeChange != nil { 181 csOpts.FullDocumentBeforeChange = cso.FullDocumentBeforeChange 182 } 183 if cso.MaxAwaitTime != nil { 184 csOpts.MaxAwaitTime = cso.MaxAwaitTime 185 } 186 if cso.ResumeAfter != nil { 187 csOpts.ResumeAfter = cso.ResumeAfter 188 } 189 if cso.ShowExpandedEvents != nil { 190 csOpts.ShowExpandedEvents = cso.ShowExpandedEvents 191 } 192 if cso.StartAtOperationTime != nil { 193 csOpts.StartAtOperationTime = cso.StartAtOperationTime 194 } 195 if cso.StartAfter != nil { 196 csOpts.StartAfter = cso.StartAfter 197 } 198 if cso.Custom != nil { 199 csOpts.Custom = cso.Custom 200 } 201 if cso.CustomPipeline != nil { 202 csOpts.CustomPipeline = cso.CustomPipeline 203 } 204 } 205 206 return csOpts 207 } 208