1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "fmt"
12 "time"
13
14 "go.mongodb.org/mongo-driver/bson/primitive"
15 "go.mongodb.org/mongo-driver/internal/bsonutil"
16 "go.mongodb.org/mongo-driver/mongo"
17 "go.mongodb.org/mongo-driver/mongo/options"
18 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
19 )
20
21
22
23 func executeCreateChangeStream(ctx context.Context, operation *operation) (*operationResult, error) {
24 var watcher interface {
25 Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
26 }
27 var err error
28
29 watcher, err = entities(ctx).client(operation.Object)
30 if err != nil {
31 watcher, err = entities(ctx).database(operation.Object)
32 }
33 if err != nil {
34 watcher, err = entities(ctx).collection(operation.Object)
35 }
36 if err != nil {
37 return nil, fmt.Errorf("no client, database, or collection entity found with ID %q", operation.Object)
38 }
39
40 var pipeline []interface{}
41 opts := options.ChangeStream()
42
43 elems, _ := operation.Arguments.Elements()
44 for _, elem := range elems {
45 key := elem.Key()
46 val := elem.Value()
47
48 switch key {
49 case "batchSize":
50 opts.SetBatchSize(val.Int32())
51 case "collation":
52 collation, err := createCollation(val.Document())
53 if err != nil {
54 return nil, fmt.Errorf("error creating collation: %w", err)
55 }
56 opts.SetCollation(*collation)
57 case "comment":
58 commentString, err := createCommentString(val)
59 if err != nil {
60 return nil, fmt.Errorf("error creating comment: %w", err)
61 }
62 opts.SetComment(commentString)
63 case "fullDocument":
64 switch fd := val.StringValue(); fd {
65 case "default":
66 opts.SetFullDocument(options.Default)
67 case "required":
68 opts.SetFullDocument(options.Required)
69 case "updateLookup":
70 opts.SetFullDocument(options.UpdateLookup)
71 case "whenAvailable":
72 opts.SetFullDocument(options.WhenAvailable)
73 default:
74 return nil, fmt.Errorf("unrecognized fullDocument value %q", fd)
75 }
76 case "fullDocumentBeforeChange":
77 switch fdbc := val.StringValue(); fdbc {
78 case "off":
79 opts.SetFullDocumentBeforeChange(options.Off)
80 case "required":
81 opts.SetFullDocumentBeforeChange(options.Required)
82 case "whenAvailable":
83 opts.SetFullDocumentBeforeChange(options.WhenAvailable)
84 }
85 case "maxAwaitTimeMS":
86 opts.SetMaxAwaitTime(time.Duration(val.Int32()) * time.Millisecond)
87 case "pipeline":
88 pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
89 case "resumeAfter":
90 opts.SetResumeAfter(val.Document())
91 case "showExpandedEvents":
92 opts.SetShowExpandedEvents(val.Boolean())
93 case "startAfter":
94 opts.SetStartAfter(val.Document())
95 case "startAtOperationTime":
96 t, i := val.Timestamp()
97 opts.SetStartAtOperationTime(&primitive.Timestamp{T: t, I: i})
98 default:
99 return nil, fmt.Errorf("unrecognized createChangeStream option %q", key)
100 }
101 }
102 if pipeline == nil {
103 return nil, newMissingArgumentError("pipeline")
104 }
105
106 stream, err := watcher.Watch(ctx, pipeline, opts)
107 if err != nil {
108 return newErrorResult(err), nil
109 }
110
111
112
113 if operation.ResultEntityID != nil {
114 if err := entities(ctx).addCursorEntity(*operation.ResultEntityID, stream); err != nil {
115 return nil, fmt.Errorf("error storing result as cursor entity: %w", err)
116 }
117 }
118 return newEmptyResult(), nil
119 }
120
121 func executeListDatabases(ctx context.Context, operation *operation, nameOnly bool) (*operationResult, error) {
122 client, err := entities(ctx).client(operation.Object)
123 if err != nil {
124 return nil, err
125 }
126
127
128
129 filter := emptyDocument
130 opts := options.ListDatabases().SetNameOnly(nameOnly)
131
132 elems, _ := operation.Arguments.Elements()
133 for _, elem := range elems {
134 key := elem.Key()
135 val := elem.Value()
136
137 switch key {
138 case "authorizedDatabases":
139 opts.SetAuthorizedDatabases(val.Boolean())
140 case "filter":
141 filter = val.Document()
142 case "nameOnly":
143 opts.SetNameOnly(val.Boolean())
144 default:
145 return nil, fmt.Errorf("unrecognized listDatabases option %q", key)
146 }
147 }
148
149 res, err := client.ListDatabases(ctx, filter, opts)
150 if err != nil {
151 return newErrorResult(err), nil
152 }
153
154 specsArray := bsoncore.NewArrayBuilder()
155 for _, spec := range res.Databases {
156 rawSpec := bsoncore.NewDocumentBuilder().
157 AppendString("name", spec.Name).
158 AppendInt64("sizeOnDisk", spec.SizeOnDisk).
159 AppendBoolean("empty", spec.Empty).
160 Build()
161
162 specsArray.AppendDocument(rawSpec)
163 }
164 raw := bsoncore.NewDocumentBuilder().
165 AppendArray("databases", specsArray.Build()).
166 AppendInt64("totalSize", res.TotalSize).
167 Build()
168 return newDocumentResult(raw, nil), nil
169 }
170
View as plain text