1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/column_reader.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstring>
23#include <exception>
24#include <iostream>
25#include <memory>
26#include <unordered_map>
27#include <utility>
28
29#include "arrow/array.h"
30#include "arrow/builder.h"
31#include "arrow/table.h"
32#include "arrow/type.h"
33#include "arrow/util/bit_stream_utils.h"
34#include "arrow/util/checked_cast.h"
35#include "arrow/util/compression.h"
36#include "arrow/util/logging.h"
37#include "arrow/util/rle_encoding.h"
38
39#include "parquet/column_page.h"
40#include "parquet/encoding.h"
41#include "parquet/properties.h"
42#include "parquet/statistics.h"
43#include "parquet/thrift_internal.h" // IWYU pragma: keep
44
45using arrow::MemoryPool;
46using arrow::internal::checked_cast;
47
48namespace parquet {
49
50LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
51
52LevelDecoder::~LevelDecoder() {}
53
54int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
55 int num_buffered_values, const uint8_t* data) {
56 int32_t num_bytes = 0;
57 encoding_ = encoding;
58 num_values_remaining_ = num_buffered_values;
59 bit_width_ = BitUtil::Log2(max_level + 1);
60 switch (encoding) {
61 case Encoding::RLE: {
62 num_bytes = arrow::util::SafeLoadAs<int32_t>(data);
63 const uint8_t* decoder_data = data + sizeof(int32_t);
64 if (!rle_decoder_) {
65 rle_decoder_.reset(
66 new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_));
67 } else {
68 rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
69 }
70 return static_cast<int>(sizeof(int32_t)) + num_bytes;
71 }
72 case Encoding::BIT_PACKED: {
73 num_bytes =
74 static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
75 if (!bit_packed_decoder_) {
76 bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes));
77 } else {
78 bit_packed_decoder_->Reset(data, num_bytes);
79 }
80 return num_bytes;
81 }
82 default:
83 throw ParquetException("Unknown encoding type for levels.");
84 }
85 return -1;
86}
87
88int LevelDecoder::Decode(int batch_size, int16_t* levels) {
89 int num_decoded = 0;
90
91 int num_values = std::min(num_values_remaining_, batch_size);
92 if (encoding_ == Encoding::RLE) {
93 num_decoded = rle_decoder_->GetBatch(levels, num_values);
94 } else {
95 num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
96 }
97 num_values_remaining_ -= num_decoded;
98 return num_decoded;
99}
100
101ReaderProperties default_reader_properties() {
102 static ReaderProperties default_reader_properties;
103 return default_reader_properties;
104}
105
106// ----------------------------------------------------------------------
107// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet file
109
110// This subclass delimits pages appearing in a serialized stream, each preceded
111// by a serialized Thrift format::PageHeader indicating the type of each page
112// and the page metadata.
class SerializedPageReader : public PageReader {
 public:
  // `stream` is positioned at the start of the column chunk's serialized page
  // stream; `total_num_rows` bounds how many data-page rows will be returned.
  // `codec` selects the decompressor (may be uncompressed).
  SerializedPageReader(const std::shared_ptr<ArrowInputStream>& stream,
                       int64_t total_num_rows, Compression::type codec,
                       ::arrow::MemoryPool* pool)
      : stream_(stream),
        decompression_buffer_(AllocateBuffer(pool, 0)),
        seen_num_rows_(0),
        total_num_rows_(total_num_rows) {
    max_page_header_size_ = kDefaultMaxPageHeaderSize;
    decompressor_ = GetCodec(codec);
  }

  // Implement the PageReader interface.
  // Returns the next dictionary or data page, skipping page types it does not
  // recognize; returns nullptr once all rows have been seen.
  std::shared_ptr<Page> NextPage() override;

