1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package parquet
18
19 import (
20 "github.com/apache/arrow/go/v15/arrow/memory"
21 "github.com/apache/arrow/go/v15/parquet/compress"
22 )
23
24
// Default values for the writer options defined in this file. Each one is
// what a WriterProperties reports when the corresponding option is not given.
const (
	// DefaultBufSize is 4096 * 4 (16 KiB). It is not referenced in this file.
	// NOTE(review): presumably a default I/O buffer size used elsewhere in the
	// package — confirm.
	DefaultBufSize int64 = 4096 * 4

	// DefaultDataPageSize is the default for WithDataPageSize (reported by
	// WriterProperties.DataPageSize): 1024 * 1024, i.e. 1 MiB assuming the
	// unit is bytes.
	DefaultDataPageSize int64 = 1024 * 1024

	// DefaultDictionaryEnabled is the default for
	// ColumnProperties.DictionaryEnabled.
	DefaultDictionaryEnabled = true

	// DefaultDictionaryPageSizeLimit is the default for
	// WithDictionaryPageSizeLimit. It mirrors DefaultDataPageSize and so is
	// also an int64.
	DefaultDictionaryPageSizeLimit = DefaultDataPageSize

	// DefaultWriteBatchSize is the default for WithBatchSize (reported by
	// WriterProperties.WriteBatchSize).
	DefaultWriteBatchSize int64 = 1024

	// DefaultMaxRowGroupLen is the default for WithMaxRowGroupLength, which is
	// expressed as a number of rows.
	DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024

	// DefaultStatsEnabled is the default for ColumnProperties.StatsEnabled.
	DefaultStatsEnabled = true

	// DefaultMaxStatsSize is the default for ColumnProperties.MaxStatsSize.
	DefaultMaxStatsSize int64 = 4096
	// DefaultCreatedBy is the default value reported by
	// WriterProperties.CreatedBy.
	DefaultCreatedBy = "parquet-go version 15.0.2"
	// DefaultRootName is the default value reported by
	// WriterProperties.RootName.
	DefaultRootName = "schema"
)
52
53
// ColumnProperties holds the writer settings that can vary per column.
// A WriterProperties keeps one default instance and, for every column that has
// at least one per-column override, a separate full copy with that override
// applied.
type ColumnProperties struct {
	// Encoding is the column's encoding. WithEncoding and WithEncodingFor
	// refuse dictionary encodings here.
	Encoding Encoding
	// Codec is the compression codec for the column.
	Codec compress.Compression
	// DictionaryEnabled reports whether dictionary encoding is enabled.
	DictionaryEnabled bool
	// StatsEnabled reports whether statistics are enabled.
	StatsEnabled bool
	// MaxStatsSize is the statistics size limit.
	// NOTE(review): presumably a byte limit on stored min/max values — confirm.
	MaxStatsSize int64
	// CompressionLevel accompanies Codec; compress.DefaultCompressionLevel is
	// the package default. NOTE(review): meaning is presumably codec-specific.
	CompressionLevel int
}
62
63
64
65
66
67
68
69
70
71
72
73 func DefaultColumnProperties() ColumnProperties {
74 return ColumnProperties{
75 Encoding: Encodings.Plain,
76 Codec: compress.Codecs.Uncompressed,
77 DictionaryEnabled: DefaultDictionaryEnabled,
78 StatsEnabled: DefaultStatsEnabled,
79 MaxStatsSize: DefaultMaxStatsSize,
80 CompressionLevel: compress.DefaultCompressionLevel,
81 }
82 }
83
// writerPropConfig is the scratch state that WriterProperty options mutate
// inside NewWriterProperties. File-wide settings and column defaults are
// written straight into wr. Per-column overrides are collected in the maps
// below, keyed by column path string, and folded into wr.columnProps after
// every option has run.
type writerPropConfig struct {
	wr            *WriterProperties
	encodings     map[string]Encoding             // from WithEncodingFor / WithEncodingPath
	codecs        map[string]compress.Compression // from WithCompressionFor / WithCompressionPath
	compressLevel map[string]int                  // from WithCompressionLevelFor / WithCompressionLevelPath
	dictEnabled   map[string]bool                 // from WithDictionaryFor / WithDictionaryPath
	statsEnabled  map[string]bool                 // from WithStatsFor / WithStatsPath
}
92
93
// WriterProperty is a functional option for NewWriterProperties, which calls
// each one in order on its internal configuration.
type WriterProperty func(*writerPropConfig)
95
96
97 func WithAllocator(mem memory.Allocator) WriterProperty {
98 return func(cfg *writerPropConfig) {
99 cfg.wr.mem = mem
100 }
101 }
102
103
104 func WithDictionaryDefault(dict bool) WriterProperty {
105 return func(cfg *writerPropConfig) {
106 cfg.wr.defColumnProps.DictionaryEnabled = dict
107 }
108 }
109
110
111 func WithDictionaryFor(path string, dict bool) WriterProperty {
112 return func(cfg *writerPropConfig) {
113 cfg.dictEnabled[path] = dict
114 }
115 }
116
117
118 func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty {
119 return WithDictionaryFor(path.String(), dict)
120 }
121
122
123
124 func WithDictionaryPageSizeLimit(limit int64) WriterProperty {
125 return func(cfg *writerPropConfig) {
126 cfg.wr.dictPagesize = limit
127 }
128 }
129
130
131 func WithBatchSize(batch int64) WriterProperty {
132 return func(cfg *writerPropConfig) {
133 cfg.wr.batchSize = batch
134 }
135 }
136
137
138 func WithMaxRowGroupLength(nrows int64) WriterProperty {
139 return func(cfg *writerPropConfig) {
140 cfg.wr.maxRowGroupLen = nrows
141 }
142 }
143
144
145 func WithDataPageSize(pgsize int64) WriterProperty {
146 return func(cfg *writerPropConfig) {
147 cfg.wr.pageSize = pgsize
148 }
149 }
150
151
152 func WithDataPageVersion(version DataPageVersion) WriterProperty {
153 return func(cfg *writerPropConfig) {
154 cfg.wr.dataPageVersion = version
155 }
156 }
157
158
159 func WithVersion(version Version) WriterProperty {
160 return func(cfg *writerPropConfig) {
161 cfg.wr.parquetVersion = version
162 }
163 }
164
165
166 func WithCreatedBy(createdby string) WriterProperty {
167 return func(cfg *writerPropConfig) {
168 cfg.wr.createdBy = createdby
169 }
170 }
171
172
173
174 func WithRootName(name string) WriterProperty {
175 return func(cfg *writerPropConfig) {
176 cfg.wr.rootName = name
177 }
178 }
179
180
181
182 func WithRootRepetition(repetition Repetition) WriterProperty {
183 return func(cfg *writerPropConfig) {
184 cfg.wr.rootRepetition = repetition
185 }
186 }
187
188
189
190
191
192 func WithEncoding(encoding Encoding) WriterProperty {
193 return func(cfg *writerPropConfig) {
194 if encoding == Encodings.PlainDict || encoding == Encodings.RLEDict {
195 panic("parquet: can't use dictionary encoding as fallback encoding")
196 }
197 cfg.wr.defColumnProps.Encoding = encoding
198 }
199 }
200
201
202
203 func WithEncodingFor(path string, encoding Encoding) WriterProperty {
204 return func(cfg *writerPropConfig) {
205 if encoding == Encodings.PlainDict || encoding == Encodings.RLEDict {
206 panic("parquet: can't use dictionary encoding as fallback encoding")
207 }
208 cfg.encodings[path] = encoding
209 }
210 }
211
212
213 func WithEncodingPath(path ColumnPath, encoding Encoding) WriterProperty {
214 return WithEncodingFor(path.String(), encoding)
215 }
216
217
218 func WithCompression(codec compress.Compression) WriterProperty {
219 return func(cfg *writerPropConfig) {
220 cfg.wr.defColumnProps.Codec = codec
221 }
222 }
223
224
225 func WithCompressionFor(path string, codec compress.Compression) WriterProperty {
226 return func(cfg *writerPropConfig) {
227 cfg.codecs[path] = codec
228 }
229 }
230
231
232 func WithCompressionPath(path ColumnPath, codec compress.Compression) WriterProperty {
233 return WithCompressionFor(path.String(), codec)
234 }
235
236
237 func WithMaxStatsSize(maxStatsSize int64) WriterProperty {
238 return func(cfg *writerPropConfig) {
239 cfg.wr.defColumnProps.MaxStatsSize = maxStatsSize
240 }
241 }
242
243
244
245
246
247
248
249
250 func WithCompressionLevel(level int) WriterProperty {
251 return func(cfg *writerPropConfig) {
252 cfg.wr.defColumnProps.CompressionLevel = level
253 }
254 }
255
256
257 func WithCompressionLevelFor(path string, level int) WriterProperty {
258 return func(cfg *writerPropConfig) {
259 cfg.compressLevel[path] = level
260 }
261 }
262
263
264 func WithCompressionLevelPath(path ColumnPath, level int) WriterProperty {
265 return WithCompressionLevelFor(path.String(), level)
266 }
267
268
269 func WithStats(enabled bool) WriterProperty {
270 return func(cfg *writerPropConfig) {
271 cfg.wr.defColumnProps.StatsEnabled = enabled
272 }
273 }
274
275
276 func WithStatsFor(path string, enabled bool) WriterProperty {
277 return func(cfg *writerPropConfig) {
278 cfg.statsEnabled[path] = enabled
279 }
280 }
281
282
283 func WithStatsPath(path ColumnPath, enabled bool) WriterProperty {
284 return WithStatsFor(path.String(), enabled)
285 }
286
287
288 func WithEncryptionProperties(props *FileEncryptionProperties) WriterProperty {
289 return func(cfg *writerPropConfig) {
290 cfg.wr.encryptionProps = props
291 }
292 }
293
294
295
296 func WithStoreDecimalAsInteger(enabled bool) WriterProperty {
297 return func(cfg *writerPropConfig) {
298 cfg.wr.storeDecimalAsInt = enabled
299 }
300 }
301
302
303
// WriterProperties is the configuration used when writing a Parquet file.
// Build one with NewWriterProperties and read it through its accessor methods.
type WriterProperties struct {
	mem               memory.Allocator // WithAllocator
	dictPagesize      int64            // WithDictionaryPageSizeLimit
	batchSize         int64            // WithBatchSize
	maxRowGroupLen    int64            // WithMaxRowGroupLength (rows)
	pageSize          int64            // WithDataPageSize
	parquetVersion    Version          // WithVersion; also picks the dictionary encodings
	createdBy         string           // WithCreatedBy
	dataPageVersion   DataPageVersion  // WithDataPageVersion
	rootName          string           // WithRootName
	rootRepetition    Repetition       // WithRootRepetition
	storeDecimalAsInt bool             // WithStoreDecimalAsInteger

	// defColumnProps applies to every column that has no entry in columnProps.
	defColumnProps ColumnProperties
	// columnProps maps a column path string to that column's settings. Each
	// entry is a full copy of defColumnProps with the column's overrides applied.
	columnProps map[string]*ColumnProperties
	// encryptionProps is nil unless WithEncryptionProperties was used.
	encryptionProps *FileEncryptionProperties
}
321
322 func defaultWriterProperties() *WriterProperties {
323 return &WriterProperties{
324 mem: memory.DefaultAllocator,
325 dictPagesize: DefaultDictionaryPageSizeLimit,
326 batchSize: DefaultWriteBatchSize,
327 maxRowGroupLen: DefaultMaxRowGroupLen,
328 pageSize: DefaultDataPageSize,
329 parquetVersion: V2_LATEST,
330 dataPageVersion: DataPageV1,
331 createdBy: DefaultCreatedBy,
332 rootName: DefaultRootName,
333 rootRepetition: Repetitions.Repeated,
334 defColumnProps: DefaultColumnProperties(),
335 }
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352 func NewWriterProperties(opts ...WriterProperty) *WriterProperties {
353 cfg := writerPropConfig{
354 wr: defaultWriterProperties(),
355 encodings: make(map[string]Encoding),
356 codecs: make(map[string]compress.Compression),
357 compressLevel: make(map[string]int),
358 dictEnabled: make(map[string]bool),
359 statsEnabled: make(map[string]bool),
360 }
361 for _, o := range opts {
362 o(&cfg)
363 }
364
365 cfg.wr.columnProps = make(map[string]*ColumnProperties)
366 get := func(key string) *ColumnProperties {
367 if p, ok := cfg.wr.columnProps[key]; ok {
368 return p
369 }
370 cfg.wr.columnProps[key] = new(ColumnProperties)
371 *cfg.wr.columnProps[key] = cfg.wr.defColumnProps
372 return cfg.wr.columnProps[key]
373 }
374
375 for key, value := range cfg.encodings {
376 get(key).Encoding = value
377 }
378
379 for key, value := range cfg.codecs {
380 get(key).Codec = value
381 }
382
383 for key, value := range cfg.compressLevel {
384 get(key).CompressionLevel = value
385 }
386
387 for key, value := range cfg.dictEnabled {
388 get(key).DictionaryEnabled = value
389 }
390
391 for key, value := range cfg.statsEnabled {
392 get(key).StatsEnabled = value
393 }
394 return cfg.wr
395 }
396
397
398
399 func (w *WriterProperties) FileEncryptionProperties() *FileEncryptionProperties {
400 return w.encryptionProps
401 }
402
403 func (w *WriterProperties) Allocator() memory.Allocator { return w.mem }
404 func (w *WriterProperties) CreatedBy() string { return w.createdBy }
405 func (w *WriterProperties) RootName() string { return w.rootName }
406 func (w *WriterProperties) RootRepetition() Repetition { return w.rootRepetition }
407 func (w *WriterProperties) WriteBatchSize() int64 { return w.batchSize }
408 func (w *WriterProperties) DataPageSize() int64 { return w.pageSize }
409 func (w *WriterProperties) DictionaryPageSizeLimit() int64 { return w.dictPagesize }
410 func (w *WriterProperties) Version() Version { return w.parquetVersion }
411 func (w *WriterProperties) DataPageVersion() DataPageVersion { return w.dataPageVersion }
412 func (w *WriterProperties) MaxRowGroupLength() int64 { return w.maxRowGroupLen }
413
414
415
416 func (w *WriterProperties) Compression() compress.Compression { return w.defColumnProps.Codec }
417
418
419
420 func (w *WriterProperties) CompressionFor(path string) compress.Compression {
421 if p, ok := w.columnProps[path]; ok {
422 return p.Codec
423 }
424 return w.defColumnProps.Codec
425 }
426
427
428 func (w *WriterProperties) CompressionPath(path ColumnPath) compress.Compression {
429 return w.CompressionFor(path.String())
430 }
431
432
433
434 func (w *WriterProperties) CompressionLevel() int { return w.defColumnProps.CompressionLevel }
435
436
437
438 func (w *WriterProperties) CompressionLevelFor(path string) int {
439 if p, ok := w.columnProps[path]; ok {
440 return p.CompressionLevel
441 }
442 return w.defColumnProps.CompressionLevel
443 }
444
445
446 func (w *WriterProperties) CompressionLevelPath(path ColumnPath) int {
447 return w.CompressionLevelFor(path.String())
448 }
449
450
451
452 func (w *WriterProperties) Encoding() Encoding { return w.defColumnProps.Encoding }
453
454
455
456 func (w *WriterProperties) EncodingFor(path string) Encoding {
457 if p, ok := w.columnProps[path]; ok {
458 return p.Encoding
459 }
460 return w.defColumnProps.Encoding
461 }
462
463
464 func (w *WriterProperties) EncodingPath(path ColumnPath) Encoding {
465 return w.EncodingFor(path.String())
466 }
467
468
469
470 func (w *WriterProperties) DictionaryIndexEncoding() Encoding {
471 if w.parquetVersion == V1_0 {
472 return Encodings.PlainDict
473 }
474 return Encodings.RLEDict
475 }
476
477
478
479 func (w *WriterProperties) DictionaryPageEncoding() Encoding {
480 if w.parquetVersion == V1_0 {
481 return Encodings.PlainDict
482 }
483 return Encodings.Plain
484 }
485
486
487
488 func (w *WriterProperties) DictionaryEnabled() bool { return w.defColumnProps.DictionaryEnabled }
489
490
491
492 func (w *WriterProperties) DictionaryEnabledFor(path string) bool {
493 if p, ok := w.columnProps[path]; ok {
494 return p.DictionaryEnabled
495 }
496 return w.defColumnProps.DictionaryEnabled
497 }
498
499
500 func (w *WriterProperties) DictionaryEnabledPath(path ColumnPath) bool {
501 return w.DictionaryEnabledFor(path.String())
502 }
503
504
505
506 func (w *WriterProperties) StatisticsEnabled() bool { return w.defColumnProps.StatsEnabled }
507
508
509
510 func (w *WriterProperties) StatisticsEnabledFor(path string) bool {
511 if p, ok := w.columnProps[path]; ok {
512 return p.StatsEnabled
513 }
514 return w.defColumnProps.StatsEnabled
515 }
516
517
518 func (w *WriterProperties) StatisticsEnabledPath(path ColumnPath) bool {
519 return w.StatisticsEnabledFor(path.String())
520 }
521
522
523 func (w *WriterProperties) MaxStatsSize() int64 { return w.defColumnProps.MaxStatsSize }
524
525
526 func (w *WriterProperties) MaxStatsSizeFor(path string) int64 {
527 if p, ok := w.columnProps[path]; ok {
528 return p.MaxStatsSize
529 }
530 return w.defColumnProps.MaxStatsSize
531 }
532
533
534 func (w *WriterProperties) MaxStatsSizePath(path ColumnPath) int64 {
535 return w.MaxStatsSizeFor(path.String())
536 }
537
538
539 func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryptionProperties {
540 if w.encryptionProps != nil {
541 return w.encryptionProps.ColumnEncryptionProperties(path)
542 }
543 return nil
544 }
545
546
547
548
549
550 func (w *WriterProperties) StoreDecimalAsInteger() bool {
551 return w.storeDecimalAsInt
552 }
553
View as plain text