1 | // licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/arrow/record_reader.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdint> |
22 | #include <cstring> |
23 | #include <iostream> |
24 | #include <memory> |
25 | #include <sstream> |
26 | #include <unordered_map> |
27 | #include <utility> |
28 | |
29 | #include "arrow/buffer.h" |
30 | #include "arrow/builder.h" |
31 | #include "arrow/memory_pool.h" |
32 | #include "arrow/status.h" |
33 | #include "arrow/type.h" |
34 | #include "arrow/util/bit-util.h" |
35 | #include "arrow/util/logging.h" |
36 | #include "arrow/util/rle-encoding.h" |
37 | |
38 | #include "parquet/column_page.h" |
39 | #include "parquet/column_reader.h" |
40 | #include "parquet/encoding-internal.h" |
41 | #include "parquet/encoding.h" |
42 | #include "parquet/exception.h" |
43 | #include "parquet/properties.h" |
44 | #include "parquet/schema.h" |
45 | #include "parquet/types.h" |
46 | |
47 | using arrow::MemoryPool; |
48 | |
49 | namespace parquet { |
50 | namespace internal { |
51 | |
52 | namespace BitUtil = ::arrow::BitUtil; |
53 | |
54 | template <typename DType> |
55 | class TypedRecordReader; |
56 | |
57 | // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index |
58 | // encoding. |
59 | static bool IsDictionaryIndexEncoding(const Encoding::type& e) { |
60 | return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; |
61 | } |
62 | |
63 | // The minimum number of repetition/definition levels to decode at a time, for |
64 | // better vectorized performance when doing many smaller record reads |
65 | constexpr int64_t kMinLevelBatchSize = 1024; |
66 | |
67 | class RecordReader::RecordReaderImpl { |
68 | public: |
69 | RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool) |
70 | : descr_(descr), |
71 | pool_(pool), |
72 | num_buffered_values_(0), |
73 | num_decoded_values_(0), |
74 | max_def_level_(descr->max_definition_level()), |
75 | max_rep_level_(descr->max_repetition_level()), |
76 | at_record_start_(true), |
77 | records_read_(0), |
78 | values_written_(0), |
79 | values_capacity_(0), |
80 | null_count_(0), |
81 | levels_written_(0), |
82 | levels_position_(0), |
83 | levels_capacity_(0) { |
84 | nullable_values_ = internal::HasSpacedValues(descr); |
85 | values_ = AllocateBuffer(pool); |
86 | valid_bits_ = AllocateBuffer(pool); |
87 | def_levels_ = AllocateBuffer(pool); |
88 | rep_levels_ = AllocateBuffer(pool); |
89 | Reset(); |
90 | } |
91 | |
92 | virtual ~RecordReaderImpl() = default; |
93 | |
94 | virtual int64_t ReadRecordData(const int64_t num_records) = 0; |
95 | |
96 | // Returns true if there are still values in this column. |
97 | bool HasNext() { |
98 | // Either there is no data page available yet, or the data page has been |
99 | // exhausted |
100 | if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { |
101 | if (!ReadNewPage() || num_buffered_values_ == 0) { |
102 | return false; |
103 | } |
104 | } |
105 | return true; |
106 | } |
107 | |
108 | int64_t ReadRecords(int64_t num_records) { |
109 | // Delimit records, then read values at the end |
110 | int64_t records_read = 0; |
111 | |
112 | if (levels_position_ < levels_written_) { |
113 | records_read += ReadRecordData(num_records); |
114 | } |
115 | |
116 | int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); |
117 | |
118 | // If we are in the middle of a record, we continue until reaching the |
119 | // desired number of records or the end of the current record if we've found |
120 | // enough records |
121 | while (!at_record_start_ || records_read < num_records) { |
122 | // Is there more data to read in this row group? |
123 | if (!HasNext()) { |
124 | if (!at_record_start_) { |
125 | // We ended the row group while inside a record that we haven't seen |
126 | // the end of yet. So increment the record count for the last record in |
127 | // the row group |
128 | ++records_read; |
129 | at_record_start_ = true; |
130 | } |
131 | break; |
132 | } |
133 | |
134 | /// We perform multiple batch reads until we either exhaust the row group |
135 | /// or observe the desired number of records |
136 | int64_t batch_size = std::min(level_batch_size, available_values_current_page()); |
137 | |
138 | // No more data in column |
139 | if (batch_size == 0) { |
140 | break; |
141 | } |
142 | |
143 | if (max_def_level_ > 0) { |
144 | ReserveLevels(batch_size); |
145 | |
146 | int16_t* def_levels = this->def_levels() + levels_written_; |
147 | int16_t* rep_levels = this->rep_levels() + levels_written_; |
148 | |
149 | // Not present for non-repeated fields |
150 | int64_t levels_read = 0; |
151 | if (max_rep_level_ > 0) { |
152 | levels_read = ReadDefinitionLevels(batch_size, def_levels); |
153 | if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { |
154 | throw ParquetException("Number of decoded rep / def levels did not match" ); |
155 | } |
156 | } else if (max_def_level_ > 0) { |
157 | levels_read = ReadDefinitionLevels(batch_size, def_levels); |
158 | } |
159 | |
160 | // Exhausted column chunk |
161 | if (levels_read == 0) { |
162 | break; |
163 | } |
164 | |
165 | levels_written_ += levels_read; |
166 | records_read += ReadRecordData(num_records - records_read); |
167 | } else { |
168 | // No repetition or definition levels |
169 | batch_size = std::min(num_records - records_read, batch_size); |
170 | records_read += ReadRecordData(batch_size); |
171 | } |
172 | } |
173 | |
174 | return records_read; |
175 | } |
176 | |
177 | // Dictionary decoders must be reset when advancing row groups |
178 | virtual void ResetDecoders() = 0; |
179 | |
180 | void (std::unique_ptr<PageReader> reader) { |
181 | at_record_start_ = true; |
182 | pager_ = std::move(reader); |
183 | ResetDecoders(); |
184 | } |
185 | |
186 | bool HasMoreData() const { return pager_ != nullptr; } |
187 | |
188 | int16_t* def_levels() const { |
189 | return reinterpret_cast<int16_t*>(def_levels_->mutable_data()); |
190 | } |
191 | |
192 | int16_t* rep_levels() { |
193 | return reinterpret_cast<int16_t*>(rep_levels_->mutable_data()); |
194 | } |
195 | |
196 | uint8_t* values() const { return values_->mutable_data(); } |
197 | |
198 | /// \brief Number of values written including nulls (if any) |
199 | int64_t values_written() const { return values_written_; } |
200 | |
201 | int64_t levels_position() const { return levels_position_; } |
202 | int64_t levels_written() const { return levels_written_; } |
203 | |
204 | // We may outwardly have the appearance of having exhausted a column chunk |
205 | // when in fact we are in the middle of processing the last batch |
206 | bool has_values_to_process() const { return levels_position_ < levels_written_; } |
207 | |
208 | int64_t null_count() const { return null_count_; } |
209 | |
210 | bool nullable_values() const { return nullable_values_; } |
211 | |
212 | std::shared_ptr<ResizableBuffer> ReleaseValues() { |
213 | auto result = values_; |
214 | values_ = AllocateBuffer(pool_); |
215 | return result; |
216 | } |
217 | |
218 | std::shared_ptr<ResizableBuffer> ReleaseIsValid() { |
219 | auto result = valid_bits_; |
220 | valid_bits_ = AllocateBuffer(pool_); |
221 | return result; |
222 | } |
223 | |
224 | // Process written repetition/definition levels to reach the end of |
225 | // records. Process no more levels than necessary to delimit the indicated |
226 | // number of logical records. Updates internal state of RecordReader |
227 | // |
228 | // \return Number of records delimited |
229 | int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { |
230 | int64_t values_to_read = 0; |
231 | int64_t records_read = 0; |
232 | |
233 | const int16_t* def_levels = this->def_levels() + levels_position_; |
234 | const int16_t* rep_levels = this->rep_levels() + levels_position_; |
235 | |
236 | DCHECK_GT(max_rep_level_, 0); |
237 | |
238 | // Count logical records and number of values to read |
239 | while (levels_position_ < levels_written_) { |
240 | if (*rep_levels++ == 0) { |
241 | // If at_record_start_ is true, we are seeing the start of a record |
242 | // for the second time, such as after repeated calls to |
243 | // DelimitRecords. In this case we must continue until we find |
244 | // another record start or exhausting the ColumnChunk |
245 | if (!at_record_start_) { |
246 | // We've reached the end of a record; increment the record count. |
247 | ++records_read; |
248 | if (records_read == num_records) { |
249 | // We've found the number of records we were looking for. Set |
250 | // at_record_start_ to true and break |
251 | at_record_start_ = true; |
252 | break; |
253 | } |
254 | } |
255 | } |
256 | |
257 | // We have decided to consume the level at this position; therefore we |
258 | // must advance until we find another record boundary |
259 | at_record_start_ = false; |
260 | |
261 | if (*def_levels++ == max_def_level_) { |
262 | ++values_to_read; |
263 | } |
264 | ++levels_position_; |
265 | } |
266 | *values_seen = values_to_read; |
267 | return records_read; |
268 | } |
269 | |
270 | // Read multiple definition levels into preallocated memory |
271 | // |
272 | // Returns the number of decoded definition levels |
273 | int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { |
274 | if (descr_->max_definition_level() == 0) { |
275 | return 0; |
276 | } |
277 | return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
278 | } |
279 | |
280 | int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { |
281 | if (descr_->max_repetition_level() == 0) { |
282 | return 0; |
283 | } |
284 | return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
285 | } |
286 | |
287 | int64_t available_values_current_page() const { |
288 | return num_buffered_values_ - num_decoded_values_; |
289 | } |
290 | |
291 | void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } |
292 | |
293 | Type::type type() const { return descr_->physical_type(); } |
294 | |
295 | const ColumnDescriptor* descr() const { return descr_; } |
296 | |
297 | void Reserve(int64_t capacity) { |
298 | ReserveLevels(capacity); |
299 | ReserveValues(capacity); |
300 | } |
301 | |
302 | void ReserveLevels(int64_t capacity) { |
303 | if (descr_->max_definition_level() > 0 && |
304 | (levels_written_ + capacity > levels_capacity_)) { |
305 | int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1); |
306 | while (levels_written_ + capacity > new_levels_capacity) { |
307 | new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1); |
308 | } |
309 | PARQUET_THROW_NOT_OK( |
310 | def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); |
311 | if (descr_->max_repetition_level() > 0) { |
312 | PARQUET_THROW_NOT_OK( |
313 | rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); |
314 | } |
315 | levels_capacity_ = new_levels_capacity; |
316 | } |
317 | } |
318 | |
319 | void ReserveValues(int64_t capacity) { |
320 | if (values_written_ + capacity > values_capacity_) { |
321 | int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1); |
322 | while (values_written_ + capacity > new_values_capacity) { |
323 | new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1); |
324 | } |
325 | |
326 | int type_size = GetTypeByteSize(descr_->physical_type()); |
327 | PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false)); |
328 | values_capacity_ = new_values_capacity; |
329 | } |
330 | if (nullable_values_) { |
331 | int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); |
332 | if (valid_bits_->size() < valid_bytes_new) { |
333 | int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); |
334 | PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); |
335 | |
336 | // Avoid valgrind warnings |
337 | memset(valid_bits_->mutable_data() + valid_bytes_old, 0, |
338 | valid_bytes_new - valid_bytes_old); |
339 | } |
340 | } |
341 | } |
342 | |
343 | void Reset() { |
344 | ResetValues(); |
345 | |
346 | if (levels_written_ > 0) { |
347 | const int64_t levels_remaining = levels_written_ - levels_position_; |
348 | // Shift remaining levels to beginning of buffer and trim to only the number |
349 | // of decoded levels remaining |
350 | int16_t* def_data = def_levels(); |
351 | int16_t* rep_data = rep_levels(); |
352 | |
353 | std::copy(def_data + levels_position_, def_data + levels_written_, def_data); |
354 | std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); |
355 | |
356 | PARQUET_THROW_NOT_OK( |
357 | def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); |
358 | PARQUET_THROW_NOT_OK( |
359 | rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); |
360 | |
361 | levels_written_ -= levels_position_; |
362 | levels_position_ = 0; |
363 | levels_capacity_ = levels_remaining; |
364 | } |
365 | |
366 | records_read_ = 0; |
367 | |
368 | // Call Finish on the binary builders to reset them |
369 | } |
370 | |
371 | void ResetValues() { |
372 | if (values_written_ > 0) { |
373 | // Resize to 0, but do not shrink to fit |
374 | PARQUET_THROW_NOT_OK(values_->Resize(0, false)); |
375 | PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); |
376 | values_written_ = 0; |
377 | values_capacity_ = 0; |
378 | null_count_ = 0; |
379 | } |
380 | } |
381 | |
382 | virtual void DebugPrintState() = 0; |
383 | |
384 | virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0; |
385 | |
386 | protected: |
387 | virtual bool ReadNewPage() = 0; |
388 | |
389 | const ColumnDescriptor* descr_; |
390 | ::arrow::MemoryPool* pool_; |
391 | |
392 | std::unique_ptr<PageReader> ; |
393 | std::shared_ptr<Page> current_page_; |
394 | |
395 | // Not set if full schema for this field has no optional or repeated elements |
396 | LevelDecoder definition_level_decoder_; |
397 | |
398 | // Not set for flat schemas. |
399 | LevelDecoder repetition_level_decoder_; |
400 | |
401 | // The total number of values stored in the data page. This is the maximum of |
402 | // the number of encoded definition levels or encoded values. For |
403 | // non-repeated, required columns, this is equal to the number of encoded |
404 | // values. For repeated or optional values, there may be fewer data values |
405 | // than levels, and this tells you how many encoded levels there are in that |
406 | // case. |
407 | int64_t num_buffered_values_; |
408 | |
409 | // The number of values from the current data page that have been decoded |
410 | // into memory |
411 | int64_t num_decoded_values_; |
412 | |
413 | const int16_t max_def_level_; |
414 | const int16_t max_rep_level_; |
415 | |
416 | bool nullable_values_; |
417 | |
418 | bool at_record_start_; |
419 | int64_t records_read_; |
420 | |
421 | int64_t values_written_; |
422 | int64_t values_capacity_; |
423 | int64_t null_count_; |
424 | |
425 | int64_t levels_written_; |
426 | int64_t levels_position_; |
427 | int64_t levels_capacity_; |
428 | |
429 | std::shared_ptr<::arrow::ResizableBuffer> values_; |
430 | |
431 | template <typename T> |
432 | T* ValuesHead() { |
433 | return reinterpret_cast<T*>(values_->mutable_data()) + values_written_; |
434 | } |
435 | |
436 | std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; |
437 | std::shared_ptr<::arrow::ResizableBuffer> def_levels_; |
438 | std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; |
439 | }; |
440 | |
// Maps a Parquet physical type to the Arrow builder used to accumulate its
// decoded values. Only the binary specializations below actually construct a
// builder (see InitializeBuilder); the primary template's BuilderType is a
// placeholder.
template <typename DType>
struct RecordReaderTraits {
  using BuilderType = ::arrow::ArrayBuilder;
};