  void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }

 private:
  std::shared_ptr<ArrowInputStream> stream_;

  // Most recently deserialized Thrift page header, and the Page built from it.
  format::PageHeader current_page_header_;
  std::shared_ptr<Page> current_page_;

  // Compression codec to use.
  std::unique_ptr<::arrow::util::Codec> decompressor_;
  // Reused across pages; grown on demand in NextPage().
  std::shared_ptr<ResizableBuffer> decompression_buffer_;

  // Maximum allowed page header size (bytes); deserialization retries with
  // progressively larger buffers up to this limit.
  uint32_t max_page_header_size_;

  // Number of rows read in data pages so far
  int64_t seen_num_rows_;

  // Number of rows in all the data pages
  int64_t total_num_rows_;
};
150
151std::shared_ptr<Page> SerializedPageReader::NextPage() {
152 // Loop here because there may be unhandled page types that we skip until
153 // finding a page that we do know what to do with
154 while (seen_num_rows_ < total_num_rows_) {
155 uint32_t header_size = 0;
156 uint32_t allowed_page_size = kDefaultPageHeaderSize;
157
158 // Page headers can be very large because of page statistics
159 // We try to deserialize a larger buffer progressively
160 // until a maximum allowed header limit
161 while (true) {
162 string_view buffer;
163 PARQUET_THROW_NOT_OK(stream_->Peek(allowed_page_size, &buffer));
164 if (buffer.size() == 0) {
165 return std::shared_ptr<Page>(nullptr);
166 }
167
168 // This gets used, then set by DeserializeThriftMsg
169 header_size = static_cast<uint32_t>(buffer.size());
170 try {
171 DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(buffer.data()),
172 &header_size, &current_page_header_);
173 break;
174 } catch (std::exception& e) {
175 // Failed to deserialize. Double the allowed page header size and try again
176 std::stringstream ss;
177 ss << e.what();
178 allowed_page_size *= 2;
179 if (allowed_page_size > max_page_header_size_) {
180 ss << "Deserializing page header failed.\n";
181 throw ParquetException(ss.str());
182 }
183 }
184 }
185 // Advance the stream offset
186 PARQUET_THROW_NOT_OK(stream_->Advance(header_size));
187
188 int compressed_len = current_page_header_.compressed_page_size;
189 int uncompressed_len = current_page_header_.uncompressed_page_size;
190
191 // Read the compressed data page.
192 std::shared_ptr<Buffer> page_buffer;
193 PARQUET_THROW_NOT_OK(stream_->Read(compressed_len, &page_buffer));
194 if (page_buffer->size() != compressed_len) {
195 std::stringstream ss;
196 ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
197 << compressed_len << ")";
198 ParquetException::EofException(ss.str());
199 }
200
201 // Uncompress it if we need to
202 if (decompressor_ != nullptr) {
203 // Grow the uncompressed buffer if we need to.
204 if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
205 PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
206 }
207 PARQUET_THROW_NOT_OK(
208 decompressor_->Decompress(compressed_len, page_buffer->data(), uncompressed_len,
209 decompression_buffer_->mutable_data()));
210 page_buffer = decompression_buffer_;
211 }
212
213 if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
214 const format::DictionaryPageHeader& dict_header =
215 current_page_header_.dictionary_page_header;
216
217 bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
218
219 return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
220 FromThrift(dict_header.encoding),
221 is_sorted);
222 } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
223 const format::DataPageHeader& header = current_page_header_.data_page_header;
224
225 EncodedStatistics page_statistics;
226 if (header.__isset.statistics) {
227 const format::Statistics& stats = header.statistics;
228 if (stats.__isset.max) {
229 page_statistics.set_max(stats.max);
230 }
231 if (stats.__isset.min) {
232 page_statistics.set_min(stats.min);
233 }
234 if (stats.__isset.null_count) {
235 page_statistics.set_null_count(stats.null_count);
236 }
237 if (stats.__isset.distinct_count) {
238 page_statistics.set_distinct_count(stats.distinct_count);
239 }
240 }
241
242 seen_num_rows_ += header.num_values;
243
244 return std::make_shared<DataPageV1>(
245 page_buffer, header.num_values, FromThrift(header.encoding),
246 FromThrift(header.definition_level_encoding),
247 FromThrift(header.repetition_level_encoding), page_statistics);
248 } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
249 const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
250 bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
251
252 seen_num_rows_ += header.num_values;
253
254 return std::make_shared<DataPageV2>(
255 page_buffer, header.num_values, header.num_nulls, header.num_rows,
256 FromThrift(header.encoding), header.definition_levels_byte_length,
257 header.repetition_levels_byte_length, is_compressed);
258 } else {
259 // We don't know what this page type is. We're allowed to skip non-data
260 // pages.
261 continue;
262 }
263 }
264 return std::shared_ptr<Page>(nullptr);
265}
266
267std::unique_ptr<PageReader> PageReader::Open(
268 const std::shared_ptr<ArrowInputStream>& stream, int64_t total_num_rows,
269 Compression::type codec, ::arrow::MemoryPool* pool) {
270 return std::unique_ptr<PageReader>(
271 new SerializedPageReader(stream, total_num_rows, codec, pool));
272}
273
274// ----------------------------------------------------------------------
275// Impl base class for TypedColumnReader and RecordReader
276
// PLAIN_DICTIONARY is deprecated but was formerly used as the encoding for
// dictionary indices.
279static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
280 return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
281}
282
283template <typename DType>
284class ColumnReaderImplBase {
285 public:
286 using T = typename DType::c_type;
287
288 ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
289 : descr_(descr),
290 max_def_level_(descr->max_definition_level()),
291 max_rep_level_(descr->max_repetition_level()),
292 num_buffered_values_(0),
293 num_decoded_values_(0),
294 pool_(pool),
295 current_decoder_(nullptr),
296 current_encoding_(Encoding::UNKNOWN) {}
297
298 virtual ~ColumnReaderImplBase() = default;
299
300 protected:
301 // Read up to batch_size values from the current data page into the
302 // pre-allocated memory T*
303 //
304 // @returns: the number of values read into the out buffer
305 int64_t ReadValues(int64_t batch_size, T* out) {
306 int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
307 return num_decoded;
308 }
309
310 // Read up to batch_size values from the current data page into the
311 // pre-allocated memory T*, leaving spaces for null entries according
312 // to the def_levels.
313 //
314 // @returns: the number of values read into the out buffer
315 int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
316 uint8_t* valid_bits, int64_t valid_bits_offset) {
317 return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
318 static_cast<int>(null_count), valid_bits,
319 valid_bits_offset);
320 }
321
322 // Read multiple definition levels into preallocated memory
323 //
324 // Returns the number of decoded definition levels
325 int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
326 if (max_def_level_ == 0) {
327 return 0;
328 }
329 return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
330 }
331
332 bool HasNextInternal() {
333 // Either there is no data page available yet, or the data page has been
334 // exhausted
335 if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
336 if (!ReadNewPage() || num_buffered_values_ == 0) {
337 return false;
338 }
339 }
340 return true;
341 }
342
343 // Read multiple repetition levels into preallocated memory
344 // Returns the number of decoded repetition levels
345 int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
346 if (max_rep_level_ == 0) {
347 return 0;
348 }
349 return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
350 }
351
352 // Advance to the next data page
353 bool ReadNewPage() {
354 // Loop until we find the next data page.
355 while (true) {
356 current_page_ = pager_->NextPage();
357 if (!current_page_) {
358 // EOS
359 return false;
360 }
361
362 if (current_page_->type() == PageType::DICTIONARY_PAGE) {
363 ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
364 continue;
365 } else if (current_page_->type() == PageType::DATA_PAGE) {
366 const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
367 const int64_t levels_byte_size = InitializeLevelDecoders(
368 *page, page->repetition_level_encoding(), page->definition_level_encoding());
369 InitializeDataDecoder(*page, levels_byte_size);
370 return true;
371 } else if (current_page_->type() == PageType::DATA_PAGE_V2) {
372 const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
373 // Repetition and definition levels are always encoded using RLE encoding
374 // in the DataPageV2 format.
375 const int64_t levels_byte_size =
376 InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
377 InitializeDataDecoder(*page, levels_byte_size);
378 return true;
379 } else {
380 // We don't know what this page type is. We're allowed to skip non-data
381 // pages.
382 continue;
383 }
384 }
385 return true;
386 }
387
388 void ConfigureDictionary(const DictionaryPage* page) {
389 int encoding = static_cast<int>(page->encoding());
390 if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
391 page->encoding() == Encoding::PLAIN) {
392 encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
393 }
394
395 auto it = decoders_.find(encoding);
396 if (it != decoders_.end()) {
397 throw ParquetException("Column cannot have more than one dictionary.");
398 }
399
400 if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
401 page->encoding() == Encoding::PLAIN) {
402 auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
403 dictionary->SetData(page->num_values(), page->data(), page->size());
404
405 // The dictionary is fully decoded during DictionaryDecoder::Init, so the
406 // DictionaryPage buffer is no longer required after this step
407 //
408 // TODO(wesm): investigate whether this all-or-nothing decoding of the
409 // dictionary makes sense and whether performance can be improved
410
411 std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
412 decoder->SetDict(dictionary.get());
413 decoders_[encoding] =
414 std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
415 } else {
416 ParquetException::NYI("only plain dictionary encoding has been implemented");
417 }
418
419 new_dictionary_ = true;
420 current_decoder_ = decoders_[encoding].get();
421 DCHECK(current_decoder_);
422 }
423
424 // Initialize repetition and definition level decoders on the next data page.
425
426 // If the data page includes repetition and definition levels, we
427 // initialize the level decoders and return the number of encoded level bytes.
428 // The return value helps determine the number of bytes in the encoded data.
429 int64_t InitializeLevelDecoders(const DataPage& page,
430 Encoding::type repetition_level_encoding,
431 Encoding::type definition_level_encoding) {
432 // Read a data page.
433 num_buffered_values_ = page.num_values();
434
435 // Have not decoded any values from the data page yet
436 num_decoded_values_ = 0;
437
438 const uint8_t* buffer = page.data();
439 int64_t levels_byte_size = 0;
440
441 // Data page Layout: Repetition Levels - Definition Levels - encoded values.
442 // Levels are encoded as rle or bit-packed.
443 // Init repetition levels
444 if (max_rep_level_ > 0) {
445 int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
446 repetition_level_encoding, max_rep_level_,
447 static_cast<int>(num_buffered_values_), buffer);
448 buffer += rep_levels_bytes;
449 levels_byte_size += rep_levels_bytes;
450 }
451 // TODO figure a way to set max_def_level_ to 0
452 // if the initial value is invalid
453
454 // Init definition levels
455 if (max_def_level_ > 0) {
456 int64_t def_levels_bytes = definition_level_decoder_.SetData(
457 definition_level_encoding, max_def_level_,
458 static_cast<int>(num_buffered_values_), buffer);
459 levels_byte_size += def_levels_bytes;
460 }
461
462 return levels_byte_size;
463 }
464
465 // Get a decoder object for this page or create a new decoder if this is the
466 // first page with this encoding.
467 void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) {
468 const uint8_t* buffer = page.data() + levels_byte_size;
469 const int64_t data_size = page.size() - levels_byte_size;
470
471 Encoding::type encoding = page.encoding();
472
473 if (IsDictionaryIndexEncoding(encoding)) {
474 encoding = Encoding::RLE_DICTIONARY;
475 }
476
477 auto it = decoders_.find(static_cast<int>(encoding));
478 if (it != decoders_.end()) {
479 DCHECK(it->second.get() != nullptr);
480 if (encoding == Encoding::RLE_DICTIONARY) {
481 DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
482 }
483 current_decoder_ = it->second.get();
484 } else {
485 switch (encoding) {
486 case Encoding::PLAIN: {
487 auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
488 current_decoder_ = decoder.get();
489 decoders_[static_cast<int>(encoding)] = std::move(decoder);
490 break;
491 }
492 case Encoding::RLE_DICTIONARY:
493 throw ParquetException("Dictionary page must be before data page.");
494
495 case Encoding::DELTA_BINARY_PACKED:
496 case Encoding::DELTA_LENGTH_BYTE_ARRAY:
497 case Encoding::DELTA_BYTE_ARRAY:
498 ParquetException::NYI("Unsupported encoding");
499
500 default:
501 throw ParquetException("Unknown encoding type.");
502 }
503 }
504 current_encoding_ = encoding;
505 current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
506 static_cast<int>(data_size));
507 }
508
509 const ColumnDescriptor* descr_;
510 const int16_t max_def_level_;
511 const int16_t max_rep_level_;
512
513 std::unique_ptr<PageReader> pager_;
514 std::shared_ptr<Page> current_page_;
515
516 // Not set if full schema for this field has no optional or repeated elements
517 LevelDecoder definition_level_decoder_;
518
519 // Not set for flat schemas.
520 LevelDecoder repetition_level_decoder_;
521
522 // The total number of values stored in the data page. This is the maximum of
523 // the number of encoded definition levels or encoded values. For
524 // non-repeated, required columns, this is equal to the number of encoded
525 // values. For repeated or optional values, there may be fewer data values
526 // than levels, and this tells you how many encoded levels there are in that
527 // case.
528 int64_t num_buffered_values_;
529
530 // The number of values from the current data page that have been decoded
531 // into memory
532 int64_t num_decoded_values_;
533
534 ::arrow::MemoryPool* pool_;
535
536 using DecoderType = typename EncodingTraits<DType>::Decoder;
537 DecoderType* current_decoder_;
538 Encoding::type current_encoding_;
539
540 /// Flag to signal when a new dictionary has been set, for the benefit of
541 /// DictionaryRecordReader
542 bool new_dictionary_;
543
544 // Map of encoding type to the respective decoder object. For example, a
545 // column chunk's data pages may include both dictionary-encoded and
546 // plain-encoded data.
547 std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;
548
549 void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
550};
551
552// ----------------------------------------------------------------------
553// TypedColumnReader implementations
554
// Typed reader implementing the batch-oriented TypedColumnReader<DType>
// interface on top of the shared page/level machinery in ColumnReaderImplBase.
template <typename DType>
class TypedColumnReaderImpl : public TypedColumnReader<DType>,
                              public ColumnReaderImplBase<DType> {
 public:
  using T = typename DType::c_type;

  TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
                        ::arrow::MemoryPool* pool)
      : ColumnReaderImplBase<DType>(descr, pool) {
    this->pager_ = std::move(pager);
  }

  // True while undecoded values remain (may advance to a new data page).
  bool HasNext() override { return this->HasNextInternal(); }

  // Read levels and values densely; see definition below.
  int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                    T* values, int64_t* values_read) override;

  // Read levels and values with null slots spaced out per the validity bitmap.
  int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                          T* values, uint8_t* valid_bits, int64_t valid_bits_offset,
                          int64_t* levels_read, int64_t* values_read,
                          int64_t* null_count) override;

  // Skip rows without materializing them for the caller.
  int64_t Skip(int64_t num_rows_to_skip) override;

  Type::type type() const override { return this->descr_->physical_type(); }

  const ColumnDescriptor* descr() const override { return this->descr_; }
};
583
584template <typename DType>
585int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
586 int16_t* rep_levels, T* values,
587 int64_t* values_read) {
588 // HasNext invokes ReadNewPage
589 if (!HasNext()) {
590 *values_read = 0;
591 return 0;
592 }
593
594 // TODO(wesm): keep reading data pages until batch_size is reached, or the
595 // row group is finished
596 batch_size =
597 std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
598
599 int64_t num_def_levels = 0;
600 int64_t num_rep_levels = 0;
601
602 int64_t values_to_read = 0;
603
604 // If the field is required and non-repeated, there are no definition levels
605 if (this->max_def_level_ > 0 && def_levels) {
606 num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
607 // TODO(wesm): this tallying of values-to-decode can be performed with better
608 // cache-efficiency if fused with the level decoding.
609 for (int64_t i = 0; i < num_def_levels; ++i) {
610 if (def_levels[i] == this->max_def_level_) {
611 ++values_to_read;
612 }
613 }
614 } else {
615 // Required field, read all values
616 values_to_read = batch_size;
617 }
618
619 // Not present for non-repeated fields
620 if (this->max_rep_level_ > 0 && rep_levels) {
621 num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
622 if (def_levels && num_def_levels != num_rep_levels) {
623 throw ParquetException("Number of decoded rep / def levels did not match");
624 }
625 }
626
627 *values_read = this->ReadValues(values_to_read, values);
628 int64_t total_values = std::max(num_def_levels, *values_read);
629 this->ConsumeBufferedValues(total_values);
630
631 return total_values;
632}
633
// Read up to batch_size levels and values, writing values "spaced": for
// nullable columns, value slots corresponding to nulls are left as gaps and a
// validity bitmap is written to valid_bits starting at valid_bits_offset.
// Outputs: *levels_read (level slots consumed), *values_read (value slots
// written including null gaps), *null_count_out (nulls encountered).
// Returns the number of value slots (total_values).
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
    int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
    uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
    int64_t* values_read, int64_t* null_count_out) {
  // HasNext invokes ReadNewPage
  if (!HasNext()) {
    *levels_read = 0;
    *values_read = 0;
    *null_count_out = 0;
    return 0;
  }

  int64_t total_values;
  // TODO(wesm): keep reading data pages until batch_size is reached, or the
  // row group is finished
  batch_size =
      std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);

  // If the field is required and non-repeated, there are no definition levels
  if (this->max_def_level_ > 0) {
    int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);

    // Not present for non-repeated fields
    if (this->max_rep_level_ > 0) {
      int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
      if (num_def_levels != num_rep_levels) {
        throw ParquetException("Number of decoded rep / def levels did not match");
      }
    }

    const bool has_spaced_values = internal::HasSpacedValues(this->descr_);

    int64_t null_count = 0;
    if (!has_spaced_values) {
      // No gaps are needed: count the defined slots and read exactly that
      // many values contiguously.
      int values_to_read = 0;
      for (int64_t i = 0; i < num_def_levels; ++i) {
        if (def_levels[i] == this->max_def_level_) {
          ++values_to_read;
        }
      }
      total_values = this->ReadValues(values_to_read, values);
      // Every decoded value is valid; set its validity bit.
      for (int64_t i = 0; i < total_values; i++) {
        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
      }
      *values_read = total_values;
    } else {
      // Derive the validity bitmap from the definition levels, then decode
      // values directly into their spaced positions.
      internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_,
                                         this->max_rep_level_, values_read, &null_count,
                                         valid_bits, valid_bits_offset);
      total_values =
          this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
                                 valid_bits, valid_bits_offset);
    }
    *levels_read = num_def_levels;
    *null_count_out = null_count;

  } else {
    // Required field, read all values
    total_values = this->ReadValues(batch_size, values);
    for (int64_t i = 0; i < total_values; i++) {
      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
    }
    *null_count_out = 0;
    *levels_read = total_values;
  }

  this->ConsumeBufferedValues(*levels_read);
  return total_values;
}
704
705template <typename DType>
706int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_rows_to_skip) {
707 int64_t rows_to_skip = num_rows_to_skip;
708 while (HasNext() && rows_to_skip > 0) {
709 // If the number of rows to skip is more than the number of undecoded values, skip the
710 // Page.
711 if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) {
712 rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_;
713 this->num_decoded_values_ = this->num_buffered_values_;
714 } else {
715 // We need to read this Page
716 // Jump to the right offset in the Page
717 int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint
718 int64_t values_read = 0;
719
720 // This will be enough scratch space to accommodate 16-bit levels or any
721 // value type
722 std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer(
723 this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
724
725 do {
726 batch_size = std::min(batch_size, rows_to_skip);
727 values_read =
728 ReadBatch(static_cast<int>(batch_size),
729 reinterpret_cast<int16_t*>(scratch->mutable_data()),
730 reinterpret_cast<int16_t*>(scratch->mutable_data()),
731 reinterpret_cast<T*>(scratch->mutable_data()), &values_read);
732 rows_to_skip -= values_read;
733 } while (values_read > 0 && rows_to_skip > 0);
734 }
735 }
736 return num_rows_to_skip - rows_to_skip;
737}
738
739// ----------------------------------------------------------------------
740// Dynamic column reader constructor
741
742std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
743 std::unique_ptr<PageReader> pager,
744 MemoryPool* pool) {
745 switch (descr->physical_type()) {
746 case Type::BOOLEAN:
747 return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager),
748 pool);
749 case Type::INT32:
750 return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager),
751 pool);
752 case Type::INT64:
753 return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager),
754 pool);
755 case Type::INT96:
756 return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager),
757 pool);
758 case Type::FLOAT:
759 return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager),
760 pool);
761 case Type::DOUBLE:
762 return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager),
763 pool);
764 case Type::BYTE_ARRAY:
765 return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>(
766 descr, std::move(pager), pool);
767 case Type::FIXED_LEN_BYTE_ARRAY:
768 return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager),
769 pool);
770 default:
771 ParquetException::NYI("type reader not implemented");
772 }
773 // Unreachable code, but supress compiler warning
774 return std::shared_ptr<ColumnReader>(nullptr);
775}
776
777// ----------------------------------------------------------------------
778// RecordReader
779
780namespace internal {
781
// The minimum number of repetition/definition levels to decode at a time, for
// better vectorized performance when doing many smaller record reads.
// ReadRecords() below batches level decoding by at least this many slots.
constexpr int64_t kMinLevelBatchSize = 1024;
785
786template <typename DType>
787class TypedRecordReader : public ColumnReaderImplBase<DType>,
788 virtual public RecordReader {
789 public:
790 using T = typename DType::c_type;
791 using BASE = ColumnReaderImplBase<DType>;
  // Set all accumulator state to "empty" and allocate the output buffers for
  // values, validity bits and rep/def levels from `pool`.
  TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) {
    nullable_values_ = internal::HasSpacedValues(descr);
    at_record_start_ = true;
    records_read_ = 0;
    values_written_ = 0;
    values_capacity_ = 0;
    null_count_ = 0;
    levels_written_ = 0;
    levels_position_ = 0;
    levels_capacity_ = 0;
    // BYTE_ARRAY columns do not use the flat values_ buffer — presumably
    // variable-length values are accumulated elsewhere; confirm against the
    // byte-array record reader implementation.
    uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY);

    if (uses_values_) {
      values_ = AllocateBuffer(pool);
    }
    valid_bits_ = AllocateBuffer(pool);
    def_levels_ = AllocateBuffer(pool);
    rep_levels_ = AllocateBuffer(pool);
    Reset();
  }
