// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17 | |
18 | #include "parquet/column_reader.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdint> |
22 | #include <cstring> |
23 | #include <exception> |
24 | #include <iostream> |
25 | #include <memory> |
26 | #include <unordered_map> |
27 | #include <utility> |
28 | |
29 | #include "arrow/array.h" |
30 | #include "arrow/builder.h" |
31 | #include "arrow/table.h" |
32 | #include "arrow/type.h" |
33 | #include "arrow/util/bit_stream_utils.h" |
34 | #include "arrow/util/checked_cast.h" |
35 | #include "arrow/util/compression.h" |
36 | #include "arrow/util/logging.h" |
37 | #include "arrow/util/rle_encoding.h" |
38 | |
39 | #include "parquet/column_page.h" |
40 | #include "parquet/encoding.h" |
41 | #include "parquet/properties.h" |
42 | #include "parquet/statistics.h" |
43 | #include "parquet/thrift_internal.h" // IWYU pragma: keep |
44 | |
45 | using arrow::MemoryPool; |
46 | using arrow::internal::checked_cast; |
47 | |
48 | namespace parquet { |
49 | |
50 | LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} |
51 | |
52 | LevelDecoder::~LevelDecoder() {} |
53 | |
54 | int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, |
55 | int num_buffered_values, const uint8_t* data) { |
56 | int32_t num_bytes = 0; |
57 | encoding_ = encoding; |
58 | num_values_remaining_ = num_buffered_values; |
59 | bit_width_ = BitUtil::Log2(max_level + 1); |
60 | switch (encoding) { |
61 | case Encoding::RLE: { |
62 | num_bytes = arrow::util::SafeLoadAs<int32_t>(data); |
63 | const uint8_t* decoder_data = data + sizeof(int32_t); |
64 | if (!rle_decoder_) { |
65 | rle_decoder_.reset( |
66 | new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_)); |
67 | } else { |
68 | rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); |
69 | } |
70 | return static_cast<int>(sizeof(int32_t)) + num_bytes; |
71 | } |
72 | case Encoding::BIT_PACKED: { |
73 | num_bytes = |
74 | static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_)); |
75 | if (!bit_packed_decoder_) { |
76 | bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes)); |
77 | } else { |
78 | bit_packed_decoder_->Reset(data, num_bytes); |
79 | } |
80 | return num_bytes; |
81 | } |
82 | default: |
83 | throw ParquetException("Unknown encoding type for levels." ); |
84 | } |
85 | return -1; |
86 | } |
87 | |
88 | int LevelDecoder::Decode(int batch_size, int16_t* levels) { |
89 | int num_decoded = 0; |
90 | |
91 | int num_values = std::min(num_values_remaining_, batch_size); |
92 | if (encoding_ == Encoding::RLE) { |
93 | num_decoded = rle_decoder_->GetBatch(levels, num_values); |
94 | } else { |
95 | num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); |
96 | } |
97 | num_values_remaining_ -= num_decoded; |
98 | return num_decoded; |
99 | } |
100 | |
101 | ReaderProperties default_reader_properties() { |
102 | static ReaderProperties default_reader_properties; |
103 | return default_reader_properties; |
104 | } |
105 | |
// ----------------------------------------------------------------------
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet file
109 | |
110 | // This subclass delimits pages appearing in a serialized stream, each preceded |
111 | // by a serialized Thrift format::PageHeader indicating the type of each page |
112 | // and the page metadata. |
113 | class : public PageReader { |
114 | public: |
115 | (const std::shared_ptr<ArrowInputStream>& stream, |
116 | int64_t total_num_rows, Compression::type codec, |
117 | ::arrow::MemoryPool* pool) |
118 | : stream_(stream), |
119 | decompression_buffer_(AllocateBuffer(pool, 0)), |
120 | seen_num_rows_(0), |
121 | total_num_rows_(total_num_rows) { |
122 | max_page_header_size_ = kDefaultMaxPageHeaderSize; |
123 | decompressor_ = GetCodec(codec); |
124 | } |
125 | |
126 | // Implement the PageReader interface |
127 | std::shared_ptr<Page> NextPage() override; |
128 | |
129 | void (uint32_t size) override { max_page_header_size_ = size; } |
130 | |
131 | private: |
132 | std::shared_ptr<ArrowInputStream> ; |
133 | |
134 | format::PageHeader ; |
135 | std::shared_ptr<Page> ; |
136 | |
137 | // Compression codec to use. |
138 | std::unique_ptr<::arrow::util::Codec> ; |
139 | std::shared_ptr<ResizableBuffer> ; |
140 | |
141 | // Maximum allowed page size |
142 | uint32_t ; |
143 | |
144 | // Number of rows read in data pages so far |
145 | int64_t ; |
146 | |
147 | // Number of rows in all the data pages |
148 | int64_t ; |
149 | }; |
150 | |
151 | std::shared_ptr<Page> SerializedPageReader::() { |
152 | // Loop here because there may be unhandled page types that we skip until |
153 | // finding a page that we do know what to do with |
154 | while (seen_num_rows_ < total_num_rows_) { |
155 | uint32_t = 0; |
156 | uint32_t allowed_page_size = kDefaultPageHeaderSize; |
157 | |
158 | // Page headers can be very large because of page statistics |
159 | // We try to deserialize a larger buffer progressively |
160 | // until a maximum allowed header limit |
161 | while (true) { |
162 | string_view buffer; |
163 | PARQUET_THROW_NOT_OK(stream_->Peek(allowed_page_size, &buffer)); |
164 | if (buffer.size() == 0) { |
165 | return std::shared_ptr<Page>(nullptr); |
166 | } |
167 | |
168 | // This gets used, then set by DeserializeThriftMsg |
169 | header_size = static_cast<uint32_t>(buffer.size()); |
170 | try { |
171 | DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(buffer.data()), |
172 | &header_size, ¤t_page_header_); |
173 | break; |
174 | } catch (std::exception& e) { |
175 | // Failed to deserialize. Double the allowed page header size and try again |
176 | std::stringstream ss; |
177 | ss << e.what(); |
178 | allowed_page_size *= 2; |
179 | if (allowed_page_size > max_page_header_size_) { |
180 | ss << "Deserializing page header failed.\n" ; |
181 | throw ParquetException(ss.str()); |
182 | } |
183 | } |
184 | } |
185 | // Advance the stream offset |
186 | PARQUET_THROW_NOT_OK(stream_->Advance(header_size)); |
187 | |
188 | int compressed_len = current_page_header_.compressed_page_size; |
189 | int uncompressed_len = current_page_header_.uncompressed_page_size; |
190 | |
191 | // Read the compressed data page. |
192 | std::shared_ptr<Buffer> page_buffer; |
193 | PARQUET_THROW_NOT_OK(stream_->Read(compressed_len, &page_buffer)); |
194 | if (page_buffer->size() != compressed_len) { |
195 | std::stringstream ss; |
196 | ss << "Page was smaller (" << page_buffer->size() << ") than expected (" |
197 | << compressed_len << ")" ; |
198 | ParquetException::EofException(ss.str()); |
199 | } |
200 | |
201 | // Uncompress it if we need to |
202 | if (decompressor_ != nullptr) { |
203 | // Grow the uncompressed buffer if we need to. |
204 | if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { |
205 | PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); |
206 | } |
207 | PARQUET_THROW_NOT_OK( |
208 | decompressor_->Decompress(compressed_len, page_buffer->data(), uncompressed_len, |
209 | decompression_buffer_->mutable_data())); |
210 | page_buffer = decompression_buffer_; |
211 | } |
212 | |
213 | if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) { |
214 | const format::DictionaryPageHeader& = |
215 | current_page_header_.dictionary_page_header; |
216 | |
217 | bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false; |
218 | |
219 | return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values, |
220 | FromThrift(dict_header.encoding), |
221 | is_sorted); |
222 | } else if (current_page_header_.type == format::PageType::DATA_PAGE) { |
223 | const format::DataPageHeader& = current_page_header_.data_page_header; |
224 | |
225 | EncodedStatistics page_statistics; |
226 | if (header.__isset.statistics) { |
227 | const format::Statistics& stats = header.statistics; |
228 | if (stats.__isset.max) { |
229 | page_statistics.set_max(stats.max); |
230 | } |
231 | if (stats.__isset.min) { |
232 | page_statistics.set_min(stats.min); |
233 | } |
234 | if (stats.__isset.null_count) { |
235 | page_statistics.set_null_count(stats.null_count); |
236 | } |
237 | if (stats.__isset.distinct_count) { |
238 | page_statistics.set_distinct_count(stats.distinct_count); |
239 | } |
240 | } |
241 | |
242 | seen_num_rows_ += header.num_values; |
243 | |
244 | return std::make_shared<DataPageV1>( |
245 | page_buffer, header.num_values, FromThrift(header.encoding), |
246 | FromThrift(header.definition_level_encoding), |
247 | FromThrift(header.repetition_level_encoding), page_statistics); |
248 | } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) { |
249 | const format::DataPageHeaderV2& = current_page_header_.data_page_header_v2; |
250 | bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; |
251 | |
252 | seen_num_rows_ += header.num_values; |
253 | |
254 | return std::make_shared<DataPageV2>( |
255 | page_buffer, header.num_values, header.num_nulls, header.num_rows, |
256 | FromThrift(header.encoding), header.definition_levels_byte_length, |
257 | header.repetition_levels_byte_length, is_compressed); |
258 | } else { |
259 | // We don't know what this page type is. We're allowed to skip non-data |
260 | // pages. |
261 | continue; |
262 | } |
263 | } |
264 | return std::shared_ptr<Page>(nullptr); |
265 | } |
266 | |
267 | std::unique_ptr<PageReader> PageReader::( |
268 | const std::shared_ptr<ArrowInputStream>& stream, int64_t total_num_rows, |
269 | Compression::type codec, ::arrow::MemoryPool* pool) { |
270 | return std::unique_ptr<PageReader>( |
271 | new SerializedPageReader(stream, total_num_rows, codec, pool)); |
272 | } |
273 | |
274 | // ---------------------------------------------------------------------- |
275 | // Impl base class for TypedColumnReader and RecordReader |
276 | |
277 | // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index |
278 | // encoding. |
279 | static bool IsDictionaryIndexEncoding(const Encoding::type& e) { |
280 | return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; |
281 | } |
282 | |
// Shared implementation base for TypedColumnReader and RecordReader.
//
// Owns the page iteration state for one column chunk: the PageReader, the
// level decoders for the current data page, and a cache of value decoders
// keyed by encoding.
template <typename DType>
class ColumnReaderImplBase {
 public:
  using T = typename DType::c_type;

  ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
      : descr_(descr),
        max_def_level_(descr->max_definition_level()),
        max_rep_level_(descr->max_repetition_level()),
        num_buffered_values_(0),
        num_decoded_values_(0),
        pool_(pool),
        current_decoder_(nullptr),
        current_encoding_(Encoding::UNKNOWN) {}