// Variable-length binary columns accumulate into a chunked binary builder
template <>
struct RecordReaderTraits<ByteArrayType> {
  using BuilderType = ::arrow::internal::ChunkedBinaryBuilder;
};

// Fixed-length binary columns accumulate into a fixed-size binary builder
template <>
struct RecordReaderTraits<FLBAType> {
  using BuilderType = ::arrow::FixedSizeBinaryBuilder;
};
455 | |
// Record reader implementation for a single physical type. Owns the decoders
// for the column chunk's pages and implements value decoding (dense and
// spaced) plus the per-batch record accounting in ReadRecordData.
template <typename DType>
class TypedRecordReader : public RecordReader::RecordReaderImpl {
 public:
  using T = typename DType::c_type;

  using BuilderType = typename RecordReaderTraits<DType>::BuilderType;

  TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {
    InitializeBuilder();
  }

  // Drop all per-chunk decoders (required when advancing row groups)
  void ResetDecoders() override { decoders_.clear(); }

  // Decode values_with_nulls slots (of which null_count are null) into the
  // values buffer, leaving gaps at null positions per the validity bitmap.
  // Specialized for the binary types below.
  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
    uint8_t* valid_bits = valid_bits_->mutable_data();
    const int64_t valid_bits_offset = values_written_;

