1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package metadata_test
18
19 import (
20 "context"
21 "testing"
22 "unsafe"
23
24 "github.com/apache/arrow/go/v15/parquet"
25 "github.com/apache/arrow/go/v15/parquet/metadata"
26 "github.com/apache/arrow/go/v15/parquet/schema"
27 "github.com/stretchr/testify/assert"
28 "github.com/stretchr/testify/require"
29 )
30
31 func generateTableMetaData(schema *schema.Schema, props *parquet.WriterProperties, nrows int, statsInt, statsFloat metadata.EncodedStatistics) (*metadata.FileMetaData, error) {
32 fbuilder := metadata.NewFileMetadataBuilder(schema, props, nil)
33 rg1Builder := fbuilder.AppendRowGroup()
34
35
36 col1Builder := rg1Builder.NextColumnChunk()
37 col2Builder := rg1Builder.NextColumnChunk()
38
39 dictEncodingStats := map[parquet.Encoding]int32{parquet.Encodings.RLEDict: 1}
40 dataEncodingStats := map[parquet.Encoding]int32{parquet.Encodings.Plain: 1, parquet.Encodings.RLE: 1}
41 statsInt.Signed = true
42 col1Builder.SetStats(statsInt)
43 statsFloat.Signed = true
44 col2Builder.SetStats(statsFloat)
45
46 col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 4, 0, 10, 512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
47 col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 24, 0, 30, 512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
48
49 rg1Builder.SetNumRows(nrows / 2)
50 rg1Builder.Finish(1024, -1)
51
52
53 rg2Builder := fbuilder.AppendRowGroup()
54 col1Builder = rg2Builder.NextColumnChunk()
55 col2Builder = rg2Builder.NextColumnChunk()
56
57 col1Builder.SetStats(statsInt)
58 col2Builder.SetStats(statsFloat)
59 dictEncodingStats = make(map[parquet.Encoding]int32)
60 col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 0 , 0, 10, 512, 600}, false , false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
61 col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 16, 0, 26, 512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
62
63 rg2Builder.SetNumRows(nrows / 2)
64 rg2Builder.Finish(1024, -1)
65
66 return fbuilder.Finish()
67 }
68
69 func assertStatsSet(t *testing.T, m *metadata.ColumnChunkMetaData) {
70 ok, err := m.StatsSet()
71 assert.NoError(t, err)
72 assert.True(t, ok)
73 }
74
75 func assertStats(t *testing.T, m *metadata.ColumnChunkMetaData) metadata.TypedStatistics {
76 s, err := m.Statistics()
77 assert.NoError(t, err)
78 assert.NotNil(t, s)
79 return s
80 }
81
82 func TestBuildAccess(t *testing.T) {
83 props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))
84
85 fields := schema.FieldList{
86 schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
87 schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
88 }
89 root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
90 require.NoError(t, err)
91 schema := schema.NewSchema(root)
92
93 var (
94 nrows int64 = 1000
95 intMin int32 = 100
96 intMax int32 = 200
97 floatMin float32 = 100.100
98 floatMax float32 = 200.200
99 statsInt metadata.EncodedStatistics
100 statsFloat metadata.EncodedStatistics
101 )
102
103 statsInt.SetNullCount(0).
104 SetDistinctCount(nrows).
105 SetMin((*(*[4]byte)(unsafe.Pointer(&intMin)))[:]).
106 SetMax((*(*[4]byte)(unsafe.Pointer(&intMax)))[:])
107
108 statsFloat.SetNullCount(0).
109 SetDistinctCount(nrows).
110 SetMin((*(*[4]byte)(unsafe.Pointer(&floatMin)))[:]).
111 SetMax((*(*[4]byte)(unsafe.Pointer(&floatMax)))[:])
112
113 faccessor, err := generateTableMetaData(schema, props, int(nrows), statsInt, statsFloat)
114 require.NoError(t, err)
115 serialized, err := faccessor.SerializeString(context.Background())
116 assert.NoError(t, err)
117 faccessorCopy, err := metadata.NewFileMetaData([]byte(serialized), nil)
118 assert.NoError(t, err)
119
120 for _, accessor := range []*metadata.FileMetaData{faccessor, faccessorCopy} {
121
122 assert.Equal(t, nrows, accessor.NumRows)
123 assert.Len(t, accessor.RowGroups, 2)
124 assert.EqualValues(t, parquet.V2_LATEST, accessor.Version())
125 assert.Equal(t, parquet.DefaultCreatedBy, accessor.GetCreatedBy())
126 assert.Equal(t, 3, accessor.NumSchemaElements())
127
128
129 rg1Access := accessor.RowGroup(0)
130 assert.Equal(t, 2, rg1Access.NumColumns())
131 assert.Equal(t, nrows/2, rg1Access.NumRows())
132 assert.Equal(t, int64(1024), rg1Access.TotalByteSize())
133 assert.Equal(t, int64(1024), rg1Access.TotalCompressedSize())
134
135 rg1Col1, err := rg1Access.ColumnChunk(0)
136 assert.NoError(t, err)
137 assert.Equal(t, rg1Access.FileOffset(), rg1Col1.DictionaryPageOffset())
138
139 rg1Col2, err := rg1Access.ColumnChunk(1)
140 assert.NoError(t, err)
141 assertStatsSet(t, rg1Col1)
142 assertStatsSet(t, rg1Col2)
143 assert.Equal(t, statsInt.Min, assertStats(t, rg1Col1).EncodeMin())
144 assert.Equal(t, statsInt.Max, assertStats(t, rg1Col1).EncodeMax())
145 assert.Equal(t, statsFloat.Min, assertStats(t, rg1Col2).EncodeMin())
146 assert.Equal(t, statsFloat.Max, assertStats(t, rg1Col2).EncodeMax())
147 assert.Zero(t, assertStats(t, rg1Col1).NullCount())
148 assert.Zero(t, assertStats(t, rg1Col2).NullCount())
149 assert.Equal(t, nrows, assertStats(t, rg1Col1).DistinctCount())
150 assert.Equal(t, nrows, assertStats(t, rg1Col2).DistinctCount())
151 assert.Equal(t, metadata.DefaultCompressionType, rg1Col1.Compression())
152 assert.Equal(t, metadata.DefaultCompressionType, rg1Col2.Compression())
153 assert.Equal(t, nrows/2, rg1Col1.NumValues())
154 assert.Equal(t, nrows/2, rg1Col2.NumValues())
155 assert.Len(t, rg1Col1.Encodings(), 3)
156 assert.Len(t, rg1Col2.Encodings(), 3)
157 assert.EqualValues(t, 512, rg1Col1.TotalCompressedSize())
158 assert.EqualValues(t, 512, rg1Col2.TotalCompressedSize())
159 assert.EqualValues(t, 600, rg1Col1.TotalUncompressedSize())
160 assert.EqualValues(t, 600, rg1Col2.TotalUncompressedSize())
161 assert.EqualValues(t, 4, rg1Col1.DictionaryPageOffset())
162 assert.EqualValues(t, 24, rg1Col2.DictionaryPageOffset())
163 assert.EqualValues(t, 10, rg1Col1.DataPageOffset())
164 assert.EqualValues(t, 30, rg1Col2.DataPageOffset())
165 assert.Len(t, rg1Col1.EncodingStats(), 3)
166 assert.Len(t, rg1Col2.EncodingStats(), 3)
167
168
169 rg2Access := accessor.RowGroup(1)
170 assert.Equal(t, 2, rg2Access.NumColumns())
171 assert.Equal(t, nrows/2, rg2Access.NumRows())
172 assert.EqualValues(t, 1024, rg2Access.TotalByteSize())
173 assert.EqualValues(t, 1024, rg2Access.TotalCompressedSize())
174
175 rg2Col1, err := rg2Access.ColumnChunk(0)
176 assert.NoError(t, err)
177 assert.Equal(t, rg2Access.FileOffset(), rg2Col1.DataPageOffset())
178
179 rg2Col2, err := rg2Access.ColumnChunk(1)
180 assert.NoError(t, err)
181 assertStatsSet(t, rg1Col1)
182 assertStatsSet(t, rg1Col2)
183 assert.Equal(t, statsInt.Min, assertStats(t, rg1Col1).EncodeMin())
184 assert.Equal(t, statsInt.Max, assertStats(t, rg1Col1).EncodeMax())
185 assert.Equal(t, statsFloat.Min, assertStats(t, rg1Col2).EncodeMin())
186 assert.Equal(t, statsFloat.Max, assertStats(t, rg1Col2).EncodeMax())
187 assert.Zero(t, assertStats(t, rg1Col1).NullCount())
188 assert.Zero(t, assertStats(t, rg1Col2).NullCount())
189 assert.Equal(t, nrows, assertStats(t, rg1Col1).DistinctCount())
190 assert.Equal(t, nrows, assertStats(t, rg1Col2).DistinctCount())
191 assert.Equal(t, metadata.DefaultCompressionType, rg2Col1.Compression())
192 assert.Equal(t, metadata.DefaultCompressionType, rg2Col2.Compression())
193 assert.Equal(t, nrows/2, rg2Col1.NumValues())
194 assert.Equal(t, nrows/2, rg2Col2.NumValues())
195 assert.Len(t, rg2Col1.Encodings(), 2)
196 assert.Len(t, rg2Col2.Encodings(), 3)
197 assert.EqualValues(t, 512, rg2Col1.TotalCompressedSize())
198 assert.EqualValues(t, 512, rg2Col2.TotalCompressedSize())
199 assert.EqualValues(t, 600, rg2Col1.TotalUncompressedSize())
200 assert.EqualValues(t, 600, rg2Col2.TotalUncompressedSize())
201 assert.EqualValues(t, 0, rg2Col1.DictionaryPageOffset())
202 assert.EqualValues(t, 16, rg2Col2.DictionaryPageOffset())
203 assert.EqualValues(t, 10, rg2Col1.DataPageOffset())
204 assert.EqualValues(t, 26, rg2Col2.DataPageOffset())
205 assert.Len(t, rg2Col1.EncodingStats(), 2)
206 assert.Len(t, rg2Col2.EncodingStats(), 2)
207
208 assert.Empty(t, rg2Col1.FilePath())
209 accessor.SetFilePath("/foo/bar/bar.parquet")
210 assert.Equal(t, "/foo/bar/bar.parquet", rg2Col1.FilePath())
211 }
212
213 faccessor2, err := generateTableMetaData(schema, props, int(nrows), statsInt, statsFloat)
214 require.NoError(t, err)
215 faccessor.AppendRowGroups(faccessor2)
216 assert.Len(t, faccessor.RowGroups, 4)
217 assert.Equal(t, nrows*2, faccessor.NumRows)
218 assert.EqualValues(t, parquet.V2_LATEST, faccessor.Version())
219 assert.Equal(t, parquet.DefaultCreatedBy, faccessor.GetCreatedBy())
220 assert.Equal(t, 3, faccessor.NumSchemaElements())
221
222 faccessor1, err := faccessor.Subset([]int{2, 3})
223 require.NoError(t, err)
224 assert.True(t, faccessor1.Equals(faccessor2))
225
226 faccessor1, err = faccessor2.Subset([]int{0})
227 require.NoError(t, err)
228
229 next, err := faccessor.Subset([]int{0})
230 require.NoError(t, err)
231 faccessor1.AppendRowGroups(next)
232
233 sub, err := faccessor.Subset([]int{2, 0})
234 require.NoError(t, err)
235 assert.True(t, faccessor1.Equals(sub))
236 }
237
238 func TestV1VersionMetadata(t *testing.T) {
239 props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
240
241 fields := schema.FieldList{
242 schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
243 schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
244 }
245 root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
246 require.NoError(t, err)
247 schema := schema.NewSchema(root)
248
249 fbuilder := metadata.NewFileMetadataBuilder(schema, props, nil)
250 faccessor, err := fbuilder.Finish()
251 require.NoError(t, err)
252 assert.EqualValues(t, parquet.V1_0, faccessor.Version())
253 }
254
255 func TestKeyValueMetadata(t *testing.T) {
256 props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
257
258 fields := schema.FieldList{
259 schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
260 schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
261 }
262 root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
263 require.NoError(t, err)
264 schema := schema.NewSchema(root)
265 kvmeta := metadata.NewKeyValueMetadata()
266 kvmeta.Append("test_key", "test_value")
267
268 fbuilder := metadata.NewFileMetadataBuilder(schema, props, kvmeta)
269 faccessor, err := fbuilder.Finish()
270 require.NoError(t, err)
271
272 assert.True(t, faccessor.KeyValueMetadata().Equals(kvmeta))
273 }
274
275 func TestKeyValueMetadataAppend(t *testing.T) {
276 props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
277
278 fields := schema.FieldList{
279 schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
280 schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
281 }
282 root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
283 require.NoError(t, err)
284 schema := schema.NewSchema(root)
285
286 kvmeta := metadata.NewKeyValueMetadata()
287 key1 := "test_key1"
288 value1 := "test_value1"
289 require.NoError(t, kvmeta.Append(key1, value1))
290
291 fbuilder := metadata.NewFileMetadataBuilder(schema, props, kvmeta)
292
293 key2 := "test_key2"
294 value2 := "test_value2"
295 require.NoError(t, fbuilder.AppendKeyValueMetadata(key2, value2))
296 faccessor, err := fbuilder.Finish()
297 require.NoError(t, err)
298
299 kv := faccessor.KeyValueMetadata()
300
301 got1 := kv.FindValue(key1)
302 require.NotNil(t, got1)
303 assert.Equal(t, value1, *got1)
304
305 got2 := kv.FindValue(key2)
306 require.NotNil(t, got2)
307 assert.Equal(t, value2, *got2)
308 }
309
310 func TestApplicationVersion(t *testing.T) {
311 version := metadata.NewAppVersion("parquet-mr version 1.7.9")
312 version1 := metadata.NewAppVersion("parquet-mr version 1.8.0")
313 version2 := metadata.NewAppVersion("parquet-cpp version 1.0.0")
314 version3 := metadata.NewAppVersion("")
315 version4 := metadata.NewAppVersion("parquet-mr version 1.5.0ab-cdh5.5.0+cd (build abcd)")
316 version5 := metadata.NewAppVersion("parquet-mr")
317
318 assert.Equal(t, "parquet-mr", version.App)
319 assert.Equal(t, 1, version.Version.Major)
320 assert.Equal(t, 7, version.Version.Minor)
321 assert.Equal(t, 9, version.Version.Patch)
322
323 assert.Equal(t, "parquet-cpp", version2.App)
324 assert.Equal(t, 1, version2.Version.Major)
325 assert.Equal(t, 0, version2.Version.Minor)
326 assert.Equal(t, 0, version2.Version.Patch)
327
328 assert.Equal(t, "parquet-mr", version4.App)
329 assert.Equal(t, "abcd", version4.Build)
330 assert.Equal(t, 1, version4.Version.Major)
331 assert.Equal(t, 5, version4.Version.Minor)
332 assert.Equal(t, 0, version4.Version.Patch)
333 assert.Equal(t, "ab", version4.Version.Unknown)
334 assert.Equal(t, "cdh5.5.0", version4.Version.PreRelease)
335 assert.Equal(t, "cd", version4.Version.BuildInfo)
336
337 assert.Equal(t, "parquet-mr", version5.App)
338 assert.Equal(t, 0, version5.Version.Major)
339 assert.Equal(t, 0, version5.Version.Minor)
340 assert.Equal(t, 0, version5.Version.Patch)
341
342 assert.True(t, version.LessThan(version1))
343
344 var stats metadata.EncodedStatistics
345 assert.False(t, version1.HasCorrectStatistics(parquet.Types.Int96, schema.NoLogicalType{}, stats, schema.SortUNKNOWN))
346 assert.True(t, version.HasCorrectStatistics(parquet.Types.Int32, schema.NoLogicalType{}, stats, schema.SortSIGNED))
347 assert.False(t, version.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, stats, schema.SortSIGNED))
348 assert.True(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, stats, schema.SortSIGNED))
349 assert.False(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, stats, schema.SortUNSIGNED))
350 assert.True(t, version3.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NoLogicalType{}, stats, schema.SortSIGNED))
351
352
353 var statsStr metadata.EncodedStatistics
354 statsStr.SetMin([]byte("a")).SetMax([]byte("b"))
355 assert.False(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsStr, schema.SortUNSIGNED))
356 statsStr.SetMax([]byte("a"))
357 assert.True(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsStr, schema.SortUNSIGNED))
358
359
360 var (
361 intMin int32 = 100
362 intMax int32 = 200
363 )
364 var statsInt metadata.EncodedStatistics
365 statsInt.SetMin((*(*[4]byte)(unsafe.Pointer(&intMin)))[:])
366 statsInt.SetMax((*(*[4]byte)(unsafe.Pointer(&intMax)))[:])
367 assert.False(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsInt, schema.SortUNSIGNED))
368 statsInt.SetMax((*(*[4]byte)(unsafe.Pointer(&intMin)))[:])
369 assert.True(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsInt, schema.SortUNSIGNED))
370 }
371
372 func TestCheckBadDecimalStats(t *testing.T) {
373 version1 := metadata.NewAppVersion("parquet-cpp version 3.0.0")
374 version2 := metadata.NewAppVersion("parquet-cpp-arrow version 3.0.0")
375 version3 := metadata.NewAppVersion("parquet-cpp-arrow version 4.0.0")
376
377 var stats metadata.EncodedStatistics
378 assert.False(t, version1.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NewDecimalLogicalType(5, 0), stats, schema.SortSIGNED))
379 assert.False(t, version2.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NewDecimalLogicalType(5, 0), stats, schema.SortSIGNED))
380 assert.True(t, version3.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NewDecimalLogicalType(5, 0), stats, schema.SortSIGNED))
381 }
382
View as plain text