  virtual ~ColumnReaderImplBase() = default;

 protected:
  // Read up to batch_size values from the current data page into the
  // pre-allocated memory T*
  //
  // @returns: the number of values read into the out buffer
  int64_t ReadValues(int64_t batch_size, T* out) {
    int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
    return num_decoded;
  }

  // Read up to batch_size values from the current data page into the
  // pre-allocated memory T*, leaving spaces for null entries according
  // to the def_levels.
  //
  // @returns: the number of values read into the out buffer
  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
                           uint8_t* valid_bits, int64_t valid_bits_offset) {
    return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
                                          static_cast<int>(null_count), valid_bits,
                                          valid_bits_offset);
  }

  // Read multiple definition levels into preallocated memory
  //
  // Returns the number of decoded definition levels
  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
    // Required, non-nested columns have no encoded definition levels.
    if (max_def_level_ == 0) {
      return 0;
    }
    return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
  }

  // Returns true if there are still values to consume in this column chunk,
  // advancing to the next data page if the current one is exhausted.
  bool HasNextInternal() {
    // Either there is no data page available yet, or the data page has been
    // exhausted
    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
      if (!ReadNewPage() || num_buffered_values_ == 0) {
        return false;
      }
    }
    return true;
  }

  // Read multiple repetition levels into preallocated memory
  // Returns the number of decoded repetition levels
  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
    // Non-repeated columns have no encoded repetition levels.
    if (max_rep_level_ == 0) {
      return 0;
    }
    return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
  }

  // Advance to the next data page
  //
  // Dictionary pages encountered along the way configure the dictionary
  // decoder; unknown page types are skipped.  Returns false at end-of-stream.
  bool ReadNewPage() {
    // Loop until we find the next data page.
    while (true) {
      current_page_ = pager_->NextPage();
      if (!current_page_) {
        // EOS
        return false;
      }

      if (current_page_->type() == PageType::DICTIONARY_PAGE) {
        ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
        continue;
      } else if (current_page_->type() == PageType::DATA_PAGE) {
        const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
        const int64_t levels_byte_size = InitializeLevelDecoders(
            *page, page->repetition_level_encoding(), page->definition_level_encoding());
        InitializeDataDecoder(*page, levels_byte_size);
        return true;
      } else if (current_page_->type() == PageType::DATA_PAGE_V2) {
        const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
        // Repetition and definition levels are always encoded using RLE encoding
        // in the DataPageV2 format.
        const int64_t levels_byte_size =
            InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
        InitializeDataDecoder(*page, levels_byte_size);
        return true;
      } else {
        // We don't know what this page type is. We're allowed to skip non-data
        // pages.
        continue;
      }
    }
    return true;
  }

  // Decode a dictionary page and install the resulting dictionary decoder
  // under the RLE_DICTIONARY encoding slot.  Throws if a dictionary was
  // already configured for this column chunk.
  void ConfigureDictionary(const DictionaryPage* page) {
    int encoding = static_cast<int>(page->encoding());
    // PLAIN/PLAIN_DICTIONARY dictionary pages are indexed via RLE_DICTIONARY,
    // so register the decoder under that key.
    if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
        page->encoding() == Encoding::PLAIN) {
      encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
    }

    auto it = decoders_.find(encoding);
    if (it != decoders_.end()) {
      throw ParquetException("Column cannot have more than one dictionary." );
    }

    if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
        page->encoding() == Encoding::PLAIN) {
      auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
      dictionary->SetData(page->num_values(), page->data(), page->size());

      // The dictionary is fully decoded during DictionaryDecoder::Init, so the
      // DictionaryPage buffer is no longer required after this step
      //
      // TODO(wesm): investigate whether this all-or-nothing decoding of the
      // dictionary makes sense and whether performance can be improved

      std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
      decoder->SetDict(dictionary.get());
      decoders_[encoding] =
          std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
    } else {
      ParquetException::NYI("only plain dictionary encoding has been implemented" );
    }

    new_dictionary_ = true;
    current_decoder_ = decoders_[encoding].get();
    DCHECK(current_decoder_);
  }

  // Initialize repetition and definition level decoders on the next data page.

  // If the data page includes repetition and definition levels, we
  // initialize the level decoders and return the number of encoded level bytes.
  // The return value helps determine the number of bytes in the encoded data.
  int64_t InitializeLevelDecoders(const DataPage& page,
                                  Encoding::type repetition_level_encoding,
                                  Encoding::type definition_level_encoding) {
    // Read a data page.
    num_buffered_values_ = page.num_values();

    // Have not decoded any values from the data page yet
    num_decoded_values_ = 0;

    const uint8_t* buffer = page.data();
    int64_t levels_byte_size = 0;

    // Data page Layout: Repetition Levels - Definition Levels - encoded values.
    // Levels are encoded as rle or bit-packed.
    // Init repetition levels
    if (max_rep_level_ > 0) {
      int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
          repetition_level_encoding, max_rep_level_,
          static_cast<int>(num_buffered_values_), buffer);
      buffer += rep_levels_bytes;
      levels_byte_size += rep_levels_bytes;
    }
    // TODO figure a way to set max_def_level_ to 0
    // if the initial value is invalid

    // Init definition levels
    if (max_def_level_ > 0) {
      int64_t def_levels_bytes = definition_level_decoder_.SetData(
          definition_level_encoding, max_def_level_,
          static_cast<int>(num_buffered_values_), buffer);
      levels_byte_size += def_levels_bytes;
    }

    return levels_byte_size;
  }

  // Get a decoder object for this page or create a new decoder if this is the
  // first page with this encoding.
  void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) {
    // Encoded values start immediately after the encoded levels.
    const uint8_t* buffer = page.data() + levels_byte_size;
    const int64_t data_size = page.size() - levels_byte_size;

    Encoding::type encoding = page.encoding();

    if (IsDictionaryIndexEncoding(encoding)) {
      encoding = Encoding::RLE_DICTIONARY;
    }

    auto it = decoders_.find(static_cast<int>(encoding));
    if (it != decoders_.end()) {
      // Reuse the decoder cached for this encoding.
      DCHECK(it->second.get() != nullptr);
      if (encoding == Encoding::RLE_DICTIONARY) {
        DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
      }
      current_decoder_ = it->second.get();
    } else {
      switch (encoding) {
        case Encoding::PLAIN: {
          auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
          current_decoder_ = decoder.get();
          decoders_[static_cast<int>(encoding)] = std::move(decoder);
          break;
        }
        case Encoding::RLE_DICTIONARY:
          // A dictionary-indexed data page without a prior dictionary page
          // means the file is malformed.
          throw ParquetException("Dictionary page must be before data page." );

        case Encoding::DELTA_BINARY_PACKED:
        case Encoding::DELTA_LENGTH_BYTE_ARRAY:
        case Encoding::DELTA_BYTE_ARRAY:
          ParquetException::NYI("Unsupported encoding" );

        default:
          throw ParquetException("Unknown encoding type." );
      }
    }
    current_encoding_ = encoding;
    current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                              static_cast<int>(data_size));
  }

  const ColumnDescriptor* descr_;
  const int16_t max_def_level_;
  const int16_t max_rep_level_;

  std::unique_ptr<PageReader> pager_;
  std::shared_ptr<Page> current_page_;

  // Not set if full schema for this field has no optional or repeated elements
  LevelDecoder definition_level_decoder_;

  // Not set for flat schemas.
  LevelDecoder repetition_level_decoder_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The number of values from the current data page that have been decoded
  // into memory
  int64_t num_decoded_values_;

  ::arrow::MemoryPool* pool_;

  using DecoderType = typename EncodingTraits<DType>::Decoder;
  DecoderType* current_decoder_;
  Encoding::type current_encoding_;

  /// Flag to signal when a new dictionary has been set, for the benefit of
  /// DictionaryRecordReader
  bool new_dictionary_;

  // Map of encoding type to the respective decoder object. For example, a
  // column chunk's data pages may include both dictionary-encoded and
  // plain-encoded data.
  std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;

  // Record that `num_values` of the current page have been consumed.
  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
};
551 | |
552 | // ---------------------------------------------------------------------- |
553 | // TypedColumnReader implementations |
554 | |
555 | template <typename DType> |
556 | class TypedColumnReaderImpl : public TypedColumnReader<DType>, |
557 | public ColumnReaderImplBase<DType> { |
558 | public: |
559 | using T = typename DType::c_type; |
560 | |
561 | TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> , |
562 | ::arrow::MemoryPool* pool) |
563 | : ColumnReaderImplBase<DType>(descr, pool) { |
564 | this->pager_ = std::move(pager); |
565 | } |
566 | |
567 | bool HasNext() override { return this->HasNextInternal(); } |
568 | |
569 | int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
570 | T* values, int64_t* values_read) override; |
571 | |
572 | int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
573 | T* values, uint8_t* valid_bits, int64_t valid_bits_offset, |
574 | int64_t* levels_read, int64_t* values_read, |
575 | int64_t* null_count) override; |
576 | |
577 | int64_t Skip(int64_t num_rows_to_skip) override; |
578 | |
579 | Type::type type() const override { return this->descr_->physical_type(); } |
580 | |
581 | const ColumnDescriptor* descr() const override { return this->descr_; } |
582 | }; |
583 | |
// Read up to batch_size triples of (def level, rep level, value) from the
// current data page.  Levels are written to def_levels/rep_levels (when the
// column has them and the pointers are non-null); decoded values go to
// `values` and their count to *values_read.  Returns the number of levels
// consumed (== values for required, non-repeated columns).
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
                                                int16_t* rep_levels, T* values,
                                                int64_t* values_read) {
  // HasNext invokes ReadNewPage
  if (!HasNext()) {
    *values_read = 0;
    return 0;
  }

  // TODO(wesm): keep reading data pages until batch_size is reached, or the
  // row group is finished
  batch_size =
      std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);

  int64_t num_def_levels = 0;
  int64_t num_rep_levels = 0;

  int64_t values_to_read = 0;

  // If the field is required and non-repeated, there are no definition levels
  if (this->max_def_level_ > 0 && def_levels) {
    num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
    // TODO(wesm): this tallying of values-to-decode can be performed with better
    // cache-efficiency if fused with the level decoding.
    // Only max-level entries have a physically stored value.
    for (int64_t i = 0; i < num_def_levels; ++i) {
      if (def_levels[i] == this->max_def_level_) {
        ++values_to_read;
      }
    }
  } else {
    // Required field, read all values
    values_to_read = batch_size;
  }

  // Not present for non-repeated fields
  if (this->max_rep_level_ > 0 && rep_levels) {
    num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
    // Def and rep levels are encoded per-entry, so their counts must agree.
    if (def_levels && num_def_levels != num_rep_levels) {
      throw ParquetException("Number of decoded rep / def levels did not match" );
    }
  }

  *values_read = this->ReadValues(values_to_read, values);
  // Levels consumed: num_def_levels when levels exist, else the value count.
  int64_t total_values = std::max(num_def_levels, *values_read);
  this->ConsumeBufferedValues(total_values);

  return total_values;
}
633 | |
// Like ReadBatch, but additionally produces a validity bitmap: values are
// left "spaced" (with gaps at null slots) when the column can contain nulls.
// Outputs: *levels_read (levels consumed), *values_read (slots written,
// including null gaps), *null_count_out (nulls observed).  Returns the number
// of value slots written.
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
    int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
    uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
    int64_t* values_read, int64_t* null_count_out) {
  // HasNext invokes ReadNewPage
  if (!HasNext()) {
    *levels_read = 0;
    *values_read = 0;
    *null_count_out = 0;
    return 0;
  }

  int64_t total_values;
  // TODO(wesm): keep reading data pages until batch_size is reached, or the
  // row group is finished
  batch_size =
      std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);

  // If the field is required and non-repeated, there are no definition levels
  if (this->max_def_level_ > 0) {
    int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);

    // Not present for non-repeated fields
    if (this->max_rep_level_ > 0) {
      int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
      if (num_def_levels != num_rep_levels) {
        throw ParquetException("Number of decoded rep / def levels did not match" );
      }
    }

    // Whether this column's values can actually contain null gaps (an
    // optional element somewhere in its schema path).
    const bool has_spaced_values = internal::HasSpacedValues(this->descr_);

    int64_t null_count = 0;
    if (!has_spaced_values) {
      // No null slots possible: read densely and mark everything valid.
      int values_to_read = 0;
      for (int64_t i = 0; i < num_def_levels; ++i) {
        if (def_levels[i] == this->max_def_level_) {
          ++values_to_read;
        }
      }
      total_values = this->ReadValues(values_to_read, values);
      for (int64_t i = 0; i < total_values; i++) {
        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
      }
      *values_read = total_values;
    } else {
      // Derive the validity bitmap and null count from the def levels, then
      // decode values directly into their spaced positions.
      internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_,
                                         this->max_rep_level_, values_read, &null_count,
                                         valid_bits, valid_bits_offset);
      total_values =
          this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
                                 valid_bits, valid_bits_offset);
    }
    *levels_read = num_def_levels;
    *null_count_out = null_count;

  } else {
    // Required field, read all values
    total_values = this->ReadValues(batch_size, values);
    for (int64_t i = 0; i < total_values; i++) {
      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
    }
    *null_count_out = 0;
    *levels_read = total_values;
  }

  this->ConsumeBufferedValues(*levels_read);
  return total_values;
}
704 | |
705 | template <typename DType> |
706 | int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_rows_to_skip) { |
707 | int64_t rows_to_skip = num_rows_to_skip; |
708 | while (HasNext() && rows_to_skip > 0) { |
709 | // If the number of rows to skip is more than the number of undecoded values, skip the |
710 | // Page. |
711 | if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) { |
712 | rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_; |
713 | this->num_decoded_values_ = this->num_buffered_values_; |
714 | } else { |
715 | // We need to read this Page |
716 | // Jump to the right offset in the Page |
717 | int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint |
718 | int64_t values_read = 0; |
719 | |
720 | // This will be enough scratch space to accommodate 16-bit levels or any |
721 | // value type |
722 | std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer( |
723 | this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size); |
724 | |
725 | do { |
726 | batch_size = std::min(batch_size, rows_to_skip); |
727 | values_read = |
728 | ReadBatch(static_cast<int>(batch_size), |
729 | reinterpret_cast<int16_t*>(scratch->mutable_data()), |
730 | reinterpret_cast<int16_t*>(scratch->mutable_data()), |
731 | reinterpret_cast<T*>(scratch->mutable_data()), &values_read); |
732 | rows_to_skip -= values_read; |
733 | } while (values_read > 0 && rows_to_skip > 0); |
734 | } |
735 | } |
736 | return num_rows_to_skip - rows_to_skip; |
737 | } |
738 | |
739 | // ---------------------------------------------------------------------- |
740 | // Dynamic column reader constructor |
741 | |
742 | std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr, |
743 | std::unique_ptr<PageReader> , |
744 | MemoryPool* pool) { |
745 | switch (descr->physical_type()) { |
746 | case Type::BOOLEAN: |
747 | return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager), |
748 | pool); |
749 | case Type::INT32: |
750 | return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager), |
751 | pool); |
752 | case Type::INT64: |
753 | return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager), |
754 | pool); |
755 | case Type::INT96: |
756 | return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager), |
757 | pool); |
758 | case Type::FLOAT: |
759 | return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager), |
760 | pool); |
761 | case Type::DOUBLE: |
762 | return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager), |
763 | pool); |
764 | case Type::BYTE_ARRAY: |
765 | return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>( |
766 | descr, std::move(pager), pool); |
767 | case Type::FIXED_LEN_BYTE_ARRAY: |
768 | return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager), |
769 | pool); |
770 | default: |
771 | ParquetException::NYI("type reader not implemented" ); |
772 | } |
773 | // Unreachable code, but supress compiler warning |
774 | return std::shared_ptr<ColumnReader>(nullptr); |
775 | } |
776 | |
777 | // ---------------------------------------------------------------------- |
778 | // RecordReader |
779 | |
780 | namespace internal { |
781 | |
782 | // The minimum number of repetition/definition levels to decode at a time, for |
783 | // better vectorized performance when doing many smaller record reads |
784 | constexpr int64_t kMinLevelBatchSize = 1024; |
785 | |
786 | template <typename DType> |
787 | class TypedRecordReader : public ColumnReaderImplBase<DType>, |
788 | virtual public RecordReader { |
789 | public: |
790 | using T = typename DType::c_type; |
791 | using BASE = ColumnReaderImplBase<DType>; |
792 | TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) { |
793 | nullable_values_ = internal::HasSpacedValues(descr); |
794 | at_record_start_ = true; |
795 | records_read_ = 0; |
796 | values_written_ = 0; |
797 | values_capacity_ = 0; |
798 | null_count_ = 0; |
799 | levels_written_ = 0; |
800 | levels_position_ = 0; |
801 | levels_capacity_ = 0; |
802 | uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY); |
803 | |
804 | if (uses_values_) { |
805 | values_ = AllocateBuffer(pool); |
806 | } |
807 | valid_bits_ = AllocateBuffer(pool); |
808 | def_levels_ = AllocateBuffer(pool); |
809 | rep_levels_ = AllocateBuffer(pool); |
810 | Reset(); |
811 | } |
812 | |
813 | int64_t available_values_current_page() const { |
814 | return this->num_buffered_values_ - this->num_decoded_values_; |
815 | } |
816 | |
817 | int64_t ReadRecords(int64_t num_records) override { |
818 | // Delimit records, then read values at the end |
819 | int64_t records_read = 0; |
820 | |
821 | if (levels_position_ < levels_written_) { |
822 | records_read += ReadRecordData(num_records); |
823 | } |
824 | |
825 | int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); |
826 | |
827 | // If we are in the middle of a record, we continue until reaching the |
828 | // desired number of records or the end of the current record if we've found |
829 | // enough records |
830 | while (!at_record_start_ || records_read < num_records) { |
831 | // Is there more data to read in this row group? |
832 | if (!this->HasNextInternal()) { |
833 | if (!at_record_start_) { |
834 | // We ended the row group while inside a record that we haven't seen |
835 | // the end of yet. So increment the record count for the last record in |
836 | // the row group |
837 | ++records_read; |
838 | at_record_start_ = true; |
839 | } |
840 | break; |
841 | } |
842 | |
843 | /// We perform multiple batch reads until we either exhaust the row group |
844 | /// or observe the desired number of records |
845 | int64_t batch_size = std::min(level_batch_size, available_values_current_page()); |
846 | |
847 | // No more data in column |
848 | if (batch_size == 0) { |
849 | break; |
850 | } |
851 | |
852 | if (this->max_def_level_ > 0) { |
853 | ReserveLevels(batch_size); |
854 | |
855 | int16_t* def_levels = this->def_levels() + levels_written_; |
856 | int16_t* rep_levels = this->rep_levels() + levels_written_; |
857 | |
858 | // Not present for non-repeated fields |
859 | int64_t levels_read = 0; |
860 | if (this->max_rep_level_ > 0) { |
861 | levels_read = this->ReadDefinitionLevels(batch_size, def_levels); |
862 | if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { |
863 | throw ParquetException("Number of decoded rep / def levels did not match" ); |
864 | } |
865 | } else if (this->max_def_level_ > 0) { |
866 | levels_read = this->ReadDefinitionLevels(batch_size, def_levels); |
867 | } |
868 | |
869 | // Exhausted column chunk |
870 | if (levels_read == 0) { |
871 | break; |
872 | } |
873 | |
874 | levels_written_ += levels_read; |
875 | records_read += ReadRecordData(num_records - records_read); |
876 | } else { |
877 | // No repetition or definition levels |
878 | batch_size = std::min(num_records - records_read, batch_size); |
879 | records_read += ReadRecordData(batch_size); |
880 | } |
881 | } |
882 | |
883 | return records_read; |
884 | } |
885 | |
886 | // We may outwardly have the appearance of having exhausted a column chunk |
887 | // when in fact we are in the middle of processing the last batch |
888 | bool has_values_to_process() const { return levels_position_ < levels_written_; } |
889 | |
890 | std::shared_ptr<ResizableBuffer> ReleaseValues() override { |
891 | if (uses_values_) { |
892 | auto result = values_; |
893 | values_ = AllocateBuffer(this->pool_); |
894 | return result; |
895 | } else { |
896 | return nullptr; |
897 | } |
898 | } |
899 | |
900 | std::shared_ptr<ResizableBuffer> ReleaseIsValid() override { |
901 | if (nullable_values_) { |
902 | auto result = valid_bits_; |
903 | valid_bits_ = AllocateBuffer(this->pool_); |
904 | return result; |
905 | } else { |
906 | return nullptr; |
907 | } |
908 | } |
909 | |
910 | // Process written repetition/definition levels to reach the end of |
911 | // records. Process no more levels than necessary to delimit the indicated |
912 | // number of logical records. Updates internal state of RecordReader |
913 | // |
914 | // \return Number of records delimited |
915 | int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { |
916 | int64_t values_to_read = 0; |
917 | int64_t records_read = 0; |
918 | |
919 | const int16_t* def_levels = this->def_levels() + levels_position_; |
920 | const int16_t* rep_levels = this->rep_levels() + levels_position_; |
921 | |
922 | DCHECK_GT(this->max_rep_level_, 0); |
923 | |
924 | // Count logical records and number of values to read |
925 | while (levels_position_ < levels_written_) { |
926 | if (*rep_levels++ == 0) { |
927 | // If at_record_start_ is true, we are seeing the start of a record |
928 | // for the second time, such as after repeated calls to |
929 | // DelimitRecords. In this case we must continue until we find |
930 | // another record start or exhausting the ColumnChunk |
931 | if (!at_record_start_) { |
932 | // We've reached the end of a record; increment the record count. |
933 | ++records_read; |
934 | if (records_read == num_records) { |
935 | // We've found the number of records we were looking for. Set |
936 | // at_record_start_ to true and break |
937 | at_record_start_ = true; |
938 | break; |
939 | } |
940 | } |
941 | } |
942 | |
943 | // We have decided to consume the level at this position; therefore we |
944 | // must advance until we find another record boundary |
945 | at_record_start_ = false; |
946 | |
947 | if (*def_levels++ == this->max_def_level_) { |
948 | ++values_to_read; |
949 | } |
950 | ++levels_position_; |
951 | } |
952 | *values_seen = values_to_read; |
953 | return records_read; |
954 | } |
955 | |
956 | void Reserve(int64_t capacity) override { |
957 | ReserveLevels(capacity); |
958 | ReserveValues(capacity); |
959 | } |
960 | |
961 | void ReserveLevels(int64_t capacity) { |
962 | if (this->max_def_level_ > 0 && (levels_written_ + capacity > levels_capacity_)) { |
963 | int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1); |
964 | while (levels_written_ + capacity > new_levels_capacity) { |
965 | new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1); |
966 | } |
967 | PARQUET_THROW_NOT_OK( |
968 | def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); |
969 | if (this->max_rep_level_ > 0) { |
970 | PARQUET_THROW_NOT_OK( |
971 | rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); |
972 | } |
973 | levels_capacity_ = new_levels_capacity; |
974 | } |
975 | } |
976 | |
977 | void ReserveValues(int64_t capacity) { |
978 | if (values_written_ + capacity > values_capacity_) { |
979 | int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1); |
980 | while (values_written_ + capacity > new_values_capacity) { |
981 | new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1); |
982 | } |
983 | |
984 | int type_size = GetTypeByteSize(this->descr_->physical_type()); |
985 | |
986 | // XXX(wesm): A hack to avoid memory allocation when reading directly |
987 | // into builder classes |
988 | if (uses_values_) { |
989 | PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false)); |
990 | } |
991 | |
992 | values_capacity_ = new_values_capacity; |
993 | } |
994 | if (nullable_values_) { |
995 | int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); |
996 | if (valid_bits_->size() < valid_bytes_new) { |
997 | int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); |
998 | PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); |
999 | |
1000 | // Avoid valgrind warnings |
1001 | memset(valid_bits_->mutable_data() + valid_bytes_old, 0, |
1002 | valid_bytes_new - valid_bytes_old); |
1003 | } |
1004 | } |
1005 | } |
1006 | |
1007 | void Reset() override { |
1008 | ResetValues(); |
1009 | |
1010 | if (levels_written_ > 0) { |
1011 | const int64_t levels_remaining = levels_written_ - levels_position_; |
1012 | // Shift remaining levels to beginning of buffer and trim to only the number |
1013 | // of decoded levels remaining |
1014 | int16_t* def_data = def_levels(); |
1015 | int16_t* rep_data = rep_levels(); |
1016 | |
1017 | std::copy(def_data + levels_position_, def_data + levels_written_, def_data); |
1018 | PARQUET_THROW_NOT_OK( |
1019 | def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); |
1020 | |
1021 | if (this->max_rep_level_ > 0) { |
1022 | std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); |
1023 | PARQUET_THROW_NOT_OK( |
1024 | rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); |
1025 | } |
1026 | |
1027 | levels_written_ -= levels_position_; |
1028 | levels_position_ = 0; |
1029 | levels_capacity_ = levels_remaining; |
1030 | } |
1031 | |
1032 | records_read_ = 0; |
1033 | |
1034 | // Call Finish on the binary builders to reset them |
1035 | } |
1036 | |
1037 | void (std::unique_ptr<PageReader> reader) override { |
1038 | at_record_start_ = true; |
1039 | this->pager_ = std::move(reader); |
1040 | ResetDecoders(); |
1041 | } |
1042 | |
1043 | bool HasMoreData() const override { return this->pager_ != nullptr; } |
1044 | |
1045 | // Dictionary decoders must be reset when advancing row groups |
1046 | void ResetDecoders() { this->decoders_.clear(); } |
1047 | |
1048 | virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { |
1049 | uint8_t* valid_bits = valid_bits_->mutable_data(); |
1050 | const int64_t valid_bits_offset = values_written_; |
1051 | |
1052 | int64_t num_decoded = this->current_decoder_->DecodeSpaced( |
1053 | ValuesHead<T>(), static_cast<int>(values_with_nulls), |
1054 | static_cast<int>(null_count), valid_bits, valid_bits_offset); |
1055 | DCHECK_EQ(num_decoded, values_with_nulls); |
1056 | } |
1057 | |
1058 | virtual void ReadValuesDense(int64_t values_to_read) { |
1059 | int64_t num_decoded = |
1060 | this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read)); |
1061 | DCHECK_EQ(num_decoded, values_to_read); |
1062 | } |
1063 | |
1064 | // Return number of logical records read |
1065 | int64_t ReadRecordData(int64_t num_records) { |
1066 | // Conservative upper bound |
1067 | const int64_t possible_num_values = |
1068 | std::max(num_records, levels_written_ - levels_position_); |
1069 | ReserveValues(possible_num_values); |
1070 | |
1071 | const int64_t start_levels_position = levels_position_; |
1072 | |
1073 | int64_t values_to_read = 0; |
1074 | int64_t records_read = 0; |
1075 | if (this->max_rep_level_ > 0) { |
1076 | records_read = DelimitRecords(num_records, &values_to_read); |
1077 | } else if (this->max_def_level_ > 0) { |
1078 | // No repetition levels, skip delimiting logic. Each level represents a |
1079 | // null or not null entry |
1080 | records_read = std::min(levels_written_ - levels_position_, num_records); |
1081 | |
1082 | // This is advanced by DelimitRecords, which we skipped |
1083 | levels_position_ += records_read; |
1084 | } else { |
1085 | records_read = values_to_read = num_records; |
1086 | } |
1087 | |
1088 | int64_t null_count = 0; |
1089 | if (nullable_values_) { |
1090 | int64_t values_with_nulls = 0; |
1091 | internal::DefinitionLevelsToBitmap( |
1092 | def_levels() + start_levels_position, levels_position_ - start_levels_position, |
1093 | this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count, |
1094 | valid_bits_->mutable_data(), values_written_); |
1095 | values_to_read = values_with_nulls - null_count; |
1096 | ReadValuesSpaced(values_with_nulls, null_count); |
1097 | } else { |
1098 | ReadValuesDense(values_to_read); |
1099 | } |
1100 | if (this->max_def_level_ > 0) { |
1101 | // Optional, repeated, or some mix thereof |
1102 | this->ConsumeBufferedValues(levels_position_ - start_levels_position); |
1103 | } else { |
1104 | // Flat, non-repeated |
1105 | this->ConsumeBufferedValues(values_to_read); |
1106 | } |
1107 | // Total values, including null spaces, if any |
1108 | values_written_ += values_to_read + null_count; |
1109 | null_count_ += null_count; |
1110 | |
1111 | return records_read; |
1112 | } |
1113 | |
1114 | void DebugPrintState() override { |
1115 | const int16_t* def_levels = this->def_levels(); |
1116 | const int16_t* rep_levels = this->rep_levels(); |
1117 | const int64_t total_levels_read = levels_position_; |
1118 | |
1119 | const T* vals = reinterpret_cast<const T*>(this->values()); |
1120 | |
1121 | std::cout << "def levels: " ; |
1122 | for (int64_t i = 0; i < total_levels_read; ++i) { |
1123 | std::cout << def_levels[i] << " " ; |
1124 | } |
1125 | std::cout << std::endl; |
1126 | |
1127 | std::cout << "rep levels: " ; |
1128 | for (int64_t i = 0; i < total_levels_read; ++i) { |
1129 | std::cout << rep_levels[i] << " " ; |
1130 | } |
1131 | std::cout << std::endl; |
1132 | |
1133 | std::cout << "values: " ; |
1134 | for (int64_t i = 0; i < this->values_written(); ++i) { |
1135 | std::cout << vals[i] << " " ; |
1136 | } |
1137 | std::cout << std::endl; |
1138 | } |
1139 | |
1140 | void ResetValues() { |
1141 | if (values_written_ > 0) { |
1142 | // Resize to 0, but do not shrink to fit |
1143 | if (uses_values_) { |
1144 | PARQUET_THROW_NOT_OK(values_->Resize(0, false)); |
1145 | } |
1146 | PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); |
1147 | values_written_ = 0; |
1148 | values_capacity_ = 0; |
1149 | null_count_ = 0; |
1150 | } |
1151 | } |
1152 | |
1153 | protected: |
1154 | template <typename T> |
1155 | T* ValuesHead() { |
1156 | return reinterpret_cast<T*>(values_->mutable_data()) + values_written_; |
1157 | } |
1158 | }; |
1159 | |
1160 | class FLBARecordReader : public TypedRecordReader<FLBAType>, |
1161 | virtual public BinaryRecordReader { |
1162 | public: |
1163 | FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) |
1164 | : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) { |
1165 | DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); |
1166 | int byte_width = descr_->type_length(); |
1167 | std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); |
1168 | builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_)); |
1169 | } |
1170 | |
1171 | ::arrow::ArrayVector GetBuilderChunks() override { |
1172 | std::shared_ptr<::arrow::Array> chunk; |
1173 | PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); |
1174 | return ::arrow::ArrayVector({chunk}); |
1175 | } |
1176 | |
1177 | void ReadValuesDense(int64_t values_to_read) override { |
1178 | auto values = ValuesHead<FLBA>(); |
1179 | int64_t num_decoded = |
1180 | this->current_decoder_->Decode(values, static_cast<int>(values_to_read)); |
1181 | DCHECK_EQ(num_decoded, values_to_read); |
1182 | |
1183 | for (int64_t i = 0; i < num_decoded; i++) { |
1184 | PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); |
1185 | } |
1186 | ResetValues(); |
1187 | } |
1188 | |
1189 | void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
1190 | uint8_t* valid_bits = valid_bits_->mutable_data(); |
1191 | const int64_t valid_bits_offset = values_written_; |
1192 | auto values = ValuesHead<FLBA>(); |
1193 | |
1194 | int64_t num_decoded = this->current_decoder_->DecodeSpaced( |
1195 | values, static_cast<int>(values_to_read), static_cast<int>(null_count), |
1196 | valid_bits, valid_bits_offset); |
1197 | DCHECK_EQ(num_decoded, values_to_read); |
1198 | |
1199 | for (int64_t i = 0; i < num_decoded; i++) { |
1200 | if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { |
1201 | PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); |
1202 | } else { |
1203 | PARQUET_THROW_NOT_OK(builder_->AppendNull()); |
1204 | } |
1205 | } |
1206 | ResetValues(); |
1207 | } |
1208 | |
1209 | private: |
1210 | std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; |
1211 | }; |
1212 | |
1213 | class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>, |
1214 | virtual public BinaryRecordReader { |
1215 | public: |
1216 | ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) |
1217 | : TypedRecordReader<ByteArrayType>(descr, pool) { |
1218 | DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); |
1219 | accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); |
1220 | } |
1221 | |
1222 | ::arrow::ArrayVector GetBuilderChunks() override { |
1223 | ::arrow::ArrayVector result = accumulator_.chunks; |
1224 | if (result.size() == 0 || accumulator_.builder->length() > 0) { |
1225 | std::shared_ptr<::arrow::Array> last_chunk; |
1226 | PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); |
1227 | result.push_back(last_chunk); |
1228 | } |
1229 | accumulator_.chunks = {}; |
1230 | return result; |
1231 | } |
1232 | |
1233 | void ReadValuesDense(int64_t values_to_read) override { |
1234 | int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( |
1235 | static_cast<int>(values_to_read), &accumulator_); |
1236 | DCHECK_EQ(num_decoded, values_to_read); |
1237 | ResetValues(); |
1238 | } |
1239 | |
1240 | void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
1241 | int64_t num_decoded = this->current_decoder_->DecodeArrow( |
1242 | static_cast<int>(values_to_read), static_cast<int>(null_count), |
1243 | valid_bits_->mutable_data(), values_written_, &accumulator_); |
1244 | DCHECK_EQ(num_decoded, values_to_read - null_count); |
1245 | ResetValues(); |
1246 | } |
1247 | |
1248 | private: |
1249 | // Helper data structure for accumulating builder chunks |
1250 | ArrowBinaryAccumulator accumulator_; |
1251 | }; |
1252 | |
1253 | class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>, |
1254 | virtual public DictionaryRecordReader { |
1255 | public: |
1256 | ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, |
1257 | ::arrow::MemoryPool* pool) |
1258 | : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) { |
1259 | this->read_dictionary_ = true; |
1260 | } |
1261 | |
1262 | std::shared_ptr<::arrow::ChunkedArray> GetResult() override { |
1263 | FlushBuilder(); |
1264 | return std::make_shared<::arrow::ChunkedArray>(result_chunks_, builder_.type()); |
1265 | } |
1266 | |
1267 | void FlushBuilder() { |
1268 | if (builder_.length() > 0) { |
1269 | std::shared_ptr<::arrow::Array> chunk; |
1270 | PARQUET_THROW_NOT_OK(builder_.Finish(&chunk)); |
1271 | result_chunks_.emplace_back(std::move(chunk)); |
1272 | |
1273 | // Also clears the dictionary memo table |
1274 | builder_.ResetFull(); |
1275 | } |
1276 | } |
1277 | |
1278 | void MaybeWriteNewDictionary() { |
1279 | if (this->new_dictionary_) { |
1280 | /// If there is a new dictionary, we may need to flush the builder, then |
1281 | /// insert the new dictionary values |
1282 | FlushBuilder(); |
1283 | auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); |
1284 | decoder->InsertDictionary(&builder_); |
1285 | this->new_dictionary_ = false; |
1286 | } |
1287 | } |
1288 | |
1289 | void ReadValuesDense(int64_t values_to_read) override { |
1290 | int64_t num_decoded = 0; |
1291 | if (current_encoding_ == Encoding::RLE_DICTIONARY) { |
1292 | MaybeWriteNewDictionary(); |
1293 | auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); |
1294 | num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_); |
1295 | } else { |
1296 | num_decoded = this->current_decoder_->DecodeArrowNonNull( |
1297 | static_cast<int>(values_to_read), &builder_); |
1298 | |
1299 | /// Flush values since they have been copied into the builder |
1300 | ResetValues(); |
1301 | } |
1302 | DCHECK_EQ(num_decoded, values_to_read); |
1303 | } |
1304 | |
1305 | void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
1306 | int64_t num_decoded = 0; |
1307 | if (current_encoding_ == Encoding::RLE_DICTIONARY) { |
1308 | MaybeWriteNewDictionary(); |
1309 | auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); |
1310 | num_decoded = decoder->DecodeIndicesSpaced( |
1311 | static_cast<int>(values_to_read), static_cast<int>(null_count), |
1312 | valid_bits_->mutable_data(), values_written_, &builder_); |
1313 | } else { |
1314 | num_decoded = this->current_decoder_->DecodeArrow( |
1315 | static_cast<int>(values_to_read), static_cast<int>(null_count), |
1316 | valid_bits_->mutable_data(), values_written_, &builder_); |
1317 | |
1318 | /// Flush values since they have been copied into the builder |
1319 | ResetValues(); |
1320 | } |
1321 | DCHECK_EQ(num_decoded, values_to_read - null_count); |
1322 | } |
1323 | |
1324 | private: |
1325 | using BinaryDictDecoder = DictDecoder<ByteArrayType>; |
1326 | |
1327 | ::arrow::BinaryDictionary32Builder builder_; |
1328 | std::vector<std::shared_ptr<::arrow::Array>> result_chunks_; |
1329 | }; |
1330 | |
// TODO(wesm): Implement these to some satisfaction
// The generic DebugPrintState streams values with operator<<, which is not
// meaningful for Int96 or the variable/fixed-length binary value structs, so
// these specializations print nothing.
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
1340 | |
1341 | std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr, |
1342 | arrow::MemoryPool* pool, |
1343 | bool read_dictionary) { |
1344 | if (read_dictionary) { |
1345 | return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool); |
1346 | } else { |
1347 | return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool); |
1348 | } |
1349 | } |
1350 | |
1351 | std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, |
1352 | MemoryPool* pool, |
1353 | const bool read_dictionary) { |
1354 | switch (descr->physical_type()) { |
1355 | case Type::BOOLEAN: |
1356 | return std::make_shared<TypedRecordReader<BooleanType>>(descr, pool); |
1357 | case Type::INT32: |
1358 | return std::make_shared<TypedRecordReader<Int32Type>>(descr, pool); |
1359 | case Type::INT64: |
1360 | return std::make_shared<TypedRecordReader<Int64Type>>(descr, pool); |
1361 | case Type::INT96: |
1362 | return std::make_shared<TypedRecordReader<Int96Type>>(descr, pool); |
1363 | case Type::FLOAT: |
1364 | return std::make_shared<TypedRecordReader<FloatType>>(descr, pool); |
1365 | case Type::DOUBLE: |
1366 | return std::make_shared<TypedRecordReader<DoubleType>>(descr, pool); |
1367 | case Type::BYTE_ARRAY: |
1368 | return MakeByteArrayRecordReader(descr, pool, read_dictionary); |
1369 | case Type::FIXED_LEN_BYTE_ARRAY: |
1370 | return std::make_shared<FLBARecordReader>(descr, pool); |
1371 | default: { |
1372 | // PARQUET-1481: This can occur if the file is corrupt |
1373 | std::stringstream ss; |
1374 | ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type()); |
1375 | throw ParquetException(ss.str()); |
1376 | } |
1377 | } |
1378 | // Unreachable code, but supress compiler warning |
1379 | return nullptr; |
1380 | } |
1381 | |
1382 | } // namespace internal |
1383 | } // namespace parquet |
1384 | |