1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/arrow/reader.h"
19
20#include <algorithm>
21#include <climits>
22#include <cstring>
23#include <future>
24#include <ostream>
25#include <string>
26#include <type_traits>
27#include <utility>
28#include <vector>
29
30#include "arrow/api.h"
31#include "arrow/util/bit-util.h"
32#include "arrow/util/int-util.h"
33#include "arrow/util/logging.h"
34#include "arrow/util/thread-pool.h"
35
36// For arrow::compute::Datum. This should perhaps be promoted. See ARROW-4022
37#include "arrow/compute/kernel.h"
38
39#include "parquet/arrow/record_reader.h"
40#include "parquet/arrow/schema.h"
41#include "parquet/column_reader.h"
42#include "parquet/exception.h"
43#include "parquet/file_reader.h"
44#include "parquet/metadata.h"
45#include "parquet/properties.h"
46#include "parquet/schema.h"
47#include "parquet/types.h"
48#include "parquet/util/memory.h"
49#include "parquet/util/schema-util.h"
50
51using arrow::Array;
52using arrow::BooleanArray;
53using arrow::ChunkedArray;
54using arrow::Column;
55using arrow::Field;
56using arrow::Int32Array;
57using arrow::ListArray;
58using arrow::MemoryPool;
59using arrow::ResizableBuffer;
60using arrow::Status;
61using arrow::StructArray;
62using arrow::Table;
63using arrow::TimestampArray;
64
65// For Array/ChunkedArray variant
66using arrow::compute::Datum;
67
68using parquet::schema::Node;
69
70// Help reduce verbosity
71using ParquetReader = parquet::ParquetFileReader;
72using arrow::RecordBatchReader;
73
74using parquet::internal::RecordReader;
75
76namespace parquet {
77namespace arrow {
78
79using ::arrow::BitUtil::BytesForBits;
80using ::arrow::BitUtil::FromBigEndian;
81using ::arrow::internal::SafeLeftShift;
82
83template <typename ArrowType>
84using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
85
86namespace {
87
88Status GetSingleChunk(const ChunkedArray& chunked, std::shared_ptr<Array>* out) {
89 DCHECK_GT(chunked.num_chunks(), 0);
90 if (chunked.num_chunks() > 1) {
91 return Status::Invalid("Function call returned a chunked array");
92 }
93 *out = chunked.chunk(0);
94 return Status::OK();
95}
96
97} // namespace
98
99// ----------------------------------------------------------------------
100// Iteration utilities
101
102// Abstraction to decouple row group iteration details from the ColumnReader,
103// so we can read only a single row group if we want
class FileColumnIterator {
 public:
  // Does not take ownership of reader; the reader must outlive the iterator.
  explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
      : column_index_(column_index),
        reader_(reader),
        schema_(reader->metadata()->schema()) {}

  virtual ~FileColumnIterator() {}

  // Returns a PageReader for the next chunk (row group) of this column, or
  // nullptr once all chunks are exhausted.
  virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;

  const SchemaDescriptor* schema() const { return schema_; }

  // Descriptor of the column this iterator walks.
  const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }

  std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }

  int column_index() const { return column_index_; }

 protected:
  int column_index_;
  ParquetFileReader* reader_;  // non-owning
  const SchemaDescriptor* schema_;
};
128
129class AllRowGroupsIterator : public FileColumnIterator {
130 public:
131 explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
132 : FileColumnIterator(column_index, reader), next_row_group_(0) {}
133
134 std::unique_ptr<::parquet::PageReader> NextChunk() override {
135 std::unique_ptr<::parquet::PageReader> result;
136 if (next_row_group_ < reader_->metadata()->num_row_groups()) {
137 result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
138 next_row_group_++;
139 } else {
140 result = nullptr;
141 }
142 return result;
143 }
144
145 private:
146 int next_row_group_;
147};
148
149class SingleRowGroupIterator : public FileColumnIterator {
150 public:
151 explicit SingleRowGroupIterator(int column_index, int row_group_number,
152 ParquetFileReader* reader)
153 : FileColumnIterator(column_index, reader),
154 row_group_number_(row_group_number),
155 done_(false) {}
156
157 std::unique_ptr<::parquet::PageReader> NextChunk() override {
158 if (done_) {
159 return nullptr;
160 }
161
162 auto result =
163 reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
164 done_ = true;
165 return result;
166 }
167
168 private:
169 int row_group_number_;
170 bool done_;
171};
172
173class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
174 public:
175 explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
176 const std::vector<int>& column_indices,
177 std::shared_ptr<::arrow::Schema> schema,
178 FileReader* reader)
179 : row_group_indices_(row_group_indices),
180 column_indices_(column_indices),
181 schema_(schema),
182 file_reader_(reader),
183 next_row_group_(0) {}
184
185 ~RowGroupRecordBatchReader() override {}
186
187 std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
188
189 Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
190 if (table_ != nullptr) { // one row group has been loaded
191 std::shared_ptr<::arrow::RecordBatch> tmp;
192 RETURN_NOT_OK(table_batch_reader_->ReadNext(&tmp));
193 if (tmp != nullptr) { // some column chunks are left in table
194 *out = tmp;
195 return Status::OK();
196 } else { // the entire table is consumed
197 table_batch_reader_.reset();
198 table_.reset();
199 }
200 }
201
202 // all row groups has been consumed
203 if (next_row_group_ == row_group_indices_.size()) {
204 *out = nullptr;
205 return Status::OK();
206 }
207
208 RETURN_NOT_OK(file_reader_->ReadRowGroup(row_group_indices_[next_row_group_],
209 column_indices_, &table_));
210
211 next_row_group_++;
212 table_batch_reader_.reset(new ::arrow::TableBatchReader(*table_.get()));
213 return table_batch_reader_->ReadNext(out);
214 }
215
216 private:
217 std::vector<int> row_group_indices_;
218 std::vector<int> column_indices_;
219 std::shared_ptr<::arrow::Schema> schema_;
220 FileReader* file_reader_;
221 size_t next_row_group_;
222 std::shared_ptr<::arrow::Table> table_;
223 std::unique_ptr<::arrow::TableBatchReader> table_batch_reader_;
224};
225
226// ----------------------------------------------------------------------
227// File reader implementation
228
// Pimpl for FileReader: owns the low-level ParquetFileReader and implements
// the column/row-group/table reading entry points.
class FileReader::Impl {
 public:
  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
      : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}

  virtual ~Impl() {}

  // Create a reader that iterates column i across all row groups.
  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);

  // Read top-level schema field i (possibly nested); the overload taking
  // indices restricts the read to the selected leaf columns.
  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out);
  Status ReadSchemaField(int i, const std::vector<int>& indices,
                         std::shared_ptr<ChunkedArray>* out);
  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out);
  // Read a single column chunk from one row group.
  Status ReadColumnChunk(int column_index, int row_group_index,
                         std::shared_ptr<ChunkedArray>* out);

  // Resolve a schema node to a column reader implementation; *out is left
  // nullptr when none of the node's columns appear in indices.
  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
                          int16_t def_level,
                          std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);

  Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
  Status GetSchema(const std::vector<int>& indices,
                   std::shared_ptr<::arrow::Schema>* out);
  Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
  Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
                      std::shared_ptr<::arrow::Table>* out);
  Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
  Status ReadTable(std::shared_ptr<Table>* table);
  Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
  Status ReadRowGroups(const std::vector<int>& row_groups,
                       const std::vector<int>& indices,
                       std::shared_ptr<::arrow::Table>* out);

  bool CheckForFlatColumn(const ColumnDescriptor* descr);
  bool CheckForFlatListColumn(const ColumnDescriptor* descr);

  const ParquetFileReader* parquet_reader() const { return reader_.get(); }

  int num_row_groups() const { return reader_->metadata()->num_row_groups(); }

  int num_columns() const { return reader_->metadata()->num_columns(); }

  // When true, column reads in ReadTable/ReadRowGroup run on the CPU pool.
  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }

  ParquetFileReader* reader() { return reader_.get(); }

 private:
  MemoryPool* pool_;  // non-owning
  std::unique_ptr<ParquetFileReader> reader_;
  bool use_threads_;
};
280
// Internal interface shared by all column reader implementations
// (primitive leaves and struct composites).
class ColumnReader::ColumnReaderImpl {
 public:
  virtual ~ColumnReaderImpl() {}
  // Read up to records_to_read records into an Arrow chunked array.
  virtual Status NextBatch(int64_t records_to_read,
                           std::shared_ptr<ChunkedArray>* out) = 0;
  // Expose the definition/repetition levels backing the last batch.
  virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
  virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
  // Arrow field describing this reader's output.
  virtual const std::shared_ptr<Field> field() = 0;
};
290
291// Reader implementation for primitive arrays
class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
 public:
  // Takes ownership of the column iterator; pool is used for all allocations.
  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
    record_reader_ = RecordReader::Make(descr_, pool_);
    // NOTE(review): the NodeToField status is consumed only by DCHECK; in
    // builds where DCHECK is compiled out, a conversion failure could go
    // unnoticed here — TODO confirm this is intended.
    DCHECK(NodeToField(*input_->descr()->schema_node(), &field_).ok());
    NextRowGroup();
  }

  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;

  // Re-nest a flat values array into (possibly nested) ListArrays using the
  // recorded definition/repetition levels.
  template <typename ParquetType>
  Status WrapIntoListArray(Datum* inout_array);

  Status GetDefLevels(const int16_t** data, size_t* length) override;
  Status GetRepLevels(const int16_t** data, size_t* length) override;

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  // Advance the record reader to the next row group's pages.
  void NextRowGroup();

  MemoryPool* pool_;
  std::unique_ptr<FileColumnIterator> input_;
  const ColumnDescriptor* descr_;

  std::shared_ptr<RecordReader> record_reader_;

  std::shared_ptr<Field> field_;
};
322
323// Reader implementation for struct array
class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
 public:
  // children are the readers for the struct's member columns;
  // struct_def_level is the definition level at which the struct itself
  // is present.
  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
      : children_(children), struct_def_level_(struct_def_level), pool_(pool) {
    InitField(node, children);
  }

  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
  Status GetDefLevels(const int16_t** data, size_t* length) override;
  Status GetRepLevels(const int16_t** data, size_t* length) override;
  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
  int16_t struct_def_level_;
  MemoryPool* pool_;  // non-owning
  std::shared_ptr<Field> field_;
  std::shared_ptr<ResizableBuffer> def_levels_buffer_;

  // Derive the struct's validity bitmap from the children's def levels.
  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
  // Build the Arrow struct field from the schema node and child fields.
  void InitField(const Node* node,
                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
};
348
349FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
350 : impl_(new FileReader::Impl(pool, std::move(reader))) {}
351
352FileReader::~FileReader() {}
353
354Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
355 std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
356
357 std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
358 new PrimitiveImpl(pool_, std::move(input)));
359 *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
360 return Status::OK();
361}
362
363Status FileReader::Impl::GetReaderForNode(
364 int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
365 std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
366 *out = nullptr;
367
368 if (IsSimpleStruct(node)) {
369 const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
370 std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
371 for (int i = 0; i < group->field_count(); i++) {
372 std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
373 // TODO(itaiin): Remove the -1 index hack when all types of nested reads
374 // are supported. This currently just signals the lower level reader resolution
375 // to abort
376 RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
377 static_cast<int16_t>(def_level + 1), &child_reader));
378 if (child_reader != nullptr) {
379 children.push_back(std::move(child_reader));
380 }
381 }
382
383 if (children.size() > 0) {
384 *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
385 new StructImpl(children, def_level, pool_, node));
386 }
387 } else {
388 // This should be a flat field case - translate the field index to
389 // the correct column index by walking down to the leaf node
390 const Node* walker = node;
391 while (!walker->is_primitive()) {
392 DCHECK(walker->is_group());
393 auto group = static_cast<const GroupNode*>(walker);
394 if (group->field_count() != 1) {
395 return Status::NotImplemented("lists with structs are not supported.");
396 }
397 walker = group->field(0).get();
398 }
399 auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
400
401 // If the index of the column is found then a reader for the coliumn is needed.
402 // Otherwise *out keeps the nullptr value.
403 if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
404 std::unique_ptr<ColumnReader> reader;
405 RETURN_NOT_OK(GetColumn(column_index, &reader));
406 *out = std::move(reader->impl_);
407 }
408 }
409
410 return Status::OK();
411}
412
413Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
414 std::vector<int> indices(reader_->metadata()->num_columns());
415
416 for (size_t j = 0; j < indices.size(); ++j) {
417 indices[j] = static_cast<int>(j);
418 }
419
420 return ReadSchemaField(i, indices, out);
421}
422
423Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
424 std::shared_ptr<ChunkedArray>* out) {
425 auto parquet_schema = reader_->metadata()->schema();
426
427 auto node = parquet_schema->group_node()->field(i).get();
428 std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
429
430 RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
431 if (reader_impl == nullptr) {
432 *out = nullptr;
433 return Status::OK();
434 }
435
436 std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
437
438 // TODO(wesm): This calculation doesn't make much sense when we have repeated
439 // schema nodes
440 int64_t records_to_read = 0;
441
442 const FileMetaData& metadata = *reader_->metadata();
443 for (int j = 0; j < metadata.num_row_groups(); j++) {
444 records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values();
445 }
446
447 return reader->NextBatch(records_to_read, out);
448}
449
450Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
451 std::unique_ptr<ColumnReader> flat_column_reader;
452 RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
453
454 int64_t records_to_read = 0;
455 for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
456 records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
457 }
458
459 return flat_column_reader->NextBatch(records_to_read, out);
460}
461
462Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
463 std::shared_ptr<::arrow::Schema>* out) {
464 auto descr = reader_->metadata()->schema();
465 auto parquet_key_value_metadata = reader_->metadata()->key_value_metadata();
466 return FromParquetSchema(descr, indices, parquet_key_value_metadata, out);
467}
468
469Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
470 std::shared_ptr<ChunkedArray>* out) {
471 auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
472 int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
473
474 std::unique_ptr<FileColumnIterator> input(
475 new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
476
477 std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
478 new PrimitiveImpl(pool_, std::move(input)));
479 ColumnReader flat_column_reader(std::move(impl));
480
481 return flat_column_reader.NextBatch(records_to_read, out);
482}
483
484Status FileReader::Impl::ReadRowGroup(int row_group_index,
485 const std::vector<int>& indices,
486 std::shared_ptr<Table>* out) {
487 std::shared_ptr<::arrow::Schema> schema;
488 RETURN_NOT_OK(GetSchema(indices, &schema));
489
490 auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
491
492 int num_columns = static_cast<int>(indices.size());
493 std::vector<std::shared_ptr<Column>> columns(num_columns);
494
495 // TODO(wesm): Refactor to share more code with ReadTable
496
497 auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, this](int i) {
498 int column_index = indices[i];
499
500 std::shared_ptr<ChunkedArray> array;
501 RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
502 columns[i] = std::make_shared<Column>(schema->field(i), array);
503 return Status::OK();
504 };
505
506 if (use_threads_) {
507 std::vector<std::future<Status>> futures;
508 auto pool = ::arrow::internal::GetCpuThreadPool();
509 for (int i = 0; i < num_columns; i++) {
510 futures.push_back(pool->Submit(ReadColumnFunc, i));
511 }
512 Status final_status = Status::OK();
513 for (auto& fut : futures) {
514 Status st = fut.get();
515 if (!st.ok()) {
516 final_status = std::move(st);
517 }
518 }
519 RETURN_NOT_OK(final_status);
520 } else {
521 for (int i = 0; i < num_columns; i++) {
522 RETURN_NOT_OK(ReadColumnFunc(i));
523 }
524 }
525
526 *out = Table::Make(schema, columns);
527 return Status::OK();
528}
529
530Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
531 std::shared_ptr<Table>* out) {
532 std::shared_ptr<::arrow::Schema> schema;
533 RETURN_NOT_OK(GetSchema(indices, &schema));
534
535 // We only need to read schema fields which have columns indicated
536 // in the indices vector
537 std::vector<int> field_indices;
538 if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
539 &field_indices)) {
540 return Status::Invalid("Invalid column index");
541 }
542
543 int num_fields = static_cast<int>(field_indices.size());
544 std::vector<std::shared_ptr<Column>> columns(num_fields);
545
546 auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
547 std::shared_ptr<ChunkedArray> array;
548 RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
549 columns[i] = std::make_shared<Column>(schema->field(i), array);
550 return Status::OK();
551 };
552
553 if (use_threads_) {
554 std::vector<std::future<Status>> futures;
555 auto pool = ::arrow::internal::GetCpuThreadPool();
556 for (int i = 0; i < num_fields; i++) {
557 futures.push_back(pool->Submit(ReadColumnFunc, i));
558 }
559 Status final_status = Status::OK();
560 for (auto& fut : futures) {
561 Status st = fut.get();
562 if (!st.ok()) {
563 final_status = std::move(st);
564 }
565 }
566 RETURN_NOT_OK(final_status);
567 } else {
568 for (int i = 0; i < num_fields; i++) {
569 RETURN_NOT_OK(ReadColumnFunc(i));
570 }
571 }
572
573 std::shared_ptr<Table> table = Table::Make(schema, columns);
574 RETURN_NOT_OK(table->Validate());
575 *out = table;
576 return Status::OK();
577}
578
579Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
580 std::vector<int> indices(reader_->metadata()->num_columns());
581
582 for (size_t i = 0; i < indices.size(); ++i) {
583 indices[i] = static_cast<int>(i);
584 }
585 return ReadTable(indices, table);
586}
587
588Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
589 const std::vector<int>& indices,
590 std::shared_ptr<Table>* table) {
591 std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr);
592
593 for (size_t i = 0; i < row_groups.size(); ++i) {
594 RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &tables[i]));
595 }
596 return ConcatenateTables(tables, table);
597}
598
599Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
600 std::shared_ptr<Table>* table) {
601 std::vector<int> indices(reader_->metadata()->num_columns());
602
603 for (size_t i = 0; i < indices.size(); ++i) {
604 indices[i] = static_cast<int>(i);
605 }
606 return ReadRowGroups(row_groups, indices, table);
607}
608
609Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
610 std::vector<int> indices(reader_->metadata()->num_columns());
611
612 for (size_t i = 0; i < indices.size(); ++i) {
613 indices[i] = static_cast<int>(i);
614 }
615 return ReadRowGroup(i, indices, table);
616}
617
618// Static ctor
619Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
620 MemoryPool* allocator, const ReaderProperties& props,
621 const std::shared_ptr<FileMetaData>& metadata,
622 std::unique_ptr<FileReader>* reader) {
623 std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(file));
624 std::unique_ptr<ParquetReader> pq_reader;
625 PARQUET_CATCH_NOT_OK(pq_reader =
626 ParquetReader::Open(std::move(io_wrapper), props, metadata));
627 reader->reset(new FileReader(allocator, std::move(pq_reader)));
628 return Status::OK();
629}
630
631Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
632 MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
633 return OpenFile(file, allocator, ::parquet::default_reader_properties(), nullptr,
634 reader);
635}
636
// Forward to the pimpl; no exception translation needed for these two.
Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
  return impl_->GetColumn(i, out);
}

