1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/column_reader.h" |
19 | |
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
24 | |
25 | #include <arrow/buffer.h> |
26 | #include <arrow/memory_pool.h> |
27 | #include <arrow/util/bit-util.h> |
28 | #include <arrow/util/compression.h> |
29 | #include <arrow/util/rle-encoding.h> |
30 | |
31 | #include "parquet/column_page.h" |
32 | #include "parquet/encoding-internal.h" |
33 | #include "parquet/properties.h" |
34 | #include "parquet/thrift.h" |
35 | |
36 | using arrow::MemoryPool; |
37 | |
38 | namespace parquet { |
39 | |
// A fresh level decoder has no buffered values; SetData() must be called
// with a page's level data before Decode() can be used.
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
41 | |
// Out-of-line destructor so the unique_ptr decoder members can be destroyed
// where their pointee types are complete.
LevelDecoder::~LevelDecoder() {}
43 | |
44 | int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, |
45 | int num_buffered_values, const uint8_t* data) { |
46 | int32_t num_bytes = 0; |
47 | encoding_ = encoding; |
48 | num_values_remaining_ = num_buffered_values; |
49 | bit_width_ = BitUtil::Log2(max_level + 1); |
50 | switch (encoding) { |
51 | case Encoding::RLE: { |
52 | num_bytes = *reinterpret_cast<const int32_t*>(data); |
53 | const uint8_t* decoder_data = data + sizeof(int32_t); |
54 | if (!rle_decoder_) { |
55 | rle_decoder_.reset( |
56 | new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_)); |
57 | } else { |
58 | rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); |
59 | } |
60 | return static_cast<int>(sizeof(int32_t)) + num_bytes; |
61 | } |
62 | case Encoding::BIT_PACKED: { |
63 | num_bytes = |
64 | static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_)); |
65 | if (!bit_packed_decoder_) { |
66 | bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes)); |
67 | } else { |
68 | bit_packed_decoder_->Reset(data, num_bytes); |
69 | } |
70 | return num_bytes; |
71 | } |
72 | default: |
73 | throw ParquetException("Unknown encoding type for levels." ); |
74 | } |
75 | return -1; |
76 | } |
77 | |
78 | int LevelDecoder::Decode(int batch_size, int16_t* levels) { |
79 | int num_decoded = 0; |
80 | |
81 | int num_values = std::min(num_values_remaining_, batch_size); |
82 | if (encoding_ == Encoding::RLE) { |
83 | num_decoded = rle_decoder_->GetBatch(levels, num_values); |
84 | } else { |
85 | num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); |
86 | } |
87 | num_values_remaining_ -= num_decoded; |
88 | return num_decoded; |
89 | } |
90 | |
91 | ReaderProperties default_reader_properties() { |
92 | static ReaderProperties default_reader_properties; |
93 | return default_reader_properties; |
94 | } |
95 | |
96 | // ---------------------------------------------------------------------- |
97 | // SerializedPageReader deserializes Thrift metadata and pages that have been |
// assembled in a serialized stream for storing in a Parquet file
99 | |
100 | // This subclass delimits pages appearing in a serialized stream, each preceded |
101 | // by a serialized Thrift format::PageHeader indicating the type of each page |
102 | // and the page metadata. |
103 | class : public PageReader { |
104 | public: |
105 | (std::unique_ptr<InputStream> stream, int64_t total_num_rows, |
106 | Compression::type codec, ::arrow::MemoryPool* pool) |
107 | : stream_(std::move(stream)), |
108 | decompression_buffer_(AllocateBuffer(pool, 0)), |
109 | seen_num_rows_(0), |
110 | total_num_rows_(total_num_rows) { |
111 | max_page_header_size_ = kDefaultMaxPageHeaderSize; |
112 | decompressor_ = GetCodecFromArrow(codec); |
113 | } |
114 | |
115 | // Implement the PageReader interface |
116 | std::shared_ptr<Page> NextPage() override; |
117 | |
118 | void (uint32_t size) override { max_page_header_size_ = size; } |
119 | |
120 | private: |
121 | std::unique_ptr<InputStream> ; |
122 | |
123 | format::PageHeader ; |
124 | std::shared_ptr<Page> ; |
125 | |
126 | // Compression codec to use. |
127 | std::unique_ptr<::arrow::util::Codec> ; |
128 | std::shared_ptr<ResizableBuffer> ; |
129 | |
130 | // Maximum allowed page size |
131 | uint32_t ; |
132 | |
133 | // Number of rows read in data pages so far |
134 | int64_t ; |
135 | |
136 | // Number of rows in all the data pages |
137 | int64_t ; |
138 | }; |
139 | |
140 | std::shared_ptr<Page> SerializedPageReader::() { |
141 | // Loop here because there may be unhandled page types that we skip until |
142 | // finding a page that we do know what to do with |
143 | while (seen_num_rows_ < total_num_rows_) { |
144 | int64_t bytes_read = 0; |
145 | int64_t bytes_available = 0; |
146 | uint32_t = 0; |
147 | const uint8_t* buffer; |
148 | uint32_t allowed_page_size = kDefaultPageHeaderSize; |
149 | |
150 | // Page headers can be very large because of page statistics |
151 | // We try to deserialize a larger buffer progressively |
152 | // until a maximum allowed header limit |
153 | while (true) { |
154 | buffer = stream_->Peek(allowed_page_size, &bytes_available); |
155 | if (bytes_available == 0) { |
156 | return std::shared_ptr<Page>(nullptr); |
157 | } |
158 | |
159 | // This gets used, then set by DeserializeThriftMsg |
160 | header_size = static_cast<uint32_t>(bytes_available); |
161 | try { |
162 | DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); |
163 | break; |
164 | } catch (std::exception& e) { |
165 | // Failed to deserialize. Double the allowed page header size and try again |
166 | std::stringstream ss; |
167 | ss << e.what(); |
168 | allowed_page_size *= 2; |
169 | if (allowed_page_size > max_page_header_size_) { |
170 | ss << "Deserializing page header failed.\n" ; |
171 | throw ParquetException(ss.str()); |
172 | } |
173 | } |
174 | } |
175 | // Advance the stream offset |
176 | stream_->Advance(header_size); |
177 | |
178 | int compressed_len = current_page_header_.compressed_page_size; |
179 | int uncompressed_len = current_page_header_.uncompressed_page_size; |
180 | |
181 | // Read the compressed data page. |
182 | buffer = stream_->Read(compressed_len, &bytes_read); |
183 | if (bytes_read != compressed_len) { |
184 | std::stringstream ss; |
185 | ss << "Page was smaller (" << bytes_read << ") than expected (" << compressed_len |
186 | << ")" ; |
187 | ParquetException::EofException(ss.str()); |
188 | } |
189 | |
190 | // Uncompress it if we need to |
191 | if (decompressor_ != nullptr) { |
192 | // Grow the uncompressed buffer if we need to. |
193 | if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { |
194 | PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); |
195 | } |
196 | PARQUET_THROW_NOT_OK( |
197 | decompressor_->Decompress(compressed_len, buffer, uncompressed_len, |
198 | decompression_buffer_->mutable_data())); |
199 | buffer = decompression_buffer_->data(); |
200 | } |
201 | |
202 | auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len); |
203 | |
204 | if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) { |
205 | const format::DictionaryPageHeader& = |
206 | current_page_header_.dictionary_page_header; |
207 | |
208 | bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false; |
209 | |
210 | return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values, |
211 | FromThrift(dict_header.encoding), |
212 | is_sorted); |
213 | } else if (current_page_header_.type == format::PageType::DATA_PAGE) { |
214 | const format::DataPageHeader& = current_page_header_.data_page_header; |
215 | |
216 | EncodedStatistics page_statistics; |
217 | if (header.__isset.statistics) { |
218 | const format::Statistics& stats = header.statistics; |
219 | if (stats.__isset.max) { |
220 | page_statistics.set_max(stats.max); |
221 | } |
222 | if (stats.__isset.min) { |
223 | page_statistics.set_min(stats.min); |
224 | } |
225 | if (stats.__isset.null_count) { |
226 | page_statistics.set_null_count(stats.null_count); |
227 | } |
228 | if (stats.__isset.distinct_count) { |
229 | page_statistics.set_distinct_count(stats.distinct_count); |
230 | } |
231 | } |
232 | |
233 | seen_num_rows_ += header.num_values; |
234 | |
235 | return std::make_shared<DataPage>( |
236 | page_buffer, header.num_values, FromThrift(header.encoding), |
237 | FromThrift(header.definition_level_encoding), |
238 | FromThrift(header.repetition_level_encoding), page_statistics); |
239 | } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) { |
240 | const format::DataPageHeaderV2& = current_page_header_.data_page_header_v2; |
241 | bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; |
242 | |
243 | seen_num_rows_ += header.num_values; |
244 | |
245 | return std::make_shared<DataPageV2>( |
246 | page_buffer, header.num_values, header.num_nulls, header.num_rows, |
247 | FromThrift(header.encoding), header.definition_levels_byte_length, |
248 | header.repetition_levels_byte_length, is_compressed); |
249 | } else { |
250 | // We don't know what this page type is. We're allowed to skip non-data |
251 | // pages. |
252 | continue; |
253 | } |
254 | } |
255 | return std::shared_ptr<Page>(nullptr); |
256 | } |
257 | |
258 | std::unique_ptr<PageReader> PageReader::(std::unique_ptr<InputStream> stream, |
259 | int64_t total_num_rows, |
260 | Compression::type codec, |
261 | ::arrow::MemoryPool* pool) { |
262 | return std::unique_ptr<PageReader>( |
263 | new SerializedPageReader(std::move(stream), total_num_rows, codec, pool)); |
264 | } |
265 | |
266 | // ---------------------------------------------------------------------- |
267 | |
268 | ColumnReader::ColumnReader(const ColumnDescriptor* descr, |
269 | std::unique_ptr<PageReader> , MemoryPool* pool) |
270 | : descr_(descr), |
271 | pager_(std::move(pager)), |
272 | num_buffered_values_(0), |
273 | num_decoded_values_(0), |
274 | pool_(pool) {} |
275 | |
// Out-of-line destructor so members with types declared elsewhere can be
// destroyed in this translation unit.
ColumnReader::~ColumnReader() {}
277 | |
278 | template <typename DType> |
279 | void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) { |
280 | int encoding = static_cast<int>(page->encoding()); |
281 | if (page->encoding() == Encoding::PLAIN_DICTIONARY || |
282 | page->encoding() == Encoding::PLAIN) { |
283 | encoding = static_cast<int>(Encoding::RLE_DICTIONARY); |
284 | } |
285 | |
286 | auto it = decoders_.find(encoding); |
287 | if (it != decoders_.end()) { |
288 | throw ParquetException("Column cannot have more than one dictionary." ); |
289 | } |
290 | |
291 | if (page->encoding() == Encoding::PLAIN_DICTIONARY || |
292 | page->encoding() == Encoding::PLAIN) { |
293 | PlainDecoder<DType> dictionary(descr_); |
294 | dictionary.SetData(page->num_values(), page->data(), page->size()); |
295 | |
296 | // The dictionary is fully decoded during DictionaryDecoder::Init, so the |
297 | // DictionaryPage buffer is no longer required after this step |
298 | // |
299 | // TODO(wesm): investigate whether this all-or-nothing decoding of the |
300 | // dictionary makes sense and whether performance can be improved |
301 | |
302 | auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_); |
303 | decoder->SetDict(&dictionary); |
304 | decoders_[encoding] = decoder; |
305 | } else { |
306 | ParquetException::NYI("only plain dictionary encoding has been implemented" ); |
307 | } |
308 | |
309 | current_decoder_ = decoders_[encoding].get(); |
310 | } |
311 | |
312 | // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index |
313 | // encoding. |
314 | static bool IsDictionaryIndexEncoding(const Encoding::type& e) { |
315 | return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; |
316 | } |
317 | |
// Advances to the next data page, configuring level decoders and the value
// decoder for it. Dictionary pages encountered on the way install a
// dictionary decoder; unknown page types are skipped. Returns false at end
// of stream, true once a data page is ready for decoding.
template <typename DType>
bool TypedColumnReader<DType>::ReadNewPage() {
  // Loop until we find the next data page.
  const uint8_t* buffer;

  while (true) {
    current_page_ = pager_->NextPage();
    if (!current_page_) {
      // EOS
      return false;
    }

    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
      continue;
    } else if (current_page_->type() == PageType::DATA_PAGE) {
      const DataPage* page = static_cast<const DataPage*>(current_page_.get());

      // Read a data page.
      num_buffered_values_ = page->num_values();

      // Have not decoded any values from the data page yet
      num_decoded_values_ = 0;

      buffer = page->data();

      // If the data page includes repetition and definition levels, we
      // initialize the level decoder and subtract the encoded level bytes from
      // the page size to determine the number of bytes in the encoded data.
      int64_t data_size = page->size();

      // Data page Layout: Repetition Levels - Definition Levels - encoded values.
      // Levels are encoded as rle or bit-packed.
      // Init repetition levels
      if (descr_->max_repetition_level() > 0) {
        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
            page->repetition_level_encoding(), descr_->max_repetition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        // Skip past the repetition levels to the definition levels / values
        buffer += rep_levels_bytes;
        data_size -= rep_levels_bytes;
      }
      // TODO figure a way to set max_definition_level_ to 0
      // if the initial value is invalid

      // Init definition levels
      if (descr_->max_definition_level() > 0) {
        int64_t def_levels_bytes = definition_level_decoder_.SetData(
            page->definition_level_encoding(), descr_->max_definition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        // Skip past the definition levels to the encoded values
        buffer += def_levels_bytes;
        data_size -= def_levels_bytes;
      }

      // Get a decoder object for this page or create a new decoder if this is the
      // first page with this encoding.
      Encoding::type encoding = page->encoding();

      // Normalize deprecated PLAIN_DICTIONARY to RLE_DICTIONARY for the
      // decoder cache key
      if (IsDictionaryIndexEncoding(encoding)) {
        encoding = Encoding::RLE_DICTIONARY;
      }

      auto it = decoders_.find(static_cast<int>(encoding));
      if (it != decoders_.end()) {
        if (encoding == Encoding::RLE_DICTIONARY) {
          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
        }
        current_decoder_ = it->second.get();
      } else {
        switch (encoding) {
          case Encoding::PLAIN: {
            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
            decoders_[static_cast<int>(encoding)] = decoder;
            current_decoder_ = decoder.get();
            break;
          }
          case Encoding::RLE_DICTIONARY:
            // A dictionary-indexed data page with no prior dictionary page
            // means the file is malformed
            throw ParquetException("Dictionary page must be before data page." );

          case Encoding::DELTA_BINARY_PACKED:
          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
          case Encoding::DELTA_BYTE_ARRAY:
            ParquetException::NYI("Unsupported encoding" );

          default:
            throw ParquetException("Unknown encoding type." );
        }
      }
      // Hand the remaining (value) bytes to the value decoder
      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                                static_cast<int>(data_size));
      return true;
    } else {
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      continue;
    }
  }
  return true;
}
416 | |
417 | // ---------------------------------------------------------------------- |
418 | // Batch read APIs |
419 | |
420 | int64_t ColumnReader::ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { |
421 | if (descr_->max_definition_level() == 0) { |
422 | return 0; |
423 | } |
424 | return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
425 | } |
426 | |
427 | int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { |
428 | if (descr_->max_repetition_level() == 0) { |
429 | return 0; |
430 | } |
431 | return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
432 | } |
433 | |
434 | // ---------------------------------------------------------------------- |
435 | // Dynamic column reader constructor |
436 | |
437 | std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr, |
438 | std::unique_ptr<PageReader> , |
439 | MemoryPool* pool) { |
440 | switch (descr->physical_type()) { |
441 | case Type::BOOLEAN: |
442 | return std::make_shared<BoolReader>(descr, std::move(pager), pool); |
443 | case Type::INT32: |
444 | return std::make_shared<Int32Reader>(descr, std::move(pager), pool); |
445 | case Type::INT64: |
446 | return std::make_shared<Int64Reader>(descr, std::move(pager), pool); |
447 | case Type::INT96: |
448 | return std::make_shared<Int96Reader>(descr, std::move(pager), pool); |
449 | case Type::FLOAT: |
450 | return std::make_shared<FloatReader>(descr, std::move(pager), pool); |
451 | case Type::DOUBLE: |
452 | return std::make_shared<DoubleReader>(descr, std::move(pager), pool); |
453 | case Type::BYTE_ARRAY: |
454 | return std::make_shared<ByteArrayReader>(descr, std::move(pager), pool); |
455 | case Type::FIXED_LEN_BYTE_ARRAY: |
456 | return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool); |
457 | default: |
458 | ParquetException::NYI("type reader not implemented" ); |
459 | } |
460 | // Unreachable code, but supress compiler warning |
461 | return std::shared_ptr<ColumnReader>(nullptr); |
462 | } |
463 | |
464 | // ---------------------------------------------------------------------- |
465 | // Instantiate templated classes |
466 | |
// Explicit template instantiations, one per supported Parquet physical type.
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<BooleanType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int32Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int64Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int96Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FloatType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<DoubleType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<ByteArrayType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FLBAType>;
475 | |
476 | } // namespace parquet |
477 | |