1 | /** |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | */ |
18 | |
19 | #ifndef ORC_COLUMN_WRITER_HH |
20 | #define ORC_COLUMN_WRITER_HH |
21 | |
#include <memory>
#include <utility>
#include <vector>

#include "orc/Vector.hh"

#include "ByteRLE.hh"
#include "Compression.hh"
#include "orc/Exceptions.hh"
#include "Statistics.hh"

#include "wrap/orc-proto-wrapper.hh"
30 | |
31 | namespace orc { |
32 | |
33 | class StreamsFactory { |
34 | public: |
35 | virtual ~StreamsFactory(); |
36 | |
37 | /** |
38 | * Get the stream for the given column/kind in this stripe. |
39 | * @param kind the kind of the stream |
40 | * @return the buffer output stream |
41 | */ |
42 | virtual std::unique_ptr<BufferedOutputStream> |
43 | createStream(proto::Stream_Kind kind) const = 0; |
44 | }; |
45 | |
46 | std::unique_ptr<StreamsFactory> createStreamsFactory( |
47 | const WriterOptions& options, |
48 | OutputStream * outStream); |
49 | |
50 | /** |
51 | * record stream positions for row index |
52 | */ |
53 | class RowIndexPositionRecorder : public PositionRecorder { |
54 | public: |
55 | virtual ~RowIndexPositionRecorder() override; |
56 | |
57 | RowIndexPositionRecorder(proto::RowIndexEntry& entry): |
58 | rowIndexEntry(entry) {} |
59 | |
60 | virtual void add(uint64_t pos) override { |
61 | rowIndexEntry.add_positions(pos); |
62 | } |
63 | |
64 | private: |
65 | proto::RowIndexEntry& rowIndexEntry; |
66 | }; |
67 | |
68 | /** |
69 | * The interface for writing ORC data types. |
70 | */ |
71 | class ColumnWriter { |
72 | protected: |
73 | std::unique_ptr<ByteRleEncoder> notNullEncoder; |
74 | uint64_t columnId; |
75 | std::unique_ptr<MutableColumnStatistics> colIndexStatistics; |
76 | std::unique_ptr<MutableColumnStatistics> colStripeStatistics; |
77 | std::unique_ptr<MutableColumnStatistics> colFileStatistics; |
78 | |
79 | bool enableIndex; |
80 | // row index for this column, contains all RowIndexEntries in 1 stripe |
81 | std::unique_ptr<proto::RowIndex> rowIndex; |
82 | std::unique_ptr<proto::RowIndexEntry> rowIndexEntry; |
83 | std::unique_ptr<RowIndexPositionRecorder> rowIndexPosition; |
84 | |
85 | public: |
86 | ColumnWriter(const Type& type, const StreamsFactory& factory, |
87 | const WriterOptions& options); |
88 | |
89 | virtual ~ColumnWriter(); |
90 | |
91 | /** |
92 | * Write the next group of values from this rowBatch. |
93 | * @param rowBatch the row batch data to write |
94 | * @param offset the starting point of row batch to write |
95 | * @param numValues the number of values to write |
96 | */ |
97 | virtual void add(ColumnVectorBatch& rowBatch, |
98 | uint64_t offset, |
99 | uint64_t numValues); |
100 | /** |
101 | * Flush column writer output steams |
102 | * @param streams vector to store generated stream by flush() |
103 | */ |
104 | virtual void flush(std::vector<proto::Stream>& streams); |
105 | |
106 | /** |
107 | * Get estimated sized of buffer used |
108 | */ |
109 | virtual uint64_t getEstimatedSize() const; |
110 | |
111 | /** |
112 | * Get the encoding used by the writer for this column. |
113 | * ColumnEncoding info is pushed into the vector |
114 | */ |
115 | virtual void getColumnEncoding( |
116 | std::vector<proto::ColumnEncoding>& encodings) const = 0; |
117 | |
118 | /** |
119 | * Get the stripe statistics for this column |
120 | */ |
121 | virtual void getStripeStatistics( |
122 | std::vector<proto::ColumnStatistics>& stats) const; |
123 | |
124 | /** |
125 | * Get the file statistics for this column |
126 | */ |
127 | virtual void getFileStatistics( |
128 | std::vector<proto::ColumnStatistics>& stats) const; |
129 | |
130 | /** |
131 | * Merge index stats into stripe stats and reset index stats |
132 | */ |
133 | virtual void mergeRowGroupStatsIntoStripeStats(); |
134 | |
135 | /** |
136 | * Merge stripe stats into file stats and reset stripe stats |
137 | */ |
138 | virtual void mergeStripeStatsIntoFileStats(); |
139 | |
140 | /** |
141 | * Create a row index entry with the previous location and the current |
142 | * index statistics. Also merges the index statistics into the stripe |
143 | * statistics before they are cleared. Finally, it records the start of the |
144 | * next index and ensures all of the children columns also create an entry. |
145 | */ |
146 | virtual void createRowIndexEntry(); |
147 | |
148 | /** |
149 | * Write row index streams for this column |
150 | * @param streams output list of ROW_INDEX streams |
151 | */ |
152 | virtual void writeIndex(std::vector<proto::Stream> &streams) const; |
153 | |
154 | /** |
155 | * Record positions for index |
156 | * |
157 | * This function is called by createRowIndexEntry() and ColumnWrtier's |
158 | * constructor. So base classes do not need to call inherited classes' |
159 | * recordPosition() function. |
160 | */ |
161 | virtual void recordPosition() const; |
162 | |
163 | /** |
164 | * Reset positions for index |
165 | */ |
166 | virtual void reset(); |
167 | |
168 | protected: |
169 | /** |
170 | * Utility function to translate ColumnStatistics into protobuf form and |
171 | * add it to output list |
172 | * @param statsList output list for protobuf stats |
173 | * @param stats ColumnStatistics to be transformed and added |
174 | */ |
175 | void getProtoBufStatistics( |
176 | std::vector<proto::ColumnStatistics>& statsList, |
177 | const MutableColumnStatistics* stats) const { |
178 | proto::ColumnStatistics pbStats; |
179 | stats->toProtoBuf(pbStats); |
180 | statsList.push_back(pbStats); |
181 | } |
182 | |
183 | protected: |
184 | MemoryPool& memPool; |
185 | std::unique_ptr<BufferedOutputStream> indexStream; |
186 | }; |
187 | |
188 | /** |
189 | * Create a writer for the given type. |
190 | */ |
191 | std::unique_ptr<ColumnWriter> buildWriter( |
192 | const Type& type, |
193 | const StreamsFactory& factory, |
194 | const WriterOptions& options); |
195 | } |
196 | |
197 | #endif |
198 | |