Status FileReader::GetSchema(const std::vector<int>& indices,
                             std::shared_ptr<::arrow::Schema>* out) {
  return impl_->GetSchema(indices, out);
}

// Read column i; ParquetException from the implementation is surfaced as
// an IOError status.
Status FileReader::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
  try {
    return impl_->ReadColumn(i, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

// Read schema field i; same exception-to-status translation as above.
Status FileReader::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
  try {
    return impl_->ReadSchemaField(i, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

// Single-Array variants: fail when the result has more than one chunk.
Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(ReadColumn(i, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}

Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(ReadSchemaField(i, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}
673
674Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
675 std::shared_ptr<RecordBatchReader>* out) {
676 std::vector<int> indices(impl_->num_columns());
677
678 for (size_t j = 0; j < indices.size(); ++j) {
679 indices[j] = static_cast<int>(j);
680 }
681
682 return GetRecordBatchReader(row_group_indices, indices, out);
683}
684
685Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
686 const std::vector<int>& column_indices,
687 std::shared_ptr<RecordBatchReader>* out) {
688 // column indicies check
689 std::shared_ptr<::arrow::Schema> schema;
690 RETURN_NOT_OK(GetSchema(column_indices, &schema));
691
692 // row group indices check
693 int max_num = num_row_groups();
694 for (auto row_group_index : row_group_indices) {
695 if (row_group_index < 0 || row_group_index >= max_num) {
696 return Status::Invalid("Some index in row_group_indices is ", row_group_index,
697 ", which is either < 0 or >= num_row_groups(", max_num, ")");
698 }
699 }
700
701 *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
702 schema, this);
703 return Status::OK();
704}
705
// The wrappers below translate ParquetException thrown by the pimpl into
// arrow::Status::IOError so callers only ever see Status.
Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadTable(out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadTable(const std::vector<int>& indices,
                             std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadTable(indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroup(i, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroup(int i, const std::vector<int>& indices,
                                std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroup(i, indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
                                 std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroups(row_groups, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
                                 const std::vector<int>& indices,
                                 std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroups(row_groups, indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}
758
// Create a per-row-group reader view; shares this reader's pimpl.
std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
  return std::shared_ptr<RowGroupReader>(
      new RowGroupReader(impl_.get(), row_group_index));
}

int FileReader::num_row_groups() const { return impl_->num_row_groups(); }

// Intentionally a no-op; threading is controlled via set_use_threads.
// NOTE(review): presumed to be a deprecated compatibility shim — confirm.
void FileReader::set_num_threads(int num_threads) {}

void FileReader::set_use_threads(bool use_threads) {
  impl_->set_use_threads(use_threads);
}

// Scan the given columns via ScanFileContents, reporting the row count;
// ParquetException is translated into an IOError status.
Status FileReader::ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                int64_t* num_rows) {
  try {
    *num_rows = ScanFileContents(columns, column_batch_size, impl_->reader());
    return Status::OK();
  } catch (const ::parquet::ParquetException& e) {
    return Status::IOError(e.what());
  }
}

// Non-owning access to the underlying low-level Parquet reader.
const ParquetFileReader* FileReader::parquet_reader() const {
  return impl_->parquet_reader();
}
785
// Re-nest a flat values array into (possibly nested) ListArrays by replaying
// the recorded definition/repetition levels. For each level of nesting this
// builds an offsets array and a validity bitmap, then wraps the flat values
// from the innermost list outwards. No-op for non-repeated columns.
template <typename ParquetType>
Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
  if (descr_->max_repetition_level() == 0) {
    // Flat, no action
    return Status::OK();
  }

  std::shared_ptr<Array> flat_array;

  // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
  // not yet implemented
  if (inout_array->kind() == Datum::CHUNKED_ARRAY) {
    if (inout_array->chunked_array()->num_chunks() > 1) {
      return Status::NotImplemented(
          "Nested data conversions not implemented for "
          "chunked array outputs");
    }
    flat_array = inout_array->chunked_array()->chunk(0);
  } else {
    DCHECK_EQ(Datum::ARRAY, inout_array->kind());
    flat_array = inout_array->make_array();
  }

  // Levels recorded by the RecordReader during the last batch.
  const int16_t* def_levels = record_reader_->def_levels();
  const int16_t* rep_levels = record_reader_->rep_levels();
  const int64_t total_levels_read = record_reader_->levels_position();

  std::shared_ptr<::arrow::Schema> arrow_schema;
  RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
                                  input_->metadata()->key_value_metadata(),
                                  &arrow_schema));
  std::shared_ptr<Field> current_field = arrow_schema->field(0);

  // Walk downwards to extract nullability; one offset/validity builder pair
  // is created per level of list nesting.
  std::vector<bool> nullable;
  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
  nullable.push_back(current_field->nullable());
  while (current_field->type()->num_children() > 0) {
    if (current_field->type()->num_children() > 1) {
      return Status::NotImplemented("Fields with more than one child are not supported.");
    } else {
      if (current_field->type()->id() != ::arrow::Type::LIST) {
        return Status::NotImplemented("Currently only nesting with Lists is supported.");
      }
      current_field = current_field->type()->child(0);
    }
    offset_builders.emplace_back(
        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
    valid_bits_builders.emplace_back(
        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
    nullable.push_back(current_field->nullable());
  }

  int64_t list_depth = offset_builders.size();
  // This describes the minimal definition that describes a level that
  // reflects a value in the primitive values array.
  int16_t values_def_level = descr_->max_definition_level();
  if (nullable[nullable.size() - 1]) {
    values_def_level--;
  }

  // The definition levels that are needed so that a list is declared
  // as empty and not null.
  std::vector<int16_t> empty_def_level(list_depth);
  int def_level = 0;
  for (int i = 0; i < list_depth; i++) {
    if (nullable[i]) {
      def_level++;
    }
    empty_def_level[i] = static_cast<int16_t>(def_level);
    def_level++;
  }

  // Replay the levels: a rep level below the maximum starts new list entries
  // at every deeper nesting level, appending offsets and validity bits.
  int32_t values_offset = 0;
  std::vector<int64_t> null_counts(list_depth, 0);
  for (int64_t i = 0; i < total_levels_read; i++) {
    int16_t rep_level = rep_levels[i];
    if (rep_level < descr_->max_repetition_level()) {
      for (int64_t j = rep_level; j < list_depth; j++) {
        if (j == (list_depth - 1)) {
          // Innermost list indexes directly into the values array.
          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
        } else {
          // Outer lists index into the next inner offsets array.
          RETURN_NOT_OK(offset_builders[j]->Append(
              static_cast<int32_t>(offset_builders[j + 1]->length())));
        }

        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
          // Def level one below "empty" means the list itself is null.
          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
          null_counts[j]++;
          break;
        } else {
          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
          if (empty_def_level[j] == def_levels[i]) {
            // Empty (but present) list: no deeper entries to create.
            break;
          }
        }
      }
    }
    if (def_levels[i] >= values_def_level) {
      // This level carries an actual primitive value.
      values_offset++;
    }
  }
  // Add the final offset to all lists
  for (int64_t j = 0; j < list_depth; j++) {
    if (j == (list_depth - 1)) {
      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
    } else {
      RETURN_NOT_OK(offset_builders[j]->Append(
          static_cast<int32_t>(offset_builders[j + 1]->length())));
    }
  }

  // Finish the builders and keep only their underlying buffers.
  std::vector<std::shared_ptr<Buffer>> offsets;
  std::vector<std::shared_ptr<Buffer>> valid_bits;
  std::vector<int64_t> list_lengths;
  for (int64_t j = 0; j < list_depth; j++) {
    list_lengths.push_back(offset_builders[j]->length() - 1);
    std::shared_ptr<Array> array;
    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
  }

  // Wrap the flat values from the innermost nesting level outwards.
  std::shared_ptr<Array> output = flat_array;
  for (int64_t j = list_depth - 1; j >= 0; j--) {
    auto list_type =
        ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
    output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
                                                  output, valid_bits[j], null_counts[j]);
  }
  *inout_array = output;
  return Status::OK();
}
921
// Trait deciding whether a (Arrow, Parquet) type pair can use the zero-copy
// "fast path": this is possible exactly when both sides share the same
// in-memory C representation, so the reader's buffer can be handed over as-is.
template <typename ArrowType, typename ParquetType>
struct supports_fast_path_impl {
  using ArrowCType = typename ArrowType::c_type;
  using ParquetCType = typename ParquetType::c_type;
  // True iff no element-by-element conversion is required.
  static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
};
928
// BYTE_ARRAY values are variable-length and never share a C representation
// with an Arrow primitive type, so they are never fast-path eligible.
template <typename ArrowType>
struct supports_fast_path_impl<ArrowType, ByteArrayType> {
  static constexpr bool value = false;
};

// Likewise for FIXED_LEN_BYTE_ARRAY values.
template <typename ArrowType>
struct supports_fast_path_impl<ArrowType, FLBAType> {
  static constexpr bool value = false;
};

// SFINAE gate: well-formed (as void) only when the pair supports the fast
// path. Used to select the zero-copy TransferFunctor specialization below.
template <typename ArrowType, typename ParquetType>
using supports_fast_path =
    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
942
// Default TransferFunctor: moves decoded values from a RecordReader into an
// Arrow array when the Parquet C type and Arrow C type differ, performing an
// element-by-element converting copy (e.g. int32 storage -> uint8 Arrow type).
template <typename ArrowType, typename ParquetType, typename Enable = void>
struct TransferFunctor {
  using ArrowCType = typename ArrowType::c_type;
  using ParquetCType = typename ParquetType::c_type;

  // \param reader source of the decoded values and validity bitmap
  // \param pool memory pool used to allocate the destination buffer
  // \param type the Arrow type of the produced array
  // \param out receives the resulting Array
  Status operator()(RecordReader* reader, MemoryPool* pool,
                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
    // Same-representation pairs must go through the zero-copy specialization;
    // this guards against accidentally taking the copying path for int32.
    static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value,
                  "The fast path transfer functor should be used "
                  "for primitive values");

    int64_t length = reader->values_written();
    std::shared_ptr<Buffer> data;
    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));

    // std::copy performs the implicit numeric conversion per element.
    auto values = reinterpret_cast<const ParquetCType*>(reader->values());
    auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
    std::copy(values, values + length, out_ptr);

    if (reader->nullable_values()) {
      // Take ownership of the validity bitmap from the reader.
      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
                                                    reader->null_count());
    } else {
      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data);
    }
    return Status::OK();
  }
};
972
973template <typename ArrowType, typename ParquetType>
974struct TransferFunctor<ArrowType, ParquetType,
975 supports_fast_path<ArrowType, ParquetType>> {
976 Status operator()(RecordReader* reader, MemoryPool* pool,
977 const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
978 int64_t length = reader->values_written();
979 std::shared_ptr<ResizableBuffer> values = reader->ReleaseValues();
980
981 if (reader->nullable_values()) {
982 std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
983 *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
984 reader->null_count());
985 } else {
986 *out = std::make_shared<ArrayType<ArrowType>>(type, length, values);
987 }
988 return Status::OK();
989 }
990};
991
992template <>
993struct TransferFunctor<::arrow::BooleanType, BooleanType> {
994 Status operator()(RecordReader* reader, MemoryPool* pool,
995 const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
996 int64_t length = reader->values_written();
997 std::shared_ptr<Buffer> data;
998
999 const int64_t buffer_size = BytesForBits(length);
1000 RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
1001
1002 // Transfer boolean values to packed bitmap
1003 auto values = reinterpret_cast<const bool*>(reader->values());
1004 uint8_t* data_ptr = data->mutable_data();
1005 memset(data_ptr, 0, buffer_size);
1006
1007 for (int64_t i = 0; i < length; i++) {
1008 if (values[i]) {
1009 ::arrow::BitUtil::SetBit(data_ptr, i);
1010 }
1011 }
1012
1013 if (reader->nullable_values()) {
1014 std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
1015 RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
1016 *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
1017 reader->null_count());
1018 } else {
1019 *out = std::make_shared<BooleanArray>(type, length, data);
1020 }
1021 return Status::OK();
1022 }
1023};
1024
1025template <>
1026struct TransferFunctor<::arrow::TimestampType, Int96Type> {
1027 Status operator()(RecordReader* reader, MemoryPool* pool,
1028 const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
1029 int64_t length = reader->values_written();
1030 auto values = reinterpret_cast<const Int96*>(reader->values());
1031
1032 std::shared_ptr<Buffer> data;
1033 RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
1034
1035 auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
1036 for (int64_t i = 0; i < length; i++) {
1037 *data_ptr++ = Int96GetNanoSeconds(values[i]);
1038 }
1039
1040 if (reader->nullable_values()) {
1041 std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
1042 *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
1043 reader->null_count());
1044 } else {
1045 *out = std::make_shared<TimestampArray>(type, length, data);
1046 }
1047
1048 return Status::OK();
1049 }
1050};
1051
1052template <>
1053struct TransferFunctor<::arrow::Date64Type, Int32Type> {
1054 Status operator()(RecordReader* reader, MemoryPool* pool,
1055 const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
1056 int64_t length = reader->values_written();
1057 auto values = reinterpret_cast<const int32_t*>(reader->values());
1058
1059 std::shared_ptr<Buffer> data;
1060 RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
1061 auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
1062
1063 for (int64_t i = 0; i < length; i++) {
1064 *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
1065 }
1066
1067 if (reader->nullable_values()) {
1068 std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
1069 *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
1070 reader->null_count());
1071 } else {
1072 *out = std::make_shared<::arrow::Date64Array>(type, length, data);
1073 }
1074 return Status::OK();
1075 }
1076};
1077
1078template <typename ArrowType, typename ParquetType>
1079struct TransferFunctor<
1080 ArrowType, ParquetType,
1081 typename std::enable_if<
1082 (std::is_base_of<::arrow::BinaryType, ArrowType>::value ||
1083 std::is_same<::arrow::FixedSizeBinaryType, ArrowType>::value) &&
1084 (std::is_same<ParquetType, ByteArrayType>::value ||
1085 std::is_same<ParquetType, FLBAType>::value)>::type> {
1086 Status operator()(RecordReader* reader, MemoryPool* pool,
1087 const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
1088 std::vector<std::shared_ptr<Array>> chunks = reader->GetBuilderChunks();
1089
1090 if (type->id() == ::arrow::Type::STRING) {
1091 // Convert from BINARY type to STRING
1092 for (size_t i = 0; i < chunks.size(); ++i) {
1093 auto new_data = chunks[i]->data()->Copy();
1094 new_data->type = type;
1095 chunks[i] = ::arrow::MakeArray(new_data);
1096 }
1097 }
1098 *out = std::make_shared<ChunkedArray>(chunks);
1099 return Status::OK();
1100 }
1101};
1102
1103static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
1104 const int32_t length = stop - start;
1105
1106 DCHECK_GE(length, 0);
1107 DCHECK_LE(length, 8);
1108
1109 switch (length) {
1110 case 0:
1111 return 0;
1112 case 1:
1113 return bytes[start];
1114 case 2:
1115 return FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
1116 case 3: {
1117 const uint64_t first_two_bytes =
1118 FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
1119 const uint64_t last_byte = bytes[stop - 1];
1120 return first_two_bytes << 8 | last_byte;
1121 }
1122 case 4:
1123 return FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
1124 case 5: {
1125 const uint64_t first_four_bytes =
1126 FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
1127 const uint64_t last_byte = bytes[stop - 1];
1128 return first_four_bytes << 8 | last_byte;
1129 }
1130 case 6: {
1131 const uint64_t first_four_bytes =
1132 FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
1133 const uint64_t last_two_bytes =
1134 FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
1135 return first_four_bytes << 16 | last_two_bytes;
1136 }
1137 case 7: {
1138 const uint64_t first_four_bytes =
1139 FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
1140 const uint64_t second_two_bytes =
1141 FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
1142 const uint64_t last_byte = bytes[stop - 1];
1143 return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
1144 }
1145 case 8:
1146 return FromBigEndian(*reinterpret_cast<const uint64_t*>(bytes + start));
1147 default: {
1148 DCHECK(false);
1149 return UINT64_MAX;
1150 }
1151 }
1152}
1153
// Legal serialized widths for a DECIMAL value backed by a Decimal128.
static constexpr int32_t kMinDecimalBytes = 1;
static constexpr int32_t kMaxDecimalBytes = 16;

