1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "testing"
20
21 "github.com/googleapis/gax-go/v2"
22 "google.golang.org/grpc"
23 "google.golang.org/grpc/metadata"
24 )
25
26 func TestTableParentFromStreamName(t *testing.T) {
27 testCases := []struct {
28 in string
29 want string
30 }{
31 {
32 "bad",
33 "bad",
34 },
35 {
36 "projects/foo/datasets/bar/tables/baz",
37 "projects/foo/datasets/bar/tables/baz",
38 },
39 {
40 "projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
41 "projects/foo/datasets/bar/tables/baz",
42 },
43 {
44 "projects/foo/datasets/bar/tables/baz/_default",
45 "projects/foo/datasets/bar/tables/baz",
46 },
47 }
48
49 for _, tc := range testCases {
50 got := TableParentFromStreamName(tc.in)
51 if got != tc.want {
52 t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
53 }
54 }
55 }
56
57 func TestCreatePool_Location(t *testing.T) {
58 t.Skip("skipping until new write_location is allowed")
59 c := &Client{
60 cfg: &writerClientConfig{},
61 ctx: context.Background(),
62 projectID: "myproj",
63 }
64 pool, err := c.createPool("foo", nil)
65 if err != nil {
66 t.Fatalf("createPool: %v", err)
67 }
68 meta, ok := metadata.FromOutgoingContext(pool.ctx)
69 if !ok {
70 t.Fatalf("no metadata in outgoing context")
71 }
72 vals, ok := meta["x-goog-request-params"]
73 if !ok {
74 t.Fatalf("metadata key not present")
75 }
76 found := false
77 for _, v := range vals {
78 if v == "write_location=projects/myproj/locations/foo" {
79 found = true
80 break
81 }
82 }
83 if !found {
84 t.Fatal("expected location header not found")
85 }
86 }
87
88
89
90 func TestCreatePool(t *testing.T) {
91 testCases := []struct {
92 desc string
93 cfg *writerClientConfig
94 settings *streamSettings
95 wantMaxBytes int
96 wantMaxRequests int
97 wantCallOptions int
98 wantPoolCallOptions int
99 }{
100 {
101 desc: "cfg, no settings",
102 cfg: &writerClientConfig{
103 defaultInflightRequests: 12,
104 defaultInflightBytes: 2048,
105 },
106 wantMaxBytes: 2048,
107 wantMaxRequests: 12,
108 },
109 {
110 desc: "empty cfg, w/settings",
111 cfg: &writerClientConfig{},
112 settings: &streamSettings{
113 MaxInflightRequests: 99,
114 MaxInflightBytes: 1024,
115 appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
116 },
117 wantMaxBytes: 1024,
118 wantMaxRequests: 99,
119 wantCallOptions: 1,
120 },
121 {
122 desc: "both cfg and settings",
123 cfg: &writerClientConfig{
124 defaultInflightRequests: 123,
125 defaultInflightBytes: 456,
126 defaultAppendRowsCallOptions: []gax.CallOption{gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(999))},
127 },
128 settings: &streamSettings{
129 MaxInflightRequests: 99,
130 MaxInflightBytes: 1024,
131 },
132 wantMaxBytes: 1024,
133 wantMaxRequests: 99,
134 wantPoolCallOptions: 1,
135 },
136 {
137 desc: "merge defaults and settings",
138 cfg: &writerClientConfig{
139 defaultInflightRequests: 123,
140 defaultInflightBytes: 456,
141 defaultAppendRowsCallOptions: []gax.CallOption{gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(999))},
142 },
143 settings: &streamSettings{
144 MaxInflightBytes: 1024,
145 appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
146 },
147 wantMaxBytes: 1024,
148 wantMaxRequests: 123,
149 wantCallOptions: 1,
150 wantPoolCallOptions: 1,
151 },
152 }
153
154 for _, tc := range testCases {
155 c := &Client{
156 cfg: tc.cfg,
157 ctx: context.Background(),
158 }
159 pool, err := c.createPool("", nil)
160 if err != nil {
161 t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
162 continue
163 }
164 writer := &ManagedStream{
165 id: "foo",
166 streamSettings: tc.settings,
167 }
168 if err = pool.addWriter(writer); err != nil {
169 t.Errorf("case %q: addWriter: %v", tc.desc, err)
170 }
171 pw := newPendingWrite(context.Background(), writer, nil, nil, "", "")
172 gotConn, err := pool.selectConn(pw)
173 if err != nil {
174 t.Errorf("case %q: selectConn: %v", tc.desc, err)
175 }
176
177
178 if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes {
179 t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes)
180 }
181 if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests {
182 t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests)
183 }
184 if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions {
185 t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions)
186 }
187 if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions {
188 t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions)
189 }
190 }
191 }
192
View as plain text