...
1
2
3
4
5
6
7 package unified
8
9 import (
10 "fmt"
11 "time"
12
13 "go.mongodb.org/mongo-driver/bson"
14 "go.mongodb.org/mongo-driver/mongo/readconcern"
15 "go.mongodb.org/mongo-driver/mongo/readpref"
16 "go.mongodb.org/mongo-driver/mongo/writeconcern"
17 "go.mongodb.org/mongo-driver/tag"
18 )
19
20
21
22 type readConcern struct {
23 Level string `bson:"level"`
24 }
25
26 func (rc *readConcern) toReadConcernOption() *readconcern.ReadConcern {
27 return readconcern.New(readconcern.Level(rc.Level))
28 }
29
30 type writeConcern struct {
31 Journal *bool `bson:"journal"`
32 W interface{} `bson:"w"`
33 WTimeoutMS *int32 `bson:"wtimeoutMS"`
34 }
35
36 func (wc *writeConcern) toWriteConcernOption() (*writeconcern.WriteConcern, error) {
37 var wcOptions []writeconcern.Option
38 if wc.Journal != nil {
39 wcOptions = append(wcOptions, writeconcern.J(*wc.Journal))
40 }
41 if wc.W != nil {
42 switch converted := wc.W.(type) {
43 case string:
44 if converted != "majority" {
45 return nil, fmt.Errorf("invalid write concern 'w' string value %q", converted)
46 }
47 wcOptions = append(wcOptions, writeconcern.WMajority())
48 case int32:
49 wcOptions = append(wcOptions, writeconcern.W(int(converted)))
50 default:
51 return nil, fmt.Errorf("invalid type for write concern 'w' field %T", wc.W)
52 }
53 }
54 if wc.WTimeoutMS != nil {
55 wTimeout := time.Duration(*wc.WTimeoutMS) * time.Millisecond
56 wcOptions = append(wcOptions, writeconcern.WTimeout(wTimeout))
57 }
58
59 return writeconcern.New(wcOptions...), nil
60 }
61
62
63 type ReadPreference struct {
64 Mode string `bson:"mode"`
65 TagSets []map[string]string `bson:"tagSets"`
66 MaxStalenessSeconds *int64 `bson:"maxStalenessSeconds"`
67 Hedge bson.M `bson:"hedge"`
68 }
69
70
71
72 func (rp *ReadPreference) ToReadPrefOption() (*readpref.ReadPref, error) {
73 mode, err := readpref.ModeFromString(rp.Mode)
74 if err != nil {
75 return nil, fmt.Errorf("invalid read preference mode %q", rp.Mode)
76 }
77
78 var rpOptions []readpref.Option
79 if rp.TagSets != nil {
80
81 sets := make([]tag.Set, 0, len(rp.TagSets))
82 for _, rawSet := range rp.TagSets {
83 parsed := make(tag.Set, 0, len(rawSet))
84 for k, v := range rawSet {
85 parsed = append(parsed, tag.Tag{Name: k, Value: v})
86 }
87 sets = append(sets, parsed)
88 }
89
90 rpOptions = append(rpOptions, readpref.WithTagSets(sets...))
91 }
92 if rp.MaxStalenessSeconds != nil {
93 maxStaleness := time.Duration(*rp.MaxStalenessSeconds) * time.Second
94 rpOptions = append(rpOptions, readpref.WithMaxStaleness(maxStaleness))
95 }
96 if rp.Hedge != nil {
97 if len(rp.Hedge) > 1 {
98 return nil, fmt.Errorf("invalid read preference hedge document: length cannot be greater than 1")
99 }
100 if enabled, ok := rp.Hedge["enabled"]; ok {
101 rpOptions = append(rpOptions, readpref.WithHedgeEnabled(enabled.(bool)))
102 }
103 }
104
105 return readpref.New(mode, rpOptions...)
106 }
107
View as plain text