    int64_t num_decoded = current_decoder_->DecodeSpaced(
        ValuesHead<T>(), static_cast<int>(values_with_nulls),
        static_cast<int>(null_count), valid_bits, valid_bits_offset);
    DCHECK_EQ(num_decoded, values_with_nulls);
  }

  // Decode values_to_read non-null values contiguously into the values
  // buffer. Specialized for the binary types below.
  inline void ReadValuesDense(int64_t values_to_read) {
    int64_t num_decoded =
        current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Return number of logical records read
  int64_t ReadRecordData(const int64_t num_records) override {
    // Conservative upper bound
    const int64_t possible_num_values =
        std::max(num_records, levels_written_ - levels_position_);
    ReserveValues(possible_num_values);

    const int64_t start_levels_position = levels_position_;

    int64_t values_to_read = 0;
    int64_t records_read = 0;
    if (max_rep_level_ > 0) {
      // Repeated field: walk rep levels to find record boundaries
      records_read = DelimitRecords(num_records, &values_to_read);
    } else if (max_def_level_ > 0) {
      // No repetition levels, skip delimiting logic. Each level represents a
      // null or not null entry
      records_read = std::min(levels_written_ - levels_position_, num_records);

      // This is advanced by DelimitRecords, which we skipped
      levels_position_ += records_read;
    } else {
      // Flat required field: one value per record, no levels at all
      records_read = values_to_read = num_records;
    }

    int64_t null_count = 0;
    if (nullable_values_) {
      // Convert the consumed definition levels into a validity bitmap, then
      // decode with null slots interleaved
      int64_t values_with_nulls = 0;
      internal::DefinitionLevelsToBitmap(
          def_levels() + start_levels_position, levels_position_ - start_levels_position,
          max_def_level_, max_rep_level_, &values_with_nulls, &null_count,
          valid_bits_->mutable_data(), values_written_);
      values_to_read = values_with_nulls - null_count;
      ReadValuesSpaced(values_with_nulls, null_count);
      ConsumeBufferedValues(levels_position_ - start_levels_position);
    } else {
      ReadValuesDense(values_to_read);
      ConsumeBufferedValues(values_to_read);
    }
    // Total values, including null spaces, if any
    values_written_ += values_to_read + null_count;
    null_count_ += null_count;

    return records_read;
  }

  // Dump consumed levels and written values to stdout (debugging aid).
  // Specialized to a no-op for types without a stream operator.
  void DebugPrintState() override {
    const int16_t* def_levels = this->def_levels();
    const int16_t* rep_levels = this->rep_levels();
    const int64_t total_levels_read = levels_position_;

    const T* values = reinterpret_cast<const T*>(this->values());

    std::cout << "def levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << def_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "rep levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << rep_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "values: ";
    for (int64_t i = 0; i < this->values_written(); ++i) {
      std::cout << values[i] << " ";
    }
    std::cout << std::endl;
  }

  // Only the binary-type specializations produce builder chunks
  std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() override {
    throw ParquetException("GetChunks only implemented for binary types");
  }

 private:
  typedef Decoder<DType> DecoderType;

  // Map of encoding type to the respective decoder object. For example, a
  // column chunk's data pages may include both dictionary-encoded and
  // plain-encoded data.
  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;

  // Builder used by the binary-type specializations; null otherwise
  std::unique_ptr<BuilderType> builder_;

  // Decoder for the page currently being consumed (owned by decoders_)
  DecoderType* current_decoder_;

  // Advance to the next data page
  bool ReadNewPage() override;

  // No-op in the general case; specialized for binary types below
  void InitializeBuilder() {}

  void ConfigureDictionary(const DictionaryPage* page);
};
580 | |
// TODO(wesm): Implement these to some satisfaction
// Debug printing is stubbed out for these physical types (the generic
// implementation streams values with operator<<, which is not suitable here).
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
590 | |
591 | template <> |
592 | void TypedRecordReader<ByteArrayType>::InitializeBuilder() { |
593 | // Maximum of 16MB chunks |
594 | constexpr int32_t kBinaryChunksize = 1 << 24; |
595 | DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); |
596 | builder_.reset(new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_)); |
597 | } |
598 | |
599 | template <> |
600 | void TypedRecordReader<FLBAType>::InitializeBuilder() { |
601 | DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); |
602 | int byte_width = descr_->type_length(); |
603 | std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); |
604 | builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_)); |
605 | } |
606 | |
607 | template <> |
608 | ::arrow::ArrayVector TypedRecordReader<ByteArrayType>::GetBuilderChunks() { |
609 | ::arrow::ArrayVector chunks; |
610 | PARQUET_THROW_NOT_OK(builder_->Finish(&chunks)); |
611 | return chunks; |
612 | } |
613 | |
614 | template <> |
615 | ::arrow::ArrayVector TypedRecordReader<FLBAType>::GetBuilderChunks() { |
616 | std::shared_ptr<::arrow::Array> chunk; |
617 | PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); |
618 | return ::arrow::ArrayVector({chunk}); |
619 | } |
620 | |
621 | template <> |
622 | inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) { |
623 | auto values = ValuesHead<ByteArray>(); |
624 | int64_t num_decoded = |
625 | current_decoder_->Decode(values, static_cast<int>(values_to_read)); |
626 | DCHECK_EQ(num_decoded, values_to_read); |
627 | |
628 | for (int64_t i = 0; i < num_decoded; i++) { |
629 | PARQUET_THROW_NOT_OK( |
630 | builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len))); |
631 | } |
632 | ResetValues(); |
633 | } |
634 | |
635 | template <> |
636 | inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) { |
637 | auto values = ValuesHead<FLBA>(); |
638 | int64_t num_decoded = |
639 | current_decoder_->Decode(values, static_cast<int>(values_to_read)); |
640 | DCHECK_EQ(num_decoded, values_to_read); |
641 | |
642 | for (int64_t i = 0; i < num_decoded; i++) { |
643 | PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); |
644 | } |
645 | ResetValues(); |
646 | } |
647 | |
648 | template <> |
649 | inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read, |
650 | int64_t null_count) { |
651 | uint8_t* valid_bits = valid_bits_->mutable_data(); |
652 | const int64_t valid_bits_offset = values_written_; |
653 | auto values = ValuesHead<ByteArray>(); |
654 | |
655 | int64_t num_decoded = current_decoder_->DecodeSpaced( |
656 | values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits, |
657 | valid_bits_offset); |
658 | DCHECK_EQ(num_decoded, values_to_read); |
659 | |
660 | for (int64_t i = 0; i < num_decoded; i++) { |
661 | if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { |
662 | PARQUET_THROW_NOT_OK( |
663 | builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len))); |
664 | } else { |
665 | PARQUET_THROW_NOT_OK(builder_->AppendNull()); |
666 | } |
667 | } |
668 | ResetValues(); |
669 | } |
670 | |
671 | template <> |
672 | inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read, |
673 | int64_t null_count) { |
674 | uint8_t* valid_bits = valid_bits_->mutable_data(); |
675 | const int64_t valid_bits_offset = values_written_; |
676 | auto values = ValuesHead<FLBA>(); |
677 | |
678 | int64_t num_decoded = current_decoder_->DecodeSpaced( |
679 | values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits, |
680 | valid_bits_offset); |
681 | DCHECK_EQ(num_decoded, values_to_read); |
682 | |
683 | for (int64_t i = 0; i < num_decoded; i++) { |
684 | if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { |
685 | PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); |
686 | } else { |
687 | PARQUET_THROW_NOT_OK(builder_->AppendNull()); |
688 | } |
689 | } |
690 | ResetValues(); |
691 | } |
692 | |
// Decode a dictionary page and register the resulting dictionary decoder.
// PLAIN / PLAIN_DICTIONARY dictionary pages feed an RLE_DICTIONARY index
// decoder, so the decoder is registered under the RLE_DICTIONARY key.
template <typename DType>
inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
  int encoding = static_cast<int>(page->encoding());
  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
      page->encoding() == Encoding::PLAIN) {
    encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
  }

  // At most one dictionary page is allowed per column chunk
  auto it = decoders_.find(encoding);
  if (it != decoders_.end()) {
    throw ParquetException("Column cannot have more than one dictionary.");
  }

  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
      page->encoding() == Encoding::PLAIN) {
    // Dictionary values themselves are plain-encoded; decode them with a
    // stack-local PlainDecoder and hand them to the dictionary decoder
    PlainDecoder<DType> dictionary(descr_);
    dictionary.SetData(page->num_values(), page->data(), page->size());

    // The dictionary is fully decoded during DictionaryDecoder::Init, so the
    // DictionaryPage buffer is no longer required after this step
    //
    // TODO(wesm): investigate whether this all-or-nothing decoding of the
    // dictionary makes sense and whether performance can be improved

    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
    decoder->SetDict(&dictionary);
    decoders_[encoding] = decoder;
  } else {
    ParquetException::NYI("only plain dictionary encoding has been implemented");
  }

  // Subsequent data pages with dictionary indices use this decoder
  current_decoder_ = decoders_[encoding].get();
}
726 | |
// Advance to the next data page in the column chunk, initializing level
// decoders and selecting/creating the value decoder for the page's encoding.
// Returns false at end of stream. Dictionary pages are consumed in passing
// (ConfigureDictionary); unknown page types are skipped.
template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
  // Loop until we find the next data page.
  const uint8_t* buffer;

  while (true) {
    current_page_ = pager_->NextPage();
    if (!current_page_) {
      // EOS
      return false;
    }

    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
      continue;
    } else if (current_page_->type() == PageType::DATA_PAGE) {
      const DataPage* page = static_cast<const DataPage*>(current_page_.get());

      // Read a data page.
      num_buffered_values_ = page->num_values();

      // Have not decoded any values from the data page yet
      num_decoded_values_ = 0;

      buffer = page->data();

      // If the data page includes repetition and definition levels, we
      // initialize the level decoder and subtract the encoded level bytes from
      // the page size to determine the number of bytes in the encoded data.
      int64_t data_size = page->size();

      // Data page Layout: Repetition Levels - Definition Levels - encoded values.
      // Levels are encoded as rle or bit-packed.
      // Init repetition levels
      if (descr_->max_repetition_level() > 0) {
        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
            page->repetition_level_encoding(), descr_->max_repetition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        buffer += rep_levels_bytes;
        data_size -= rep_levels_bytes;
      }
      // TODO figure a way to set max_definition_level_ to 0
      // if the initial value is invalid

      // Init definition levels
      if (descr_->max_definition_level() > 0) {
        int64_t def_levels_bytes = definition_level_decoder_.SetData(
            page->definition_level_encoding(), descr_->max_definition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        buffer += def_levels_bytes;
        data_size -= def_levels_bytes;
      }

      // Get a decoder object for this page or create a new decoder if this is the
      // first page with this encoding.
      Encoding::type encoding = page->encoding();

      if (IsDictionaryIndexEncoding(encoding)) {
        encoding = Encoding::RLE_DICTIONARY;
      }

      auto it = decoders_.find(static_cast<int>(encoding));
      if (it != decoders_.end()) {
        if (encoding == Encoding::RLE_DICTIONARY) {
          // The cached decoder must be the dictionary decoder installed by
          // ConfigureDictionary
          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
        }
        current_decoder_ = it->second.get();
      } else {
        switch (encoding) {
          case Encoding::PLAIN: {
            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
            decoders_[static_cast<int>(encoding)] = decoder;
            current_decoder_ = decoder.get();
            break;
          }
          case Encoding::RLE_DICTIONARY:
            // Dictionary indices without a preceding dictionary page
            throw ParquetException("Dictionary page must be before data page.");

          case Encoding::DELTA_BINARY_PACKED:
          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
          case Encoding::DELTA_BYTE_ARRAY:
            ParquetException::NYI("Unsupported encoding");

          default:
            throw ParquetException("Unknown encoding type.");
        }
      }
      // Remaining bytes after the level sections are the encoded values
      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                                static_cast<int>(data_size));
      return true;
    } else {
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      continue;
    }
  }
  return true;
}
825 | |
826 | std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, |
827 | MemoryPool* pool) { |
828 | switch (descr->physical_type()) { |
829 | case Type::BOOLEAN: |
830 | return std::shared_ptr<RecordReader>( |
831 | new RecordReader(new TypedRecordReader<BooleanType>(descr, pool))); |
832 | case Type::INT32: |
833 | return std::shared_ptr<RecordReader>( |
834 | new RecordReader(new TypedRecordReader<Int32Type>(descr, pool))); |
835 | case Type::INT64: |
836 | return std::shared_ptr<RecordReader>( |
837 | new RecordReader(new TypedRecordReader<Int64Type>(descr, pool))); |
838 | case Type::INT96: |
839 | return std::shared_ptr<RecordReader>( |
840 | new RecordReader(new TypedRecordReader<Int96Type>(descr, pool))); |
841 | case Type::FLOAT: |
842 | return std::shared_ptr<RecordReader>( |
843 | new RecordReader(new TypedRecordReader<FloatType>(descr, pool))); |
844 | case Type::DOUBLE: |
845 | return std::shared_ptr<RecordReader>( |
846 | new RecordReader(new TypedRecordReader<DoubleType>(descr, pool))); |
847 | case Type::BYTE_ARRAY: |
848 | return std::shared_ptr<RecordReader>( |
849 | new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool))); |
850 | case Type::FIXED_LEN_BYTE_ARRAY: |
851 | return std::shared_ptr<RecordReader>( |
852 | new RecordReader(new TypedRecordReader<FLBAType>(descr, pool))); |
853 | default: { |
854 | // PARQUET-1481: This can occur if the file is corrupt |
855 | std::stringstream ss; |
856 | ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type()); |
857 | throw ParquetException(ss.str()); |
858 | } |
859 | } |
860 | // Unreachable code, but supress compiler warning |
861 | return nullptr; |
862 | } |
863 | |
864 | // ---------------------------------------------------------------------- |
865 | // Implement public API |
866 | |
867 | RecordReader::RecordReader(RecordReaderImpl* impl) { impl_.reset(impl); } |
868 | |
869 | RecordReader::~RecordReader() {} |
870 | |
871 | int64_t RecordReader::ReadRecords(int64_t num_records) { |
872 | return impl_->ReadRecords(num_records); |
873 | } |
874 | |
875 | void RecordReader::Reset() { return impl_->Reset(); } |
876 | |
877 | void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); } |
878 | |
879 | const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); } |
880 | |
881 | const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); } |
882 | |
883 | const uint8_t* RecordReader::values() const { return impl_->values(); } |
884 | |
885 | std::shared_ptr<ResizableBuffer> RecordReader::ReleaseValues() { |
886 | return impl_->ReleaseValues(); |
887 | } |
888 | |
889 | std::shared_ptr<ResizableBuffer> RecordReader::ReleaseIsValid() { |
890 | return impl_->ReleaseIsValid(); |
891 | } |
892 | |
893 | int64_t RecordReader::values_written() const { return impl_->values_written(); } |
894 | |
895 | int64_t RecordReader::levels_position() const { return impl_->levels_position(); } |
896 | |
897 | int64_t RecordReader::levels_written() const { return impl_->levels_written(); } |
898 | |
899 | int64_t RecordReader::null_count() const { return impl_->null_count(); } |
900 | |
901 | bool RecordReader::nullable_values() const { return impl_->nullable_values(); } |
902 | |
903 | bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); } |
904 | |
905 | void RecordReader::(std::unique_ptr<PageReader> reader) { |
906 | impl_->SetPageReader(std::move(reader)); |
907 | } |
908 | |
// Forward to the implementation: only meaningful for binary column types
// (other types throw from the impl).
::arrow::ArrayVector RecordReader::GetBuilderChunks() {
  return impl_->GetBuilderChunks();
}

void RecordReader::DebugPrintState() { impl_->DebugPrintState(); }
914 | |
915 | } // namespace internal |
916 | } // namespace parquet |
917 | |