/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
/// uint64_t (low bits). The input is a two's-complement value of 1..16 bytes;
/// shorter values are sign-extended to the full 128 bits.
static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
                               int64_t* out_high, uint64_t* out_low) {
  DCHECK_GE(length, kMinDecimalBytes);
  DCHECK_LE(length, kMaxDecimalBytes);

  // XXX This code is copied from Decimal::FromBigEndian

  // "low" is declared as a signed int64_t so that arithmetic right shifts and
  // the -1 fill below perform sign extension; it is reinterpreted as unsigned
  // only at the very end.
  int64_t high, low;

  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
  // sign bit.
  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;

  // 1. Extract the high bytes
  // Stop byte of the high bytes
  const int32_t high_bits_offset = std::max(0, length - 8);
  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);

  if (high_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    high = high_bits;
  } else {
    // Start from all-ones (sign extension) when the value is negative and does
    // not occupy the full 16 bytes; otherwise start from zero.
    high = -1 * (is_negative && length < kMaxDecimalBytes);
    // Shift left enough bits to make room for the incoming int64_t
    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    high |= high_bits;
  }

  // 2. Extract the low bytes
  // Stop byte of the low bytes
  const int32_t low_bits_offset = std::min(length, 8);
  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);

  if (low_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    low = low_bits;
  } else {
    // Sign extend the low bits if necessary
    low = -1 * (is_negative && length < 8);
    // Shift left enough bits to make room for the incoming int64_t
    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    low |= low_bits;
  }

  *out_high = high;
  *out_low = static_cast<uint64_t>(low);
}
1208
1209static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
1210 uint8_t* out_buf) {
1211 // view the first 8 bytes as an unsigned 64-bit integer
1212 auto low = reinterpret_cast<uint64_t*>(out_buf);
1213
1214 // view the second 8 bytes as a signed 64-bit integer
1215 auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
1216
1217 // Convert the fixed size binary array bytes into a Decimal128 compatible layout
1218 BytesToIntegerPair(value, byte_width, high, low);
1219}
1220
1221// ----------------------------------------------------------------------
1222// BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
1223
1224template <typename T>
1225Status ConvertToDecimal128(const Array& array, const std::shared_ptr<::arrow::DataType>&,
1226 MemoryPool* pool, std::shared_ptr<Array>*) {
1227 return Status::NotImplemented("not implemented");
1228}
1229
// FIXED_LEN_BYTE_ARRAY -> Decimal128: each fixed-size binary value holds a
// big-endian two's-complement integer which is re-encoded into the 16-byte
// Decimal128 layout.
template <>
Status ConvertToDecimal128<FLBAType>(const Array& array,
                                     const std::shared_ptr<::arrow::DataType>& type,
                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
  const auto& fixed_size_binary_array =
      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);

  // The byte width of each decimal value
  const int32_t type_length =
      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();

  // number of elements in the entire array
  const int64_t length = fixed_size_binary_array.length();

  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
  // this will be different from the decimal array width because we write the minimum
  // number of bytes necessary to represent a given precision
  const int32_t byte_width =
      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
          .byte_width();

  // allocate memory for the decimal array
  std::shared_ptr<Buffer> data;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));

  // raw bytes that we can write to
  uint8_t* out_ptr = data->mutable_data();

  // convert each FixedSizeBinary value to valid decimal bytes
  const int64_t null_count = fixed_size_binary_array.null_count();
  if (null_count > 0) {
    // Null slots are skipped (their output bytes are left unwritten); the
    // validity bitmap transferred below marks them as null.
    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
      if (!fixed_size_binary_array.IsNull(i)) {
        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
      }
    }
  } else {
    // No nulls: avoid the per-element IsNull check.
    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
    }
  }

  // Reuse the source array's validity bitmap for the decimal array.
  *out = std::make_shared<::arrow::Decimal128Array>(
      type, length, data, fixed_size_binary_array.null_bitmap(), null_count);

  return Status::OK();
}
1277
1278template <>
1279Status ConvertToDecimal128<ByteArrayType>(const Array& array,
1280 const std::shared_ptr<::arrow::DataType>& type,
1281 MemoryPool* pool, std::shared_ptr<Array>* out) {
1282 const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
1283 const int64_t length = binary_array.length();
1284
1285 const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
1286 const int64_t type_length = decimal_type.byte_width();
1287
1288 std::shared_ptr<Buffer> data;
1289 RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
1290
1291 // raw bytes that we can write to
1292 uint8_t* out_ptr = data->mutable_data();
1293
1294 const int64_t null_count = binary_array.null_count();
1295
1296 // convert each BinaryArray value to valid decimal bytes
1297 for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
1298 int32_t record_len = 0;
1299 const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
1300
1301 if ((record_len < 0) || (record_len > type_length)) {
1302 return Status::Invalid("Invalid BYTE_ARRAY size");
1303 }
1304
1305 auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
1306 out_ptr_view[0] = 0;
1307 out_ptr_view[1] = 0;
1308
1309 // only convert rows that are not null if there are nulls, or
1310 // all rows, if there are not
1311 if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) {
1312 RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
1313 }
1314 }
1315
1316 *out = std::make_shared<::arrow::Decimal128Array>(
1317 type, length, data, binary_array.null_bitmap(), null_count);
1318 return Status::OK();
1319}
1320
1321/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
1322/// We do this by:
1323/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
1324/// 2. Allocating a buffer for the arrow::Decimal128Array
1325/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
1326/// representing the high and low bits of each decimal value.
1327template <typename ArrowType, typename ParquetType>
1328struct TransferFunctor<
1329 ArrowType, ParquetType,
1330 typename std::enable_if<std::is_same<ArrowType, ::arrow::Decimal128Type>::value &&
1331 (std::is_same<ParquetType, ByteArrayType>::value ||
1332 std::is_same<ParquetType, FLBAType>::value)>::type> {
1333 Status operator()(RecordReader* reader, MemoryPool* pool,
1334 const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
1335 DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
1336
1337 ::arrow::ArrayVector chunks = reader->GetBuilderChunks();
1338
1339 for (size_t i = 0; i < chunks.size(); ++i) {
1340 std::shared_ptr<Array> chunk_as_decimal;
1341 RETURN_NOT_OK(
1342 ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
1343
1344 // Replace the chunk, which will hopefully also free memory as we go
1345 chunks[i] = chunk_as_decimal;
1346 }
1347 *out = std::make_shared<ChunkedArray>(chunks);
1348 return Status::OK();
1349 }
1350};
1351
1352/// \brief Convert an Int32 or Int64 array into a Decimal128Array
1353/// The parquet spec allows systems to write decimals in int32, int64 if the values are
1354/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
1355/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
1356template <typename ParquetIntegerType,
1357 typename = typename std::enable_if<
1358 std::is_same<ParquetIntegerType, Int32Type>::value ||
1359 std::is_same<ParquetIntegerType, Int64Type>::value>::type>
1360static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
1361 const std::shared_ptr<::arrow::DataType>& type,
1362 Datum* out) {
1363 DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
1364
1365 const int64_t length = reader->values_written();
1366
1367 using ElementType = typename ParquetIntegerType::c_type;
1368 static_assert(std::is_same<ElementType, int32_t>::value ||
1369 std::is_same<ElementType, int64_t>::value,
1370 "ElementType must be int32_t or int64_t");
1371
1372 const auto values = reinterpret_cast<const ElementType*>(reader->values());
1373
1374 const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
1375 const int64_t type_length = decimal_type.byte_width();
1376
1377 std::shared_ptr<Buffer> data;
1378 RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
1379 uint8_t* out_ptr = data->mutable_data();
1380
1381 using ::arrow::BitUtil::FromLittleEndian;
1382
1383 for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
1384 // sign/zero extend int32_t values, otherwise a no-op
1385 const auto value = static_cast<int64_t>(values[i]);
1386
1387 auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
1388
1389 // No-op on little endian machines, byteswap on big endian
1390 out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
1391
1392 // no need to byteswap here because we're sign/zero extending exactly 8 bytes
1393 out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
1394 }
1395
1396 if (reader->nullable_values()) {
1397 std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
1398 *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
1399 reader->null_count());
1400 } else {
1401 *out = std::make_shared<::arrow::Decimal128Array>(type, length, data);
1402 }
1403 return Status::OK();
1404}
1405
// DECIMAL stored as physical INT32: delegate to DecimalIntegerTransfer.
template <>
struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
  Status operator()(RecordReader* reader, MemoryPool* pool,
                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
    return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
  }
};