812
  // Number of not-yet-decoded value/level slots left in the current data page.
  int64_t available_values_current_page() const {
    return this->num_buffered_values_ - this->num_decoded_values_;
  }
816
  // Read up to num_records whole records into the internal buffers.
  //
  // First consumes any levels already buffered, then alternates between
  // decoding batches of levels from the column chunk and delimiting records,
  // until enough records are seen or the row group is exhausted.
  //
  // @returns: number of complete records read
  int64_t ReadRecords(int64_t num_records) override {
    // Delimit records, then read values at the end
    int64_t records_read = 0;

    if (levels_position_ < levels_written_) {
      records_read += ReadRecordData(num_records);
    }

    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);

    // If we are in the middle of a record, we continue until reaching the
    // desired number of records or the end of the current record if we've found
    // enough records
    while (!at_record_start_ || records_read < num_records) {
      // Is there more data to read in this row group?
      if (!this->HasNextInternal()) {
        if (!at_record_start_) {
          // We ended the row group while inside a record that we haven't seen
          // the end of yet. So increment the record count for the last record in
          // the row group
          ++records_read;
          at_record_start_ = true;
        }
        break;
      }

      /// We perform multiple batch reads until we either exhaust the row group
      /// or observe the desired number of records
      int64_t batch_size = std::min(level_batch_size, available_values_current_page());

      // No more data in column
      if (batch_size == 0) {
        break;
      }

      if (this->max_def_level_ > 0) {
        ReserveLevels(batch_size);

        int16_t* def_levels = this->def_levels() + levels_written_;
        int16_t* rep_levels = this->rep_levels() + levels_written_;

        // Not present for non-repeated fields
        int64_t levels_read = 0;
        if (this->max_rep_level_ > 0) {
          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
          if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
            throw ParquetException("Number of decoded rep / def levels did not match");
          }
        } else if (this->max_def_level_ > 0) {
          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
        }

        // Exhausted column chunk
        if (levels_read == 0) {
          break;
        }

        levels_written_ += levels_read;
        records_read += ReadRecordData(num_records - records_read);
      } else {
        // No repetition or definition levels: every value is one record.
        batch_size = std::min(num_records - records_read, batch_size);
        records_read += ReadRecordData(batch_size);
      }
    }

    return records_read;
  }
