...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package pqarrow
20
21 import (
22 "context"
23
24 "github.com/apache/arrow/go/v15/arrow"
25 "github.com/apache/arrow/go/v15/arrow/array"
26 "github.com/apache/arrow/go/v15/arrow/compute"
27 "github.com/apache/arrow/go/v15/arrow/memory"
28 "github.com/apache/arrow/go/v15/parquet"
29 "github.com/apache/arrow/go/v15/parquet/file"
30 "github.com/apache/arrow/go/v15/parquet/internal/debug"
31 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
32 )
33
34 func isDictEncoding(enc parquet.Encoding) bool {
35 return enc == parquet.Encodings.PlainDict
36 }
37
38 func dictionaryDirectWriteSupported(arr arrow.Array) bool {
39 debug.Assert(arr.DataType().ID() == arrow.DICTIONARY, "should only be called with dictionary type")
40 dt := arr.DataType().(*arrow.DictionaryType)
41 return arrow.IsPrimitive(dt.ValueType.ID()) || arrow.IsBaseBinary(dt.ValueType.ID())
42 }
43
44 func convertDictionaryToDense(mem memory.Allocator, arr arrow.Array) (arrow.Array, error) {
45 dt := arr.DataType().(*arrow.DictionaryType).ValueType
46 ctx := compute.WithAllocator(context.Background(), mem)
47 return compute.CastArray(ctx, arr, compute.SafeCastOptions(dt))
48 }
49
50 func writeDictionaryArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 writeDense := func() error {
66 denseArr, err := convertDictionaryToDense(ctx.props.mem, leafArr)
67 if err != nil {
68 return err
69 }
70 defer denseArr.Release()
71 return writeDenseArrow(ctx, cw, denseArr, defLevels, repLevels, maybeParentNulls)
72 }
73
74 if !isDictEncoding(cw.CurrentEncoder().Encoding()) || !dictionaryDirectWriteSupported(leafArr) {
75
76
77
78
79
80
81 return writeDense()
82 }
83
84 var (
85 dictEncoder = cw.CurrentEncoder().(encoding.DictEncoder)
86 data = leafArr.(*array.Dictionary)
87 dict = data.Dictionary()
88 indices = data.Indices()
89 preserved = dictEncoder.PreservedDictionary()
90 pageStats = cw.PageStatistics()
91 )
92
93 updateStats := func() error {
94 var referencedDict arrow.Array
95
96 ctx := compute.WithAllocator(context.Background(), ctx.props.mem)
97
98 if preserved != nil && preserved == dict {
99 referencedDict = preserved
100 } else {
101 referencedIndices, err := compute.UniqueArray(ctx, indices)
102 if err != nil {
103 return err
104 }
105
106
107 if referencedIndices.Len() == dict.Len() {
108 referencedDict = dict
109 } else {
110 referencedDict, err = compute.TakeArrayOpts(ctx, dict, referencedIndices, compute.TakeOptions{BoundsCheck: false})
111 if err != nil {
112 return err
113 }
114 defer referencedDict.Release()
115 }
116 referencedIndices.Release()
117 }
118
119 nonNullCount := indices.Len() - indices.NullN()
120 pageStats.IncNulls(int64(len(defLevels) - nonNullCount))
121 pageStats.IncNumValues(int64(nonNullCount))
122 return pageStats.UpdateFromArrow(referencedDict, false)
123 }
124
125 switch {
126 case preserved == nil:
127 if err := dictEncoder.PutDictionary(dict); err != nil {
128 return err
129 }
130
131
132
133
134
135 if dictEncoder.NumEntries() != dict.Len() {
136 cw.FallbackToPlain()
137 return writeDense()
138 }
139
140 if pageStats != nil {
141 if err := updateStats(); err != nil {
142 return err
143 }
144 }
145
146 case !array.Equal(dict, preserved):
147
148 cw.FallbackToPlain()
149 return writeDense()
150 default:
151
152 if pageStats != nil {
153 if err := updateStats(); err != nil {
154 return err
155 }
156 }
157 }
158
159 return cw.WriteDictIndices(indices, defLevels, repLevels)
160 }
161
View as plain text