// DECIMAL stored as physical INT64: delegate to DecimalIntegerTransfer.
template <>
struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
  Status operator()(RecordReader* reader, MemoryPool* pool,
                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
    return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
  }
};
1421
// Instantiate the TransferFunctor for the given (Arrow, Parquet) type pair,
// run it against the current record reader, and wrap the result into list
// arrays according to the column's repetition structure. Declares a local
// `func` and writes into a `result` Datum expected to exist in the caller.
#define TRANSFER_DATA(ArrowType, ParquetType)                                  \
  TransferFunctor<ArrowType, ParquetType> func;                                \
  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), &result));   \
  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(&result))

// Switch-case wrapper around TRANSFER_DATA keyed on the Arrow type id.
#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
  case ::arrow::Type::ENUM: {                       \
    TRANSFER_DATA(ArrowType, ParquetType);          \
  } break;
1431
// Read up to `records_to_read` records from this column, decode them, and
// convert the values into a ChunkedArray of the target Arrow field type by
// dispatching to the appropriate TransferFunctor above.
Status PrimitiveImpl::NextBatch(int64_t records_to_read,
                                std::shared_ptr<ChunkedArray>* out) {
  try {
    // Pre-allocation gives much better performance for flat columns
    record_reader_->Reserve(records_to_read);

    record_reader_->Reset();
    while (records_to_read > 0) {
      if (!record_reader_->HasMoreData()) {
        break;
      }
      int64_t records_read = record_reader_->ReadRecords(records_to_read);
      records_to_read -= records_read;
      if (records_read == 0) {
        // Current row group is exhausted; advance to the next one.
        NextRowGroup();
      }
    }
  } catch (const ::parquet::ParquetException& e) {
    // Surface low-level Parquet exceptions as Arrow statuses.
    return ::arrow::Status::IOError(e.what());
  }

  Datum result;
  switch (field_->type()->id()) {
    TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
    TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
    TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
    TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
    TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
    TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
    TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
    TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
    TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
    TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
    TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
    TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
    TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
    TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
    TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
    TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
    case ::arrow::Type::NA: {
      // Null column: no values to transfer, only a typed array of nulls.
      result = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
      RETURN_NOT_OK(WrapIntoListArray<Int32Type>(&result));
      break;
    }
    case ::arrow::Type::DECIMAL: {
      // Decimals dispatch a second time on the *physical* storage type.
      switch (descr_->physical_type()) {
        case ::parquet::Type::INT32: {
          TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
        } break;
        case ::parquet::Type::INT64: {
          TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          TRANSFER_DATA(::arrow::Decimal128Type, ByteArrayType);
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal must be int32, int64, byte array, or fixed "
              "length binary");
      }
    } break;
    case ::arrow::Type::TIMESTAMP: {
      // Millis/micros are stored as int64; nanos use the legacy INT96 layout.
      ::arrow::TimestampType* timestamp_type =
          static_cast<::arrow::TimestampType*>(field_->type().get());
      switch (timestamp_type->unit()) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO: {
          TRANSFER_DATA(::arrow::TimestampType, Int64Type);
        } break;
        case ::arrow::TimeUnit::NANO: {
          TRANSFER_DATA(::arrow::TimestampType, Int96Type);
        } break;
        default:
          return Status::NotImplemented("TimeUnit not supported");
      }
    } break;
    TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
    TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
    default:
      return Status::NotImplemented("No support for reading columns of type ",
                                    field_->type()->ToString());
  }

  DCHECK_NE(result.kind(), Datum::NONE);

  // Normalize the transfer result to a ChunkedArray for the caller.
  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
  } else {
    DCHECK(false) << "Should be impossible";
  }
  return Status::OK();
}
1529
// Advance the underlying record reader to the next row group's pages.
void PrimitiveImpl::NextRowGroup() {
  std::unique_ptr<PageReader> page_reader = input_->NextChunk();
  record_reader_->SetPageReader(std::move(page_reader));
}