885
886 // We may outwardly have the appearance of having exhausted a column chunk
887 // when in fact we are in the middle of processing the last batch
888 bool has_values_to_process() const { return levels_position_ < levels_written_; }
889
890 std::shared_ptr<ResizableBuffer> ReleaseValues() override {
891 if (uses_values_) {
892 auto result = values_;
893 values_ = AllocateBuffer(this->pool_);
894 return result;
895 } else {
896 return nullptr;
897 }
898 }
899
900 std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
901 if (nullable_values_) {
902 auto result = valid_bits_;
903 valid_bits_ = AllocateBuffer(this->pool_);
904 return result;
905 } else {
906 return nullptr;
907 }
908 }
909
  // Process written repetition/definition levels to reach the end of
  // records. Process no more levels than necessary to delimit the indicated
  // number of logical records. Updates internal state of RecordReader
  //
  // A repetition level of 0 marks the start of a new top-level record; a
  // definition level equal to max_def_level_ means a leaf value is
  // physically present and must be decoded.
  //
  // \param num_records maximum number of logical records to delimit
  // \param[out] values_seen number of physical values covered by the
  //   levels consumed (i.e. how many values the caller must decode)
  // \return Number of records delimited
  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
    int64_t values_to_read = 0;
    int64_t records_read = 0;

    const int16_t* def_levels = this->def_levels() + levels_position_;
    const int16_t* rep_levels = this->rep_levels() + levels_position_;

    // Only called for repeated fields (max_rep_level_ > 0)
    DCHECK_GT(this->max_rep_level_, 0);

    // Count logical records and number of values to read
    while (levels_position_ < levels_written_) {
      if (*rep_levels++ == 0) {
        // If at_record_start_ is true, we are seeing the start of a record
        // for the second time, such as after repeated calls to
        // DelimitRecords. In this case we must continue until we find
        // another record start or exhausting the ColumnChunk
        if (!at_record_start_) {
          // We've reached the end of a record; increment the record count.
          ++records_read;
          if (records_read == num_records) {
            // We've found the number of records we were looking for. Set
            // at_record_start_ to true and break
            at_record_start_ = true;
            break;
          }
        }
      }

      // We have decided to consume the level at this position; therefore we
      // must advance until we find another record boundary
      at_record_start_ = false;

      if (*def_levels++ == this->max_def_level_) {
        ++values_to_read;
      }
      ++levels_position_;
    }
    *values_seen = values_to_read;
    return records_read;
  }
