...
1
2
3
4
5
6
7 package operation
8
9 import (
10 "context"
11 "errors"
12 "time"
13
14 "go.mongodb.org/mongo-driver/event"
15 "go.mongodb.org/mongo-driver/internal/logger"
16 "go.mongodb.org/mongo-driver/mongo/description"
17 "go.mongodb.org/mongo-driver/mongo/readpref"
18 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
19 "go.mongodb.org/mongo-driver/x/mongo/driver"
20 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
21 )
22
23
24 type Command struct {
25 command bsoncore.Document
26 database string
27 deployment driver.Deployment
28 selector description.ServerSelector
29 readPreference *readpref.ReadPref
30 clock *session.ClusterClock
31 session *session.Client
32 monitor *event.CommandMonitor
33 resultResponse bsoncore.Document
34 resultCursor *driver.BatchCursor
35 crypt driver.Crypt
36 serverAPI *driver.ServerAPIOptions
37 createCursor bool
38 cursorOpts driver.CursorOptions
39 timeout *time.Duration
40 logger *logger.Logger
41 }
42
43
44
45 func NewCommand(command bsoncore.Document) *Command {
46 return &Command{
47 command: command,
48 }
49 }
50
51
52
53 func NewCursorCommand(command bsoncore.Document, cursorOpts driver.CursorOptions) *Command {
54 return &Command{
55 command: command,
56 cursorOpts: cursorOpts,
57 createCursor: true,
58 }
59 }
60
61
62 func (c *Command) Result() bsoncore.Document { return c.resultResponse }
63
64
65
66
67 func (c *Command) ResultCursor() (*driver.BatchCursor, error) {
68 if !c.createCursor {
69 return nil, errors.New("command operation was not configured to create a cursor, but a result cursor was requested")
70 }
71 return c.resultCursor, nil
72 }
73
74
75 func (c *Command) Execute(ctx context.Context) error {
76 if c.deployment == nil {
77 return errors.New("the Command operation must have a Deployment set before Execute can be called")
78 }
79
80 return driver.Operation{
81 CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
82 return append(dst, c.command[4:len(c.command)-1]...), nil
83 },
84 ProcessResponseFn: func(info driver.ResponseInfo) error {
85 c.resultResponse = info.ServerResponse
86
87 if c.createCursor {
88 cursorRes, err := driver.NewCursorResponse(info)
89 if err != nil {
90 return err
91 }
92
93 c.resultCursor, err = driver.NewBatchCursor(cursorRes, c.session, c.clock, c.cursorOpts)
94 return err
95 }
96
97 return nil
98 },
99 Client: c.session,
100 Clock: c.clock,
101 CommandMonitor: c.monitor,
102 Database: c.database,
103 Deployment: c.deployment,
104 ReadPreference: c.readPreference,
105 Selector: c.selector,
106 Crypt: c.crypt,
107 ServerAPI: c.serverAPI,
108 Timeout: c.timeout,
109 Logger: c.logger,
110 }.Execute(ctx)
111 }
112
113
114 func (c *Command) Session(session *session.Client) *Command {
115 if c == nil {
116 c = new(Command)
117 }
118
119 c.session = session
120 return c
121 }
122
123
124 func (c *Command) ClusterClock(clock *session.ClusterClock) *Command {
125 if c == nil {
126 c = new(Command)
127 }
128
129 c.clock = clock
130 return c
131 }
132
133
134 func (c *Command) CommandMonitor(monitor *event.CommandMonitor) *Command {
135 if c == nil {
136 c = new(Command)
137 }
138
139 c.monitor = monitor
140 return c
141 }
142
143
144 func (c *Command) Database(database string) *Command {
145 if c == nil {
146 c = new(Command)
147 }
148
149 c.database = database
150 return c
151 }
152
153
154 func (c *Command) Deployment(deployment driver.Deployment) *Command {
155 if c == nil {
156 c = new(Command)
157 }
158
159 c.deployment = deployment
160 return c
161 }
162
163
164 func (c *Command) ReadPreference(readPreference *readpref.ReadPref) *Command {
165 if c == nil {
166 c = new(Command)
167 }
168
169 c.readPreference = readPreference
170 return c
171 }
172
173
174 func (c *Command) ServerSelector(selector description.ServerSelector) *Command {
175 if c == nil {
176 c = new(Command)
177 }
178
179 c.selector = selector
180 return c
181 }
182
183
184 func (c *Command) Crypt(crypt driver.Crypt) *Command {
185 if c == nil {
186 c = new(Command)
187 }
188
189 c.crypt = crypt
190 return c
191 }
192
193
194 func (c *Command) ServerAPI(serverAPI *driver.ServerAPIOptions) *Command {
195 if c == nil {
196 c = new(Command)
197 }
198
199 c.serverAPI = serverAPI
200 return c
201 }
202
203
204 func (c *Command) Timeout(timeout *time.Duration) *Command {
205 if c == nil {
206 c = new(Command)
207 }
208
209 c.timeout = timeout
210 return c
211 }
212
213
214 func (c *Command) Logger(logger *logger.Logger) *Command {
215 if c == nil {
216 c = new(Command)
217 }
218
219 c.logger = logger
220 return c
221 }
222
View as plain text