// Expose the definition levels buffered by the record reader (not a copy).
Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
  *data = record_reader_->def_levels();
  *length = record_reader_->levels_written();
  return Status::OK();
}

// Expose the repetition levels buffered by the record reader (not a copy).
Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
  *data = record_reader_->rep_levels();
  *length = record_reader_->levels_written();
  return Status::OK();
}
1546
// Public ColumnReader facade; all work is delegated to the pimpl.
ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
    : impl_(std::move(impl)) {}

ColumnReader::~ColumnReader() {}

// Read the next `records_to_read` records as a (possibly multi-chunk) array.
Status ColumnReader::NextBatch(int64_t records_to_read,
                               std::shared_ptr<ChunkedArray>* out) {
  return impl_->NextBatch(records_to_read, out);
}

// Convenience overload that collapses the chunked result into a single Array
// via GetSingleChunk.
Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(impl_->NextBatch(records_to_read, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}
1562
1563// StructImpl methods
1564
1565Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
1566 int64_t* null_count_out) {
1567 std::shared_ptr<Buffer> null_bitmap;
1568 auto null_count = 0;
1569 const int16_t* def_levels_data;
1570 size_t def_levels_length;
1571 RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
1572 RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap));
1573 uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
1574 for (size_t i = 0; i < def_levels_length; i++) {
1575 if (def_levels_data[i] < struct_def_level_) {
1576 // Mark null
1577 null_count += 1;
1578 } else {
1579 DCHECK_EQ(def_levels_data[i], struct_def_level_);
1580 ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
1581 }
1582 }
1583
1584 *null_count_out = null_count;
1585 *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
1586 return Status::OK();
1587}
1588
// TODO(itaiin): Consider caching the results of this calculation -
// note that this is only used once for each read for now
// Reconstruct the struct's own definition levels from its children's levels
// by capping each child level at the struct's level and taking the maximum
// across children.
Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
  *data = nullptr;
  if (children_.size() == 0) {
    // Empty struct
    *length = 0;
    return Status::OK();
  }

  // We have at least one child
  const int16_t* child_def_levels;
  size_t child_length;
  RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
  auto size = child_length * sizeof(int16_t);
  RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_));
  // Initialize with the minimal def level
  // (memset with -1 fills every byte with 0xFF, i.e. each int16_t becomes -1,
  // a sentinel meaning "not yet computed").
  std::memset(def_levels_buffer_->mutable_data(), -1, size);
  auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());

  // When a struct is defined, all of its children def levels are at least at
  // nesting level, and def level equals nesting level.
  // When a struct is not defined, all of its children def levels are less than
  // the nesting level, and the def level equals max(children def levels)
  // All other possibilities are malformed definition data.
  for (auto& child : children_) {
    size_t current_child_length;
    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, &current_child_length));
    // All children must report the same number of levels.
    DCHECK_EQ(child_length, current_child_length);
    for (size_t i = 0; i < child_length; i++) {
      // Check that value is either uninitialized, or current
      // and previous children def levels agree on the struct level
      DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) ==
                                          (child_def_levels[i] >= struct_def_level_)));
      result_levels[i] =
          std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
    }
  }
  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
  *length = child_length;
  return Status::OK();
}
1631
1632void StructImpl::InitField(
1633 const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
1634 // Make a shallow node to field conversion from the children fields
1635 std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
1636 for (size_t i = 0; i < children.size(); i++) {
1637 fields[i] = children[i]->field();
1638 }
1639 auto type = ::arrow::struct_(fields);
1640 field_ = ::arrow::field(node->name(), type);
1641}
1642
// Repetition levels are not reconstructed for struct readers.
Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
  return Status::NotImplemented("GetRepLevels is not implemented for struct");
}
1646
// Assemble a StructArray from the next batch of every child reader, plus a
// null bitmap derived from the children's definition levels.
Status StructImpl::NextBatch(int64_t records_to_read,
                             std::shared_ptr<ChunkedArray>* out) {
  std::vector<std::shared_ptr<Array>> children_arrays;
  std::shared_ptr<Buffer> null_bitmap;
  int64_t null_count;

  // Gather children arrays and def levels
  for (auto& child : children_) {
    std::shared_ptr<ChunkedArray> field;
    RETURN_NOT_OK(child->NextBatch(records_to_read, &field));

    if (field->num_chunks() > 1) {
      return Status::Invalid("Chunked field reads not yet supported with StructArray");
    }
    children_arrays.push_back(field->chunk(0));
  }

  RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));

  // NOTE(review): assumes children_ is non-empty; children_arrays[0] would be
  // out-of-bounds for a childless struct — confirm that case cannot reach here.
  int64_t struct_length = children_arrays[0]->length();
  for (size_t i = 1; i < children_arrays.size(); ++i) {
    if (children_arrays[i]->length() != struct_length) {
      // TODO(wesm): This should really only occur if the Parquet file is
      // malformed. Should this be a DCHECK?
      return Status::Invalid("Struct children had different lengths");
    }
  }

  auto result = std::make_shared<StructArray>(field()->type(), struct_length,
                                              children_arrays, null_bitmap, null_count);
  *out = std::make_shared<ChunkedArray>(result);
  return Status::OK();
}
1680
// Create a reader for one column chunk of this row group.
// (Constructed via raw new because the ColumnChunkReader constructor is not
// publicly accessible to make_shared.)
std::shared_ptr<ColumnChunkReader> RowGroupReader::Column(int column_index) {
  return std::shared_ptr<ColumnChunkReader>(
      new ColumnChunkReader(impl_, row_group_index_, column_index));
}

// Read only the given column indices of this row group into a Table.
Status RowGroupReader::ReadTable(const std::vector<int>& column_indices,
                                 std::shared_ptr<::arrow::Table>* out) {
  return impl_->ReadRowGroup(row_group_index_, column_indices, out);
}

// Read every column of this row group into a Table.
Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) {
  return impl_->ReadRowGroup(row_group_index_, out);
}

RowGroupReader::~RowGroupReader() {}

// Non-owning view over the file reader implementation for one row group.
RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
    : impl_(impl), row_group_index_(row_group_index) {}

// Read this column chunk as a (possibly multi-chunk) array.
Status ColumnChunkReader::Read(std::shared_ptr<::arrow::ChunkedArray>* out) {
  return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
}

// Convenience overload collapsing the result into a single Array.
Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(impl_->ReadColumnChunk(column_index_, row_group_index_, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}

ColumnChunkReader::~ColumnChunkReader() {}

// Non-owning view over the file reader implementation for one column chunk.
ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index,
                                     int column_index)
    : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
1715
1716} // namespace arrow
1717} // namespace parquet
1718