955
956 void Reserve(int64_t capacity) override {
957 ReserveLevels(capacity);
958 ReserveValues(capacity);
959 }
960
961 void ReserveLevels(int64_t capacity) {
962 if (this->max_def_level_ > 0 && (levels_written_ + capacity > levels_capacity_)) {
963 int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1);
964 while (levels_written_ + capacity > new_levels_capacity) {
965 new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1);
966 }
967 PARQUET_THROW_NOT_OK(
968 def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
969 if (this->max_rep_level_ > 0) {
970 PARQUET_THROW_NOT_OK(
971 rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
972 }
973 levels_capacity_ = new_levels_capacity;
974 }
975 }
976
  // Ensure the values buffer can hold `capacity` additional values beyond
  // values_written_, and that the validity bitmap covers the (possibly
  // enlarged) value capacity for nullable columns.
  void ReserveValues(int64_t capacity) {
    if (values_written_ + capacity > values_capacity_) {
      // Grow capacity in powers of two until the request fits
      int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1);
      while (values_written_ + capacity > new_values_capacity) {
        new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1);
      }

      int type_size = GetTypeByteSize(this->descr_->physical_type());

      // XXX(wesm): A hack to avoid memory allocation when reading directly
      // into builder classes
      if (uses_values_) {
        PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
      }

      values_capacity_ = new_values_capacity;
    }
    if (nullable_values_) {
      // Keep the validity bitmap sized to the value capacity; only the
      // newly-appended bytes are zeroed
      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));

        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               valid_bytes_new - valid_bytes_old);
      }
    }
  }
