// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/record_reader.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
#include <unordered_map>
#include <utility>

#include "arrow/buffer.h"
#include "arrow/builder.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/encoding-internal.h"
#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"

using arrow::MemoryPool;

namespace parquet {
namespace internal {

namespace BitUtil = ::arrow::BitUtil;

template <typename DType>
class TypedRecordReader;
// PLAIN_DICTIONARY is deprecated, but it was historically used as a dictionary
// index encoding, so treat it as equivalent to RLE_DICTIONARY.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}

// The minimum number of repetition/definition levels to decode at a time, for
// better vectorized performance when doing many smaller record reads
constexpr int64_t kMinLevelBatchSize = 1024;

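// Shared implementation state for RecordReader: owns the decoded value buffer,
// the validity bitmap, and the definition/repetition level buffers, and tracks
// how many levels have been buffered versus consumed so that record boundaries
// can be delimited across page and batch boundaries.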
class RecordReader::RecordReaderImpl {
 public:
  RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
      : descr_(descr),
        pool_(pool),
        num_buffered_values_(0),
        num_decoded_values_(0),
        max_def_level_(descr->max_definition_level()),
        max_rep_level_(descr->max_repetition_level()),
        at_record_start_(true),
        records_read_(0),
        values_written_(0),
        values_capacity_(0),
        null_count_(0),
        levels_written_(0),
        levels_position_(0),
        levels_capacity_(0) {
    nullable_values_ = internal::HasSpacedValues(descr);
    values_ = AllocateBuffer(pool);
    valid_bits_ = AllocateBuffer(pool);
    def_levels_ = AllocateBuffer(pool);
    rep_levels_ = AllocateBuffer(pool);
    Reset();
  }

  virtual ~RecordReaderImpl() = default;

  virtual int64_t ReadRecordData(const int64_t num_records) = 0;

  // Returns true if there are still values in this column.
  bool HasNext() {
    // Either there is no data page available yet, or the data page has been
    // exhausted
    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
      if (!ReadNewPage() || num_buffered_values_ == 0) {
        return false;
      }
    }
    return true;
  }

  int64_t ReadRecords(int64_t num_records) {
    // Delimit records, then read values at the end
    int64_t records_read = 0;

    if (levels_position_ < levels_written_) {
      records_read += ReadRecordData(num_records);
    }

    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);

    // Continue reading until we reach the requested number of records, or, if
    // we stopped in the middle of a record, until we reach the end of that
    // record
    while (!at_record_start_ || records_read < num_records) {
      // Is there more data to read in this row group?
      if (!HasNext()) {
        if (!at_record_start_) {
          // We ended the row group while inside a record that we haven't seen
          // the end of yet. So increment the record count for the last record
          // in the row group
          ++records_read;
          at_record_start_ = true;
        }
        break;
      }

      /// We perform multiple batch reads until we either exhaust the row group
      /// or observe the desired number of records
      int64_t batch_size = std::min(level_batch_size, available_values_current_page());

      // No more data in column
      if (batch_size == 0) {
        break;
      }

      if (max_def_level_ > 0) {
        ReserveLevels(batch_size);

        int16_t* def_levels = this->def_levels() + levels_written_;
        int16_t* rep_levels = this->rep_levels() + levels_written_;

        // Not present for non-repeated fields
        int64_t levels_read = 0;
        if (max_rep_level_ > 0) {
          levels_read = ReadDefinitionLevels(batch_size, def_levels);
          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
            throw ParquetException("Number of decoded rep / def levels did not match");
          }
        } else if (max_def_level_ > 0) {
          levels_read = ReadDefinitionLevels(batch_size, def_levels);
        }

        // Exhausted column chunk
        if (levels_read == 0) {
          break;
        }

        levels_written_ += levels_read;
        records_read += ReadRecordData(num_records - records_read);
      } else {
        // No repetition or definition levels
        batch_size = std::min(num_records - records_read, batch_size);
        records_read += ReadRecordData(batch_size);
      }
    }

    return records_read;
  }

  // Dictionary decoders must be reset when advancing row groups
  virtual void ResetDecoders() = 0;

  void SetPageReader(std::unique_ptr<PageReader> reader) {
    at_record_start_ = true;
    pager_ = std::move(reader);
    ResetDecoders();
  }

  bool HasMoreData() const { return pager_ != nullptr; }

  int16_t* def_levels() const {
    return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
  }

  int16_t* rep_levels() {
    return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
  }

  uint8_t* values() const { return values_->mutable_data(); }

  /// \brief Number of values written including nulls (if any)
  int64_t values_written() const { return values_written_; }

  int64_t levels_position() const { return levels_position_; }
  int64_t levels_written() const { return levels_written_; }

  // We may appear to have exhausted a column chunk when in fact we are still
  // in the middle of processing the last batch of decoded levels
  bool has_values_to_process() const { return levels_position_ < levels_written_; }

  int64_t null_count() const { return null_count_; }

  bool nullable_values() const { return nullable_values_; }

  std::shared_ptr<ResizableBuffer> ReleaseValues() {
    auto result = values_;
    values_ = AllocateBuffer(pool_);
    return result;
  }

  std::shared_ptr<ResizableBuffer> ReleaseIsValid() {
    auto result = valid_bits_;
    valid_bits_ = AllocateBuffer(pool_);
    return result;
  }

  // Process written repetition/definition levels to reach the end of
  // records. Process no more levels than necessary to delimit the indicated
  // number of logical records. Updates internal state of RecordReader
  //
  // \return Number of records delimited
  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
    int64_t values_to_read = 0;
    int64_t records_read = 0;

    const int16_t* def_levels = this->def_levels() + levels_position_;
    const int16_t* rep_levels = this->rep_levels() + levels_position_;

    DCHECK_GT(max_rep_level_, 0);

    // Count logical records and number of values to read
    while (levels_position_ < levels_written_) {
      if (*rep_levels++ == 0) {
        // If at_record_start_ is true, we are seeing the start of a record
        // for the second time, such as after repeated calls to
        // DelimitRecords. In this case we must continue until we find
        // another record start or exhaust the ColumnChunk
        if (!at_record_start_) {
          // We've reached the end of a record; increment the record count.
          ++records_read;
          if (records_read == num_records) {
            // We've found the number of records we were looking for. Set
            // at_record_start_ to true and break
            at_record_start_ = true;
            break;
          }
        }
      }

      // We have decided to consume the level at this position; therefore we
      // must advance until we find another record boundary
      at_record_start_ = false;

      if (*def_levels++ == max_def_level_) {
        ++values_to_read;
      }
      ++levels_position_;
    }
    *values_seen = values_to_read;
    return records_read;
  }

  // Read multiple definition levels into preallocated memory
  //
  // Returns the number of decoded definition levels
  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
    if (descr_->max_definition_level() == 0) {
      return 0;
    }
    return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
  }

  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
    if (descr_->max_repetition_level() == 0) {
      return 0;
    }
    return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
  }

  int64_t available_values_current_page() const {
    return num_buffered_values_ - num_decoded_values_;
  }

  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }

  Type::type type() const { return descr_->physical_type(); }

  const ColumnDescriptor* descr() const { return descr_; }

  void Reserve(int64_t capacity) {
    ReserveLevels(capacity);
    ReserveValues(capacity);
  }

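  // Grow the definition/repetition level buffers geometrically (to the next
  // power of two) so that repeated small batch reads amortize the cost of
  // resizing.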
  void ReserveLevels(int64_t capacity) {
    if (descr_->max_definition_level() > 0 &&
        (levels_written_ + capacity > levels_capacity_)) {
      int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1);
      while (levels_written_ + capacity > new_levels_capacity) {
        new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1);
      }
      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
      if (descr_->max_repetition_level() > 0) {
        PARQUET_THROW_NOT_OK(
            rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
      }
      levels_capacity_ = new_levels_capacity;
    }
  }

  void ReserveValues(int64_t capacity) {
    if (values_written_ + capacity > values_capacity_) {
      int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1);
      while (values_written_ + capacity > new_values_capacity) {
        new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1);
      }

      int type_size = GetTypeByteSize(descr_->physical_type());
      PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
      values_capacity_ = new_values_capacity;
    }
    if (nullable_values_) {
      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));

        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               valid_bytes_new - valid_bytes_old);
      }
    }
  }

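  // Discard values that have already been handed out and compact any levels
  // that belong to a record that has not yet been fully delimited to the front
  // of the level buffers.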
  void Reset() {
    ResetValues();

    if (levels_written_ > 0) {
      const int64_t levels_remaining = levels_written_ - levels_position_;
      // Shift remaining levels to beginning of buffer and trim to only the
      // number of decoded levels remaining
      int16_t* def_data = def_levels();
      int16_t* rep_data = rep_levels();

      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
      std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);

      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));
      PARQUET_THROW_NOT_OK(
          rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));

      levels_written_ -= levels_position_;
      levels_position_ = 0;
      levels_capacity_ = levels_remaining;
    }

    records_read_ = 0;

    // Call Finish on the binary builders to reset them
  }

  void ResetValues() {
    if (values_written_ > 0) {
      // Resize to 0, but do not shrink to fit
      PARQUET_THROW_NOT_OK(values_->Resize(0, false));
      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
      values_written_ = 0;
      values_capacity_ = 0;
      null_count_ = 0;
    }
  }

  virtual void DebugPrintState() = 0;

  virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0;

 protected:
  virtual bool ReadNewPage() = 0;

  const ColumnDescriptor* descr_;
  ::arrow::MemoryPool* pool_;

  std::unique_ptr<PageReader> pager_;
  std::shared_ptr<Page> current_page_;

  // Not set if the full schema path for this field contains no optional or
  // repeated elements
  LevelDecoder definition_level_decoder_;

  // Not set for flat schemas.
  LevelDecoder repetition_level_decoder_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The number of values from the current data page that have been decoded
  // into memory
  int64_t num_decoded_values_;

  const int16_t max_def_level_;
  const int16_t max_rep_level_;

  bool nullable_values_;

  bool at_record_start_;
  int64_t records_read_;

  int64_t values_written_;
  int64_t values_capacity_;
  int64_t null_count_;

  int64_t levels_written_;
  int64_t levels_position_;
  int64_t levels_capacity_;

  std::shared_ptr<::arrow::ResizableBuffer> values_;

  template <typename T>
  T* ValuesHead() {
    return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
  }

  std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
  std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
  std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
};

template <typename DType>
struct RecordReaderTraits {
  using BuilderType = ::arrow::ArrayBuilder;
};

template <>
struct RecordReaderTraits<ByteArrayType> {
  using BuilderType = ::arrow::internal::ChunkedBinaryBuilder;
};

template <>
struct RecordReaderTraits<FLBAType> {
  using BuilderType = ::arrow::FixedSizeBinaryBuilder;
};

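// RecordReaderTraits selects the Arrow builder used to accumulate binary
// values; for the other (fixed-width) types the ArrayBuilder alias is unused
// and InitializeBuilder() is a no-op, since decoded values stay in the
// values_ buffer.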
template <typename DType>
class TypedRecordReader : public RecordReader::RecordReaderImpl {
 public:
  using T = typename DType::c_type;

  using BuilderType = typename RecordReaderTraits<DType>::BuilderType;

  TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {
    InitializeBuilder();
  }

  void ResetDecoders() override { decoders_.clear(); }

  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
    uint8_t* valid_bits = valid_bits_->mutable_data();
    const int64_t valid_bits_offset = values_written_;

    int64_t num_decoded = current_decoder_->DecodeSpaced(
        ValuesHead<T>(), static_cast<int>(values_with_nulls),
        static_cast<int>(null_count), valid_bits, valid_bits_offset);
    DCHECK_EQ(num_decoded, values_with_nulls);
  }

  inline void ReadValuesDense(int64_t values_to_read) {
    int64_t num_decoded =
        current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Return number of logical records read
  int64_t ReadRecordData(const int64_t num_records) override {
    // Conservative upper bound
    const int64_t possible_num_values =
        std::max(num_records, levels_written_ - levels_position_);
    ReserveValues(possible_num_values);

    const int64_t start_levels_position = levels_position_;

    int64_t values_to_read = 0;
    int64_t records_read = 0;
    if (max_rep_level_ > 0) {
      records_read = DelimitRecords(num_records, &values_to_read);
    } else if (max_def_level_ > 0) {
      // No repetition levels, skip delimiting logic. Each level represents a
      // null or not null entry
      records_read = std::min(levels_written_ - levels_position_, num_records);

      // This is advanced by DelimitRecords, which we skipped
      levels_position_ += records_read;
    } else {
      records_read = values_to_read = num_records;
    }

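    // For nullable data, convert the definition levels just consumed into a
    // validity bitmap appended at values_written_, then decode values into the
    // spaced (null-aware) layout; otherwise decode densely.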
    int64_t null_count = 0;
    if (nullable_values_) {
      int64_t values_with_nulls = 0;
      internal::DefinitionLevelsToBitmap(
          def_levels() + start_levels_position, levels_position_ - start_levels_position,
          max_def_level_, max_rep_level_, &values_with_nulls, &null_count,
          valid_bits_->mutable_data(), values_written_);
      values_to_read = values_with_nulls - null_count;
      ReadValuesSpaced(values_with_nulls, null_count);
      ConsumeBufferedValues(levels_position_ - start_levels_position);
    } else {
      ReadValuesDense(values_to_read);
      ConsumeBufferedValues(values_to_read);
    }
    // Total values, including null spaces, if any
    values_written_ += values_to_read + null_count;
    null_count_ += null_count;

    return records_read;
  }

  void DebugPrintState() override {
    const int16_t* def_levels = this->def_levels();
    const int16_t* rep_levels = this->rep_levels();
    const int64_t total_levels_read = levels_position_;

    const T* values = reinterpret_cast<const T*>(this->values());

    std::cout << "def levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << def_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "rep levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << rep_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "values: ";
    for (int64_t i = 0; i < this->values_written(); ++i) {
      std::cout << values[i] << " ";
    }
    std::cout << std::endl;
  }

  std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() override {
    throw ParquetException("GetChunks only implemented for binary types");
  }

 private:
  typedef Decoder<DType> DecoderType;

  // Map of encoding type to the respective decoder object. For example, a
  // column chunk's data pages may include both dictionary-encoded and
  // plain-encoded data.
  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;

  std::unique_ptr<BuilderType> builder_;

  DecoderType* current_decoder_;

  // Advance to the next data page
  bool ReadNewPage() override;

  void InitializeBuilder() {}

  void ConfigureDictionary(const DictionaryPage* page);
};

// TODO(wesm): Implement these to some satisfaction
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::InitializeBuilder() {
  // Maximum of 16MB chunks
  constexpr int32_t kBinaryChunksize = 1 << 24;
  DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
  builder_.reset(new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_));
}

template <>
void TypedRecordReader<FLBAType>::InitializeBuilder() {
  DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
  int byte_width = descr_->type_length();
  std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
  builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_));
}

template <>
::arrow::ArrayVector TypedRecordReader<ByteArrayType>::GetBuilderChunks() {
  ::arrow::ArrayVector chunks;
  PARQUET_THROW_NOT_OK(builder_->Finish(&chunks));
  return chunks;
}

template <>
::arrow::ArrayVector TypedRecordReader<FLBAType>::GetBuilderChunks() {
  std::shared_ptr<::arrow::Array> chunk;
  PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
  return ::arrow::ArrayVector({chunk});
}

template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
  auto values = ValuesHead<ByteArray>();
  int64_t num_decoded =
      current_decoder_->Decode(values, static_cast<int>(values_to_read));
  DCHECK_EQ(num_decoded, values_to_read);

  for (int64_t i = 0; i < num_decoded; i++) {
    PARQUET_THROW_NOT_OK(
        builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
  }
  ResetValues();
}

template <>
inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) {
  auto values = ValuesHead<FLBA>();
  int64_t num_decoded =
      current_decoder_->Decode(values, static_cast<int>(values_to_read));
  DCHECK_EQ(num_decoded, values_to_read);

  for (int64_t i = 0; i < num_decoded; i++) {
    PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
  }
  ResetValues();
}

template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
                                                               int64_t null_count) {
  uint8_t* valid_bits = valid_bits_->mutable_data();
  const int64_t valid_bits_offset = values_written_;
  auto values = ValuesHead<ByteArray>();

  int64_t num_decoded = current_decoder_->DecodeSpaced(
      values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
      valid_bits_offset);
  DCHECK_EQ(num_decoded, values_to_read);

  for (int64_t i = 0; i < num_decoded; i++) {
    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
      PARQUET_THROW_NOT_OK(
          builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
    } else {
      PARQUET_THROW_NOT_OK(builder_->AppendNull());
    }
  }
  ResetValues();
}

template <>
inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read,
                                                          int64_t null_count) {
  uint8_t* valid_bits = valid_bits_->mutable_data();
  const int64_t valid_bits_offset = values_written_;
  auto values = ValuesHead<FLBA>();

  int64_t num_decoded = current_decoder_->DecodeSpaced(
      values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
      valid_bits_offset);
  DCHECK_EQ(num_decoded, values_to_read);

  for (int64_t i = 0; i < num_decoded; i++) {
    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
      PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
    } else {
      PARQUET_THROW_NOT_OK(builder_->AppendNull());
    }
  }
  ResetValues();
}

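// Eagerly decode the dictionary page with a PLAIN decoder and register an
// RLE_DICTIONARY decoder that resolves indices against it. A column chunk may
// carry at most one dictionary page.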
template <typename DType>
inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
  int encoding = static_cast<int>(page->encoding());
  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
      page->encoding() == Encoding::PLAIN) {
    encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
  }

  auto it = decoders_.find(encoding);
  if (it != decoders_.end()) {
    throw ParquetException("Column cannot have more than one dictionary.");
  }

  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
      page->encoding() == Encoding::PLAIN) {
    PlainDecoder<DType> dictionary(descr_);
    dictionary.SetData(page->num_values(), page->data(), page->size());

    // The dictionary is fully decoded during DictionaryDecoder::Init, so the
    // DictionaryPage buffer is no longer required after this step
    //
    // TODO(wesm): investigate whether this all-or-nothing decoding of the
    // dictionary makes sense and whether performance can be improved

    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
    decoder->SetDict(&dictionary);
    decoders_[encoding] = decoder;
  } else {
    ParquetException::NYI("only plain dictionary encoding has been implemented");
  }

  current_decoder_ = decoders_[encoding].get();
}

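// Advance the page stream to the next data page: dictionary pages configure
// the dictionary decoder, unknown page types are skipped, and for a data page
// the repetition/definition level decoders are initialized before the
// remaining bytes are handed to the value decoder.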
template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
  // Loop until we find the next data page.
  const uint8_t* buffer;

  while (true) {
    current_page_ = pager_->NextPage();
    if (!current_page_) {
      // EOS
      return false;
    }

    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
      continue;
    } else if (current_page_->type() == PageType::DATA_PAGE) {
      const DataPage* page = static_cast<const DataPage*>(current_page_.get());

      // Read a data page.
      num_buffered_values_ = page->num_values();

      // Have not decoded any values from the data page yet
      num_decoded_values_ = 0;

      buffer = page->data();

      // If the data page includes repetition and definition levels, we
      // initialize the level decoder and subtract the encoded level bytes from
      // the page size to determine the number of bytes in the encoded data.
      int64_t data_size = page->size();

      // Data page layout: repetition levels, then definition levels, then
      // encoded values. Levels are encoded as RLE or bit-packed.
      // Init repetition levels
      if (descr_->max_repetition_level() > 0) {
        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
            page->repetition_level_encoding(), descr_->max_repetition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        buffer += rep_levels_bytes;
        data_size -= rep_levels_bytes;
      }
      // TODO figure a way to set max_definition_level_ to 0
      // if the initial value is invalid

      // Init definition levels
      if (descr_->max_definition_level() > 0) {
        int64_t def_levels_bytes = definition_level_decoder_.SetData(
            page->definition_level_encoding(), descr_->max_definition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        buffer += def_levels_bytes;
        data_size -= def_levels_bytes;
      }

      // Get a decoder object for this page or create a new decoder if this is
      // the first page with this encoding.
      Encoding::type encoding = page->encoding();

      if (IsDictionaryIndexEncoding(encoding)) {
        encoding = Encoding::RLE_DICTIONARY;
      }

      auto it = decoders_.find(static_cast<int>(encoding));
      if (it != decoders_.end()) {
        if (encoding == Encoding::RLE_DICTIONARY) {
          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
        }
        current_decoder_ = it->second.get();
      } else {
        switch (encoding) {
          case Encoding::PLAIN: {
            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
            decoders_[static_cast<int>(encoding)] = decoder;
            current_decoder_ = decoder.get();
            break;
          }
          case Encoding::RLE_DICTIONARY:
            throw ParquetException("Dictionary page must be before data page.");

          case Encoding::DELTA_BINARY_PACKED:
          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
          case Encoding::DELTA_BYTE_ARRAY:
            ParquetException::NYI("Unsupported encoding");

          default:
            throw ParquetException("Unknown encoding type.");
        }
      }
      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                                static_cast<int>(data_size));
      return true;
    } else {
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      continue;
    }
  }
  return true;
}

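// Factory that dispatches on the column's physical type to the matching
// TypedRecordReader instantiation.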
std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
                                                 MemoryPool* pool) {
  switch (descr->physical_type()) {
    case Type::BOOLEAN:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<BooleanType>(descr, pool)));
    case Type::INT32:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<Int32Type>(descr, pool)));
    case Type::INT64:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<Int64Type>(descr, pool)));
    case Type::INT96:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<Int96Type>(descr, pool)));
    case Type::FLOAT:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<FloatType>(descr, pool)));
    case Type::DOUBLE:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<DoubleType>(descr, pool)));
    case Type::BYTE_ARRAY:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool)));
    case Type::FIXED_LEN_BYTE_ARRAY:
      return std::shared_ptr<RecordReader>(
          new RecordReader(new TypedRecordReader<FLBAType>(descr, pool)));
    default: {
      // PARQUET-1481: This can occur if the file is corrupt
      std::stringstream ss;
      ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type());
      throw ParquetException(ss.str());
    }
  }
  // Unreachable code, but suppress compiler warning
  return nullptr;
}

// ----------------------------------------------------------------------
// Implement public API

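// A minimal usage sketch (assumes a row group reader exposing a
// GetColumnPageReader(column_index) method; hypothetical caller code, not part
// of this file):
//
//   auto reader = RecordReader::Make(descr, ::arrow::default_memory_pool());
//   reader->SetPageReader(row_group_reader->GetColumnPageReader(column_index));
//   while (true) {
//     int64_t records_read = reader->ReadRecords(batch_size);
//     if (records_read == 0) break;
//     // Consume reader->values(), reader->def_levels() and reader->rep_levels(),
//     // or take ownership via ReleaseValues()/ReleaseIsValid(), then Reset().
//     reader->Reset();
//   }
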
RecordReader::RecordReader(RecordReaderImpl* impl) { impl_.reset(impl); }

RecordReader::~RecordReader() {}

int64_t RecordReader::ReadRecords(int64_t num_records) {
  return impl_->ReadRecords(num_records);
}

void RecordReader::Reset() { return impl_->Reset(); }

void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); }

const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); }

const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); }

const uint8_t* RecordReader::values() const { return impl_->values(); }

std::shared_ptr<ResizableBuffer> RecordReader::ReleaseValues() {
  return impl_->ReleaseValues();
}

std::shared_ptr<ResizableBuffer> RecordReader::ReleaseIsValid() {
  return impl_->ReleaseIsValid();
}

int64_t RecordReader::values_written() const { return impl_->values_written(); }

int64_t RecordReader::levels_position() const { return impl_->levels_position(); }

int64_t RecordReader::levels_written() const { return impl_->levels_written(); }

int64_t RecordReader::null_count() const { return impl_->null_count(); }

bool RecordReader::nullable_values() const { return impl_->nullable_values(); }

bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); }

void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
  impl_->SetPageReader(std::move(reader));
}

::arrow::ArrayVector RecordReader::GetBuilderChunks() {
  return impl_->GetBuilderChunks();
}

void RecordReader::DebugPrintState() { impl_->DebugPrintState(); }

}  // namespace internal
}  // namespace parquet