1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #pragma once |
19 | |
20 | #include <memory> |
21 | #include <vector> |
22 | |
23 | #include "parquet/column_page.h" |
24 | #include "parquet/encoding.h" |
25 | #include "parquet/metadata.h" |
26 | #include "parquet/properties.h" |
27 | #include "parquet/schema.h" |
28 | #include "parquet/statistics.h" |
29 | #include "parquet/types.h" |
30 | #include "parquet/util/macros.h" |
31 | #include "parquet/util/memory.h" |
32 | #include "parquet/util/visibility.h" |
33 | |
// Forward declarations of the Arrow helper classes used in this header.
// Declarations are sufficient here because the header only names these types
// through std::unique_ptr members.
namespace arrow {

namespace util {
class RleEncoder;
}  // namespace util

namespace BitUtil {
class BitWriter;
}  // namespace BitUtil

}  // namespace arrow
45 | |
46 | namespace parquet { |
47 | |
48 | class PARQUET_EXPORT LevelEncoder { |
49 | public: |
50 | LevelEncoder(); |
51 | ~LevelEncoder(); |
52 | |
53 | static int MaxBufferSize(Encoding::type encoding, int16_t max_level, |
54 | int num_buffered_values); |
55 | |
56 | // Initialize the LevelEncoder. |
57 | void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, |
58 | uint8_t* data, int data_size); |
59 | |
60 | // Encodes a batch of levels from an array and returns the number of levels encoded |
61 | int Encode(int batch_size, const int16_t* levels); |
62 | |
63 | int32_t len() { |
64 | if (encoding_ != Encoding::RLE) { |
65 | throw ParquetException("Only implemented for RLE encoding" ); |
66 | } |
67 | return rle_length_; |
68 | } |
69 | |
70 | private: |
71 | int bit_width_; |
72 | int rle_length_; |
73 | Encoding::type encoding_; |
74 | std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_; |
75 | std::unique_ptr<::arrow::BitUtil::BitWriter> bit_packed_encoder_; |
76 | }; |
77 | |
78 | class PARQUET_EXPORT PageWriter { |
79 | public: |
80 | virtual ~PageWriter() {} |
81 | |
82 | static std::unique_ptr<PageWriter> Open( |
83 | OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata, |
84 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), |
85 | bool buffered_row_group = false); |
86 | |
87 | // The Column Writer decides if dictionary encoding is used if set and |
88 | // if the dictionary encoding has fallen back to default encoding on reaching dictionary |
89 | // page limit |
90 | virtual void Close(bool has_dictionary, bool fallback) = 0; |
91 | |
92 | virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0; |
93 | |
94 | virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; |
95 | |
96 | virtual bool has_compressor() = 0; |
97 | |
98 | virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0; |
99 | }; |
100 | |
// Batch size constant.
// NOTE(review): presumably the number of values handled per mini-batch by the
// column writers -- confirm against the implementation.
static constexpr int WRITE_BATCH_SIZE{1000};
102 | class PARQUET_EXPORT ColumnWriter { |
103 | public: |
104 | ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>, |
105 | bool has_dictionary, Encoding::type encoding, |
106 | const WriterProperties* properties); |
107 | |
108 | virtual ~ColumnWriter() = default; |
109 | |
110 | static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*, |
111 | std::unique_ptr<PageWriter>, |
112 | const WriterProperties* properties); |
113 | |
114 | Type::type type() const { return descr_->physical_type(); } |
115 | |
116 | const ColumnDescriptor* descr() const { return descr_; } |
117 | |
118 | /** |
119 | * Closes the ColumnWriter, commits any buffered values to pages. |
120 | * |
121 | * @return Total size of the column in bytes |
122 | */ |
123 | int64_t Close(); |
124 | |
125 | int64_t rows_written() const { return rows_written_; } |
126 | |
127 | // Only considers the size of the compressed pages + page header |
128 | // Some values might be still buffered an not written to a page yet |
129 | int64_t total_compressed_bytes() const { return total_compressed_bytes_; } |
130 | |
131 | int64_t total_bytes_written() const { return total_bytes_written_; } |
132 | |
133 | const WriterProperties* properties() { return properties_; } |
134 | |
135 | protected: |
136 | virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0; |
137 | |
138 | // Serializes Dictionary Page if enabled |
139 | virtual void WriteDictionaryPage() = 0; |
140 | |
141 | // Checks if the Dictionary Page size limit is reached |
142 | // If the limit is reached, the Dictionary and Data Pages are serialized |
143 | // The encoding is switched to PLAIN |
144 | |
145 | virtual void CheckDictionarySizeLimit() = 0; |
146 | |
147 | // Plain-encoded statistics of the current page |
148 | virtual EncodedStatistics GetPageStatistics() = 0; |
149 | |
150 | // Plain-encoded statistics of the whole chunk |
151 | virtual EncodedStatistics GetChunkStatistics() = 0; |
152 | |
153 | // Merges page statistics into chunk statistics, then resets the values |
154 | virtual void ResetPageStatistics() = 0; |
155 | |
156 | // Adds Data Pages to an in memory buffer in dictionary encoding mode |
157 | // Serializes the Data Pages in other encoding modes |
158 | void AddDataPage(); |
159 | |
160 | // Serializes Data Pages |
161 | void WriteDataPage(const CompressedDataPage& page); |
162 | |
163 | // Write multiple definition levels |
164 | void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels); |
165 | |
166 | // Write multiple repetition levels |
167 | void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels); |
168 | |
169 | // RLE encode the src_buffer into dest_buffer and return the encoded size |
170 | int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer, |
171 | int16_t max_level); |
172 | |
173 | // Serialize the buffered Data Pages |
174 | void FlushBufferedDataPages(); |
175 | |
176 | ColumnChunkMetaDataBuilder* metadata_; |
177 | const ColumnDescriptor* descr_; |
178 | |
179 | std::unique_ptr<PageWriter> pager_; |
180 | |
181 | bool has_dictionary_; |
182 | Encoding::type encoding_; |
183 | const WriterProperties* properties_; |
184 | |
185 | LevelEncoder level_encoder_; |
186 | |
187 | ::arrow::MemoryPool* allocator_; |
188 | |
189 | // The total number of values stored in the data page. This is the maximum of |
190 | // the number of encoded definition levels or encoded values. For |
191 | // non-repeated, required columns, this is equal to the number of encoded |
192 | // values. For repeated or optional values, there may be fewer data values |
193 | // than levels, and this tells you how many encoded levels there are in that |
194 | // case. |
195 | int64_t num_buffered_values_; |
196 | |
197 | // The total number of stored values. For repeated or optional values, this |
198 | // number may be lower than num_buffered_values_. |
199 | int64_t num_buffered_encoded_values_; |
200 | |
201 | // Total number of rows written with this ColumnWriter |
202 | int rows_written_; |
203 | |
204 | // Records the total number of bytes written by the serializer |
205 | int64_t total_bytes_written_; |
206 | |
207 | // Records the current number of compressed bytes in a column |
208 | int64_t total_compressed_bytes_; |
209 | |
210 | // Flag to check if the Writer has been closed |
211 | bool closed_; |
212 | |
213 | // Flag to infer if dictionary encoding has fallen back to PLAIN |
214 | bool fallback_; |
215 | |
216 | std::unique_ptr<InMemoryOutputStream> definition_levels_sink_; |
217 | std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_; |
218 | |
219 | std::shared_ptr<ResizableBuffer> definition_levels_rle_; |
220 | std::shared_ptr<ResizableBuffer> repetition_levels_rle_; |
221 | |
222 | std::shared_ptr<ResizableBuffer> uncompressed_data_; |
223 | std::shared_ptr<ResizableBuffer> compressed_data_; |
224 | |
225 | std::vector<CompressedDataPage> data_pages_; |
226 | |
227 | private: |
228 | void InitSinks(); |
229 | }; |
230 | |
231 | // API to write values to a single column. This is the main client facing API. |
232 | template <typename DType> |
233 | class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter { |
234 | public: |
235 | typedef typename DType::c_type T; |
236 | |
237 | TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, |
238 | std::unique_ptr<PageWriter> , const bool use_dictionary, |
239 | Encoding::type encoding, const WriterProperties* properties); |
240 | |
241 | // Write a batch of repetition levels, definition levels, and values to the |
242 | // column. |
243 | void WriteBatch(int64_t num_values, const int16_t* def_levels, |
244 | const int16_t* rep_levels, const T* values); |
245 | |
246 | /// Write a batch of repetition levels, definition levels, and values to the |
247 | /// column. |
248 | /// |
249 | /// In comparision to WriteBatch the length of repetition and definition levels |
250 | /// is the same as of the number of values read for max_definition_level == 1. |
251 | /// In the case of max_definition_level > 1, the repetition and definition |
252 | /// levels are larger than the values but the values include the null entries |
253 | /// with definition_level == (max_definition_level - 1). Thus we have to differentiate |
254 | /// in the parameters of this function if the input has the length of num_values or the |
255 | /// _number of rows in the lowest nesting level_. |
256 | /// |
257 | /// In the case that the most inner node in the Parquet is required, the _number of rows |
258 | /// in the lowest nesting level_ is equal to the number of non-null values. If the |
259 | /// inner-most schema node is optional, the _number of rows in the lowest nesting level_ |
260 | /// also includes all values with definition_level == (max_definition_level - 1). |
261 | /// |
262 | /// @param num_values number of levels to write. |
263 | /// @param def_levels The Parquet definiton levels, length is num_values |
264 | /// @param rep_levels The Parquet repetition levels, length is num_values |
265 | /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting |
266 | /// level. The length is number of rows in the lowest nesting level. |
267 | /// @param valid_bits_offset The offset in bits of the valid_bits where the |
268 | /// first relevant bit resides. |
269 | /// @param values The values in the lowest nested level including |
270 | /// spacing for nulls on the lowest levels; input has the length |
271 | /// of the number of rows on the lowest nesting level. |
272 | void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels, |
273 | const int16_t* rep_levels, const uint8_t* valid_bits, |
274 | int64_t valid_bits_offset, const T* values); |
275 | |
276 | // Estimated size of the values that are not written to a page yet |
277 | int64_t EstimatedBufferedValueBytes() const { |
278 | return current_encoder_->EstimatedDataEncodedSize(); |
279 | } |
280 | |
281 | protected: |
282 | std::shared_ptr<Buffer> GetValuesBuffer() override { |
283 | return current_encoder_->FlushValues(); |
284 | } |
285 | void WriteDictionaryPage() override; |
286 | void CheckDictionarySizeLimit() override; |
287 | EncodedStatistics GetPageStatistics() override; |
288 | EncodedStatistics GetChunkStatistics() override; |
289 | void ResetPageStatistics() override; |
290 | |
291 | private: |
292 | int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels, |
293 | const int16_t* rep_levels, const T* values); |
294 | |
295 | int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels, |
296 | const int16_t* rep_levels, const uint8_t* valid_bits, |
297 | int64_t valid_bits_offset, const T* values, |
298 | int64_t* num_spaced_written); |
299 | |
300 | typedef Encoder<DType> EncoderType; |
301 | |
302 | // Write values to a temporary buffer before they are encoded into pages |
303 | void WriteValues(int64_t num_values, const T* values); |
304 | void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, |
305 | int64_t valid_bits_offset, const T* values); |
306 | std::unique_ptr<EncoderType> current_encoder_; |
307 | |
308 | typedef TypedRowGroupStatistics<DType> TypedStats; |
309 | std::unique_ptr<TypedStats> page_statistics_; |
310 | std::unique_ptr<TypedStats> chunk_statistics_; |
311 | }; |
312 | |
313 | typedef TypedColumnWriter<BooleanType> BoolWriter; |
314 | typedef TypedColumnWriter<Int32Type> Int32Writer; |
315 | typedef TypedColumnWriter<Int64Type> Int64Writer; |
316 | typedef TypedColumnWriter<Int96Type> Int96Writer; |
317 | typedef TypedColumnWriter<FloatType> FloatWriter; |
318 | typedef TypedColumnWriter<DoubleType> DoubleWriter; |
319 | typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter; |
320 | typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter; |
321 | |
322 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<BooleanType>; |
323 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int32Type>; |
324 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int64Type>; |
325 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int96Type>; |
326 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FloatType>; |
327 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<DoubleType>; |
328 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<ByteArrayType>; |
329 | PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FLBAType>; |
330 | |
331 | } // namespace parquet |
332 | |