1006
  // Discard consumed values and levels. Levels that were decoded but not yet
  // consumed (between levels_position_ and levels_written_) are compacted to
  // the front of the level buffers so that a partially-delimited record
  // survives the reset.
  void Reset() override {
    ResetValues();

    if (levels_written_ > 0) {
      const int64_t levels_remaining = levels_written_ - levels_position_;
      // Shift remaining levels to beginning of buffer and trim to only the number
      // of decoded levels remaining
      int16_t* def_data = def_levels();
      int16_t* rep_data = rep_levels();

      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));

      if (this->max_rep_level_ > 0) {
        std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
        PARQUET_THROW_NOT_OK(
            rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
      }

      // Bookkeeping now reflects only the shifted, unconsumed levels
      levels_written_ -= levels_position_;
      levels_position_ = 0;
      levels_capacity_ = levels_remaining;
    }

    records_read_ = 0;

    // Call Finish on the binary builders to reset them
  }
1036
1037 void SetPageReader(std::unique_ptr<PageReader> reader) override {
1038 at_record_start_ = true;
1039 this->pager_ = std::move(reader);
1040 ResetDecoders();
1041 }
1042
1043 bool HasMoreData() const override { return this->pager_ != nullptr; }
1044
  // Dictionary decoders must be reset when advancing row groups: cached
  // decoders refer to the previous row group's dictionary pages.
  void ResetDecoders() { this->decoders_.clear(); }
1047
1048 virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
1049 uint8_t* valid_bits = valid_bits_->mutable_data();
1050 const int64_t valid_bits_offset = values_written_;
1051
1052 int64_t num_decoded = this->current_decoder_->DecodeSpaced(
1053 ValuesHead<T>(), static_cast<int>(values_with_nulls),
1054 static_cast<int>(null_count), valid_bits, valid_bits_offset);
1055 DCHECK_EQ(num_decoded, values_with_nulls);
1056 }
1057
1058 virtual void ReadValuesDense(int64_t values_to_read) {
1059 int64_t num_decoded =
1060 this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
1061 DCHECK_EQ(num_decoded, values_to_read);
1062 }
1063
  // Decode the values belonging to up to `num_records` logical records,
  // using the previously decoded repetition/definition levels to delimit
  // record boundaries and locate nulls.
  //
  // \return Number of logical records read
  int64_t ReadRecordData(int64_t num_records) {
    // Conservative upper bound
    const int64_t possible_num_values =
        std::max(num_records, levels_written_ - levels_position_);
    ReserveValues(possible_num_values);

    const int64_t start_levels_position = levels_position_;

    int64_t values_to_read = 0;
    int64_t records_read = 0;
    if (this->max_rep_level_ > 0) {
      // Repeated field: walk repetition levels to find record boundaries
      records_read = DelimitRecords(num_records, &values_to_read);
    } else if (this->max_def_level_ > 0) {
      // No repetition levels, skip delimiting logic. Each level represents a
      // null or not null entry
      records_read = std::min(levels_written_ - levels_position_, num_records);

      // This is advanced by DelimitRecords, which we skipped
      levels_position_ += records_read;
    } else {
      // Required, non-repeated field: one value per record, no levels stored
      records_read = values_to_read = num_records;
    }

    int64_t null_count = 0;
    if (nullable_values_) {
      // Convert definition levels to a validity bitmap, then decode values
      // into the slots marked valid
      int64_t values_with_nulls = 0;
      internal::DefinitionLevelsToBitmap(
          def_levels() + start_levels_position, levels_position_ - start_levels_position,
          this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count,
          valid_bits_->mutable_data(), values_written_);
      values_to_read = values_with_nulls - null_count;
      ReadValuesSpaced(values_with_nulls, null_count);
    } else {
      ReadValuesDense(values_to_read);
    }
    if (this->max_def_level_ > 0) {
      // Optional, repeated, or some mix thereof
      this->ConsumeBufferedValues(levels_position_ - start_levels_position);
    } else {
      // Flat, non-repeated
      this->ConsumeBufferedValues(values_to_read);
    }
    // Total values, including null spaces, if any
    values_written_ += values_to_read + null_count;
    null_count_ += null_count;

    return records_read;
  }
1113
1114 void DebugPrintState() override {
1115 const int16_t* def_levels = this->def_levels();
1116 const int16_t* rep_levels = this->rep_levels();
1117 const int64_t total_levels_read = levels_position_;
1118
1119 const T* vals = reinterpret_cast<const T*>(this->values());
1120
1121 std::cout << "def levels: ";
1122 for (int64_t i = 0; i < total_levels_read; ++i) {
1123 std::cout << def_levels[i] << " ";
1124 }
1125 std::cout << std::endl;
1126
1127 std::cout << "rep levels: ";
1128 for (int64_t i = 0; i < total_levels_read; ++i) {
1129 std::cout << rep_levels[i] << " ";
1130 }
1131 std::cout << std::endl;
1132
1133 std::cout << "values: ";
1134 for (int64_t i = 0; i < this->values_written(); ++i) {
1135 std::cout << vals[i] << " ";
1136 }
1137 std::cout << std::endl;
1138 }
1139
1140 void ResetValues() {
1141 if (values_written_ > 0) {
1142 // Resize to 0, but do not shrink to fit
1143 if (uses_values_) {
1144 PARQUET_THROW_NOT_OK(values_->Resize(0, false));
1145 }
1146 PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
1147 values_written_ = 0;
1148 values_capacity_ = 0;
1149 null_count_ = 0;
1150 }
1151 }
1152
1153 protected:
1154 template <typename T>
1155 T* ValuesHead() {
1156 return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
1157 }
1158};
1159
1160class FLBARecordReader : public TypedRecordReader<FLBAType>,
1161 virtual public BinaryRecordReader {
1162 public:
1163 FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
1164 : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
1165 DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
1166 int byte_width = descr_->type_length();
1167 std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
1168 builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_));
1169 }
1170
1171 ::arrow::ArrayVector GetBuilderChunks() override {
1172 std::shared_ptr<::arrow::Array> chunk;
1173 PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
1174 return ::arrow::ArrayVector({chunk});
1175 }
1176
1177 void ReadValuesDense(int64_t values_to_read) override {
1178 auto values = ValuesHead<FLBA>();
1179 int64_t num_decoded =
1180 this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
1181 DCHECK_EQ(num_decoded, values_to_read);
1182
1183 for (int64_t i = 0; i < num_decoded; i++) {
1184 PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
1185 }
1186 ResetValues();
1187 }
1188
1189 void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1190 uint8_t* valid_bits = valid_bits_->mutable_data();
1191 const int64_t valid_bits_offset = values_written_;
1192 auto values = ValuesHead<FLBA>();
1193
1194 int64_t num_decoded = this->current_decoder_->DecodeSpaced(
1195 values, static_cast<int>(values_to_read), static_cast<int>(null_count),
1196 valid_bits, valid_bits_offset);
1197 DCHECK_EQ(num_decoded, values_to_read);
1198
1199 for (int64_t i = 0; i < num_decoded; i++) {
1200 if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
1201 PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
1202 } else {
1203 PARQUET_THROW_NOT_OK(builder_->AppendNull());
1204 }
1205 }
1206 ResetValues();
1207 }
1208
1209 private:
1210 std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
1211};
1212
1213class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
1214 virtual public BinaryRecordReader {
1215 public:
1216 ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
1217 : TypedRecordReader<ByteArrayType>(descr, pool) {
1218 DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
1219 accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
1220 }
1221
1222 ::arrow::ArrayVector GetBuilderChunks() override {
1223 ::arrow::ArrayVector result = accumulator_.chunks;
1224 if (result.size() == 0 || accumulator_.builder->length() > 0) {
1225 std::shared_ptr<::arrow::Array> last_chunk;
1226 PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
1227 result.push_back(last_chunk);
1228 }
1229 accumulator_.chunks = {};
1230 return result;
1231 }
1232
1233 void ReadValuesDense(int64_t values_to_read) override {
1234 int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
1235 static_cast<int>(values_to_read), &accumulator_);
1236 DCHECK_EQ(num_decoded, values_to_read);
1237 ResetValues();
1238 }
1239
1240 void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1241 int64_t num_decoded = this->current_decoder_->DecodeArrow(
1242 static_cast<int>(values_to_read), static_cast<int>(null_count),
1243 valid_bits_->mutable_data(), values_written_, &accumulator_);
1244 DCHECK_EQ(num_decoded, values_to_read - null_count);
1245 ResetValues();
1246 }
1247
1248 private:
1249 // Helper data structure for accumulating builder chunks
1250 ArrowBinaryAccumulator accumulator_;
1251};
1252
1253class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
1254 virtual public DictionaryRecordReader {
1255 public:
1256 ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
1257 ::arrow::MemoryPool* pool)
1258 : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {
1259 this->read_dictionary_ = true;
1260 }
1261
1262 std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
1263 FlushBuilder();
1264 return std::make_shared<::arrow::ChunkedArray>(result_chunks_, builder_.type());
1265 }
1266
1267 void FlushBuilder() {
1268 if (builder_.length() > 0) {
1269 std::shared_ptr<::arrow::Array> chunk;
1270 PARQUET_THROW_NOT_OK(builder_.Finish(&chunk));
1271 result_chunks_.emplace_back(std::move(chunk));
1272
1273 // Also clears the dictionary memo table
1274 builder_.ResetFull();
1275 }
1276 }
1277
1278 void MaybeWriteNewDictionary() {
1279 if (this->new_dictionary_) {
1280 /// If there is a new dictionary, we may need to flush the builder, then
1281 /// insert the new dictionary values
1282 FlushBuilder();
1283 auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
1284 decoder->InsertDictionary(&builder_);
1285 this->new_dictionary_ = false;
1286 }
1287 }
1288
1289 void ReadValuesDense(int64_t values_to_read) override {
1290 int64_t num_decoded = 0;
1291 if (current_encoding_ == Encoding::RLE_DICTIONARY) {
1292 MaybeWriteNewDictionary();
1293 auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
1294 num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_);
1295 } else {
1296 num_decoded = this->current_decoder_->DecodeArrowNonNull(
1297 static_cast<int>(values_to_read), &builder_);
1298
1299 /// Flush values since they have been copied into the builder
1300 ResetValues();
1301 }
1302 DCHECK_EQ(num_decoded, values_to_read);
1303 }
1304
1305 void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1306 int64_t num_decoded = 0;
1307 if (current_encoding_ == Encoding::RLE_DICTIONARY) {
1308 MaybeWriteNewDictionary();
1309 auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
1310 num_decoded = decoder->DecodeIndicesSpaced(
1311 static_cast<int>(values_to_read), static_cast<int>(null_count),
1312 valid_bits_->mutable_data(), values_written_, &builder_);
1313 } else {
1314 num_decoded = this->current_decoder_->DecodeArrow(
1315 static_cast<int>(values_to_read), static_cast<int>(null_count),
1316 valid_bits_->mutable_data(), values_written_, &builder_);
1317
1318 /// Flush values since they have been copied into the builder
1319 ResetValues();
1320 }
1321 DCHECK_EQ(num_decoded, values_to_read - null_count);
1322 }
1323
1324 private:
1325 using BinaryDictDecoder = DictDecoder<ByteArrayType>;
1326
1327 ::arrow::BinaryDictionary32Builder builder_;
1328 std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
1329};
1330
// TODO(wesm): Implement these to some satisfaction
// The generic DebugPrintState streams values with operator<<; for these
// physical types the debug printer is stubbed out as a no-op instead.
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
1340
1341std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr,
1342 arrow::MemoryPool* pool,
1343 bool read_dictionary) {
1344 if (read_dictionary) {
1345 return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool);
1346 } else {
1347 return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool);
1348 }
1349}
1350
1351std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
1352 MemoryPool* pool,
1353 const bool read_dictionary) {
1354 switch (descr->physical_type()) {
1355 case Type::BOOLEAN:
1356 return std::make_shared<TypedRecordReader<BooleanType>>(descr, pool);
1357 case Type::INT32:
1358 return std::make_shared<TypedRecordReader<Int32Type>>(descr, pool);
1359 case Type::INT64:
1360 return std::make_shared<TypedRecordReader<Int64Type>>(descr, pool);
1361 case Type::INT96:
1362 return std::make_shared<TypedRecordReader<Int96Type>>(descr, pool);
1363 case Type::FLOAT:
1364 return std::make_shared<TypedRecordReader<FloatType>>(descr, pool);
1365 case Type::DOUBLE:
1366 return std::make_shared<TypedRecordReader<DoubleType>>(descr, pool);
1367 case Type::BYTE_ARRAY:
1368 return MakeByteArrayRecordReader(descr, pool, read_dictionary);
1369 case Type::FIXED_LEN_BYTE_ARRAY:
1370 return std::make_shared<FLBARecordReader>(descr, pool);
1371 default: {
1372 // PARQUET-1481: This can occur if the file is corrupt
1373 std::stringstream ss;
1374 ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type());
1375 throw ParquetException(ss.str());
1376 }
1377 }
1378 // Unreachable code, but supress compiler warning
1379 return nullptr;
1380}
1381
1382} // namespace internal
1383} // namespace parquet
1384