1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/arrow/reader.h" |
19 | |
20 | #include <algorithm> |
21 | #include <climits> |
22 | #include <cstring> |
23 | #include <future> |
24 | #include <ostream> |
25 | #include <string> |
26 | #include <type_traits> |
27 | #include <utility> |
28 | #include <vector> |
29 | |
30 | #include "arrow/api.h" |
31 | #include "arrow/util/bit-util.h" |
32 | #include "arrow/util/int-util.h" |
33 | #include "arrow/util/logging.h" |
34 | #include "arrow/util/thread-pool.h" |
35 | |
36 | // For arrow::compute::Datum. This should perhaps be promoted. See ARROW-4022 |
37 | #include "arrow/compute/kernel.h" |
38 | |
39 | #include "parquet/arrow/record_reader.h" |
40 | #include "parquet/arrow/schema.h" |
41 | #include "parquet/column_reader.h" |
42 | #include "parquet/exception.h" |
43 | #include "parquet/file_reader.h" |
44 | #include "parquet/metadata.h" |
45 | #include "parquet/properties.h" |
46 | #include "parquet/schema.h" |
47 | #include "parquet/types.h" |
48 | #include "parquet/util/memory.h" |
49 | #include "parquet/util/schema-util.h" |
50 | |
51 | using arrow::Array; |
52 | using arrow::BooleanArray; |
53 | using arrow::ChunkedArray; |
54 | using arrow::Column; |
55 | using arrow::Field; |
56 | using arrow::Int32Array; |
57 | using arrow::ListArray; |
58 | using arrow::MemoryPool; |
59 | using arrow::ResizableBuffer; |
60 | using arrow::Status; |
61 | using arrow::StructArray; |
62 | using arrow::Table; |
63 | using arrow::TimestampArray; |
64 | |
65 | // For Array/ChunkedArray variant |
66 | using arrow::compute::Datum; |
67 | |
68 | using parquet::schema::Node; |
69 | |
70 | // Help reduce verbosity |
71 | using ParquetReader = parquet::ParquetFileReader; |
72 | using arrow::RecordBatchReader; |
73 | |
74 | using parquet::internal::RecordReader; |
75 | |
76 | namespace parquet { |
77 | namespace arrow { |
78 | |
79 | using ::arrow::BitUtil::BytesForBits; |
80 | using ::arrow::BitUtil::FromBigEndian; |
81 | using ::arrow::internal::SafeLeftShift; |
82 | |
83 | template <typename ArrowType> |
84 | using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType; |
85 | |
86 | namespace { |
87 | |
88 | Status GetSingleChunk(const ChunkedArray& chunked, std::shared_ptr<Array>* out) { |
89 | DCHECK_GT(chunked.num_chunks(), 0); |
90 | if (chunked.num_chunks() > 1) { |
91 | return Status::Invalid("Function call returned a chunked array" ); |
92 | } |
93 | *out = chunked.chunk(0); |
94 | return Status::OK(); |
95 | } |
96 | |
97 | } // namespace |
98 | |
99 | // ---------------------------------------------------------------------- |
100 | // Iteration utilities |
101 | |
102 | // Abstraction to decouple row group iteration details from the ColumnReader, |
103 | // so we can read only a single row group if we want |
// Abstraction to decouple row group iteration details from the ColumnReader,
// so we can read only a single row group if we want
//
// Subclasses implement NextChunk() to yield a PageReader per row group (or a
// single row group), returning nullptr when exhausted. The iterator does not
// own the ParquetFileReader; the caller must keep it alive.
class FileColumnIterator {
 public:
  explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
      : column_index_(column_index),
        reader_(reader),
        schema_(reader->metadata()->schema()) {}

  virtual ~FileColumnIterator() {}

  // Returns the page reader for the next chunk of this column, or nullptr
  // when there are no more chunks to read.
  virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;

  const SchemaDescriptor* schema() const { return schema_; }

  // Descriptor of the leaf column this iterator traverses.
  const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }

  std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }

  int column_index() const { return column_index_; }

 protected:
  int column_index_;
  ParquetFileReader* reader_;  // not owned
  const SchemaDescriptor* schema_;
};
128 | |
129 | class AllRowGroupsIterator : public FileColumnIterator { |
130 | public: |
131 | explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader) |
132 | : FileColumnIterator(column_index, reader), next_row_group_(0) {} |
133 | |
134 | std::unique_ptr<::parquet::PageReader> NextChunk() override { |
135 | std::unique_ptr<::parquet::PageReader> result; |
136 | if (next_row_group_ < reader_->metadata()->num_row_groups()) { |
137 | result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_); |
138 | next_row_group_++; |
139 | } else { |
140 | result = nullptr; |
141 | } |
142 | return result; |
143 | } |
144 | |
145 | private: |
146 | int next_row_group_; |
147 | }; |
148 | |
149 | class SingleRowGroupIterator : public FileColumnIterator { |
150 | public: |
151 | explicit SingleRowGroupIterator(int column_index, int row_group_number, |
152 | ParquetFileReader* reader) |
153 | : FileColumnIterator(column_index, reader), |
154 | row_group_number_(row_group_number), |
155 | done_(false) {} |
156 | |
157 | std::unique_ptr<::parquet::PageReader> NextChunk() override { |
158 | if (done_) { |
159 | return nullptr; |
160 | } |
161 | |
162 | auto result = |
163 | reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_); |
164 | done_ = true; |
165 | return result; |
166 | } |
167 | |
168 | private: |
169 | int row_group_number_; |
170 | bool done_; |
171 | }; |
172 | |
173 | class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader { |
174 | public: |
175 | explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices, |
176 | const std::vector<int>& column_indices, |
177 | std::shared_ptr<::arrow::Schema> schema, |
178 | FileReader* reader) |
179 | : row_group_indices_(row_group_indices), |
180 | column_indices_(column_indices), |
181 | schema_(schema), |
182 | file_reader_(reader), |
183 | next_row_group_(0) {} |
184 | |
185 | ~RowGroupRecordBatchReader() override {} |
186 | |
187 | std::shared_ptr<::arrow::Schema> schema() const override { return schema_; } |
188 | |
189 | Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override { |
190 | if (table_ != nullptr) { // one row group has been loaded |
191 | std::shared_ptr<::arrow::RecordBatch> tmp; |
192 | RETURN_NOT_OK(table_batch_reader_->ReadNext(&tmp)); |
193 | if (tmp != nullptr) { // some column chunks are left in table |
194 | *out = tmp; |
195 | return Status::OK(); |
196 | } else { // the entire table is consumed |
197 | table_batch_reader_.reset(); |
198 | table_.reset(); |
199 | } |
200 | } |
201 | |
202 | // all row groups has been consumed |
203 | if (next_row_group_ == row_group_indices_.size()) { |
204 | *out = nullptr; |
205 | return Status::OK(); |
206 | } |
207 | |
208 | RETURN_NOT_OK(file_reader_->ReadRowGroup(row_group_indices_[next_row_group_], |
209 | column_indices_, &table_)); |
210 | |
211 | next_row_group_++; |
212 | table_batch_reader_.reset(new ::arrow::TableBatchReader(*table_.get())); |
213 | return table_batch_reader_->ReadNext(out); |
214 | } |
215 | |
216 | private: |
217 | std::vector<int> row_group_indices_; |
218 | std::vector<int> column_indices_; |
219 | std::shared_ptr<::arrow::Schema> schema_; |
220 | FileReader* file_reader_; |
221 | size_t next_row_group_; |
222 | std::shared_ptr<::arrow::Table> table_; |
223 | std::unique_ptr<::arrow::TableBatchReader> table_batch_reader_; |
224 | }; |
225 | |
226 | // ---------------------------------------------------------------------- |
227 | // File reader implementation |
228 | |
// Pimpl for FileReader: owns the underlying ParquetFileReader and implements
// all Parquet -> Arrow read paths (single column, schema field, row group,
// whole table), optionally fanning column reads out to the CPU thread pool.
class FileReader::Impl {
 public:
  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
      : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}

  virtual ~Impl() {}

  // Create a reader over all row groups of leaf column i.
  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);

  // Read top-level schema field i (which may map to several leaf columns);
  // the indices overload restricts which leaf columns participate.
  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out);
  Status ReadSchemaField(int i, const std::vector<int>& indices,
                         std::shared_ptr<ChunkedArray>* out);
  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out);
  Status ReadColumnChunk(int column_index, int row_group_index,
                         std::shared_ptr<ChunkedArray>* out);

  // Recursively build the reader tree for a schema node; *out stays nullptr
  // when none of the node's leaves appear in `indices`.
  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
                          int16_t def_level,
                          std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);

  Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
  Status GetSchema(const std::vector<int>& indices,
                   std::shared_ptr<::arrow::Schema>* out);
  Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
  Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
                      std::shared_ptr<::arrow::Table>* out);
  Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
  Status ReadTable(std::shared_ptr<Table>* table);
  Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
  Status ReadRowGroups(const std::vector<int>& row_groups,
                       const std::vector<int>& indices,
                       std::shared_ptr<::arrow::Table>* out);

  bool CheckForFlatColumn(const ColumnDescriptor* descr);
  bool CheckForFlatListColumn(const ColumnDescriptor* descr);

  const ParquetFileReader* parquet_reader() const { return reader_.get(); }

  int num_row_groups() const { return reader_->metadata()->num_row_groups(); }

  int num_columns() const { return reader_->metadata()->num_columns(); }

  // When true, column reads are submitted to the global CPU thread pool.
  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }

  ParquetFileReader* reader() { return reader_.get(); }

 private:
  MemoryPool* pool_;  // not owned
  std::unique_ptr<ParquetFileReader> reader_;
  bool use_threads_;
};
280 | |
// Internal interface behind ColumnReader; implemented by PrimitiveImpl for
// leaf columns and StructImpl for (supported) nested struct columns.
class ColumnReader::ColumnReaderImpl {
 public:
  virtual ~ColumnReaderImpl() {}
  // Read up to `records_to_read` records into an Arrow chunked array.
  virtual Status NextBatch(int64_t records_to_read,
                           std::shared_ptr<ChunkedArray>* out) = 0;
  // Expose the definition/repetition levels from the last batch; used by
  // parent (struct) readers to compute validity.
  virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
  virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
  virtual const std::shared_ptr<Field> field() = 0;
};
290 | |
291 | // Reader implementation for primitive arrays |
292 | class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl { |
293 | public: |
294 | PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input) |
295 | : pool_(pool), input_(std::move(input)), descr_(input_->descr()) { |
296 | record_reader_ = RecordReader::Make(descr_, pool_); |
297 | DCHECK(NodeToField(*input_->descr()->schema_node(), &field_).ok()); |
298 | NextRowGroup(); |
299 | } |
300 | |
301 | Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override; |
302 | |
303 | template <typename ParquetType> |
304 | Status WrapIntoListArray(Datum* inout_array); |
305 | |
306 | Status GetDefLevels(const int16_t** data, size_t* length) override; |
307 | Status GetRepLevels(const int16_t** data, size_t* length) override; |
308 | |
309 | const std::shared_ptr<Field> field() override { return field_; } |
310 | |
311 | private: |
312 | void NextRowGroup(); |
313 | |
314 | MemoryPool* pool_; |
315 | std::unique_ptr<FileColumnIterator> input_; |
316 | const ColumnDescriptor* descr_; |
317 | |
318 | std::shared_ptr<RecordReader> record_reader_; |
319 | |
320 | std::shared_ptr<Field> field_; |
321 | }; |
322 | |
323 | // Reader implementation for struct array |
// Reader implementation for struct arrays: composes child column readers and
// derives the struct-level null bitmap from the children's definition levels.
class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
 public:
  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
      : children_(children), struct_def_level_(struct_def_level), pool_(pool) {
    InitField(node, children);
  }

  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
  Status GetDefLevels(const int16_t** data, size_t* length) override;
  Status GetRepLevels(const int16_t** data, size_t* length) override;
  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
  int16_t struct_def_level_;  // definition level at which this struct is defined
  MemoryPool* pool_;          // not owned
  std::shared_ptr<Field> field_;
  std::shared_ptr<ResizableBuffer> def_levels_buffer_;

  // Build the struct's null bitmap/count from aggregated child def levels.
  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
  // Construct field_ (an Arrow struct field) from the children's fields.
  void InitField(const Node* node,
                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
};
348 | |
// FileReader delegates all work to its pimpl, which takes ownership of the
// underlying ParquetFileReader.
FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
    : impl_(new FileReader::Impl(pool, std::move(reader))) {}

FileReader::~FileReader() {}
353 | |
354 | Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) { |
355 | std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get())); |
356 | |
357 | std::unique_ptr<ColumnReader::ColumnReaderImpl> impl( |
358 | new PrimitiveImpl(pool_, std::move(input))); |
359 | *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl))); |
360 | return Status::OK(); |
361 | } |
362 | |
// Recursively resolve a reader for the schema node. For a simple struct,
// build a StructImpl over the children that are selected by `indices`; for a
// flat (possibly list-wrapped) field, walk down to the leaf and build a
// primitive reader if its column index is selected. *out remains nullptr when
// nothing under `node` is selected.
Status FileReader::Impl::GetReaderForNode(
    int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
    std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
  *out = nullptr;

  if (IsSimpleStruct(node)) {
    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
    std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
    for (int i = 0; i < group->field_count(); i++) {
      std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
      // TODO(itaiin): Remove the -1 index hack when all types of nested reads
      // are supported. This currently just signals the lower level reader resolution
      // to abort
      RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
                                     static_cast<int16_t>(def_level + 1), &child_reader));
      if (child_reader != nullptr) {
        children.push_back(std::move(child_reader));
      }
    }

    // Only materialize a struct reader when at least one child was selected.
    if (children.size() > 0) {
      *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
          new StructImpl(children, def_level, pool_, node));
    }
  } else {
    // This should be a flat field case - translate the field index to
    // the correct column index by walking down to the leaf node
    const Node* walker = node;
    while (!walker->is_primitive()) {
      DCHECK(walker->is_group());
      auto group = static_cast<const GroupNode*>(walker);
      if (group->field_count() != 1) {
        return Status::NotImplemented("lists with structs are not supported.");
      }
      walker = group->field(0).get();
    }
    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);

    // If the index of the column is found then a reader for the column is needed.
    // Otherwise *out keeps the nullptr value.
    if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
      std::unique_ptr<ColumnReader> reader;
      RETURN_NOT_OK(GetColumn(column_index, &reader));
      *out = std::move(reader->impl_);
    }
  }

  return Status::OK();
}
412 | |
413 | Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) { |
414 | std::vector<int> indices(reader_->metadata()->num_columns()); |
415 | |
416 | for (size_t j = 0; j < indices.size(); ++j) { |
417 | indices[j] = static_cast<int>(j); |
418 | } |
419 | |
420 | return ReadSchemaField(i, indices, out); |
421 | } |
422 | |
423 | Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices, |
424 | std::shared_ptr<ChunkedArray>* out) { |
425 | auto parquet_schema = reader_->metadata()->schema(); |
426 | |
427 | auto node = parquet_schema->group_node()->field(i).get(); |
428 | std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl; |
429 | |
430 | RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl)); |
431 | if (reader_impl == nullptr) { |
432 | *out = nullptr; |
433 | return Status::OK(); |
434 | } |
435 | |
436 | std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl))); |
437 | |
438 | // TODO(wesm): This calculation doesn't make much sense when we have repeated |
439 | // schema nodes |
440 | int64_t records_to_read = 0; |
441 | |
442 | const FileMetaData& metadata = *reader_->metadata(); |
443 | for (int j = 0; j < metadata.num_row_groups(); j++) { |
444 | records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values(); |
445 | } |
446 | |
447 | return reader->NextBatch(records_to_read, out); |
448 | } |
449 | |
450 | Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) { |
451 | std::unique_ptr<ColumnReader> flat_column_reader; |
452 | RETURN_NOT_OK(GetColumn(i, &flat_column_reader)); |
453 | |
454 | int64_t records_to_read = 0; |
455 | for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) { |
456 | records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values(); |
457 | } |
458 | |
459 | return flat_column_reader->NextBatch(records_to_read, out); |
460 | } |
461 | |
462 | Status FileReader::Impl::GetSchema(const std::vector<int>& indices, |
463 | std::shared_ptr<::arrow::Schema>* out) { |
464 | auto descr = reader_->metadata()->schema(); |
465 | auto parquet_key_value_metadata = reader_->metadata()->key_value_metadata(); |
466 | return FromParquetSchema(descr, indices, parquet_key_value_metadata, out); |
467 | } |
468 | |
469 | Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index, |
470 | std::shared_ptr<ChunkedArray>* out) { |
471 | auto rg_metadata = reader_->metadata()->RowGroup(row_group_index); |
472 | int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values(); |
473 | |
474 | std::unique_ptr<FileColumnIterator> input( |
475 | new SingleRowGroupIterator(column_index, row_group_index, reader_.get())); |
476 | |
477 | std::unique_ptr<ColumnReader::ColumnReaderImpl> impl( |
478 | new PrimitiveImpl(pool_, std::move(input))); |
479 | ColumnReader flat_column_reader(std::move(impl)); |
480 | |
481 | return flat_column_reader.NextBatch(records_to_read, out); |
482 | } |
483 | |
484 | Status FileReader::Impl::ReadRowGroup(int row_group_index, |
485 | const std::vector<int>& indices, |
486 | std::shared_ptr<Table>* out) { |
487 | std::shared_ptr<::arrow::Schema> schema; |
488 | RETURN_NOT_OK(GetSchema(indices, &schema)); |
489 | |
490 | auto rg_metadata = reader_->metadata()->RowGroup(row_group_index); |
491 | |
492 | int num_columns = static_cast<int>(indices.size()); |
493 | std::vector<std::shared_ptr<Column>> columns(num_columns); |
494 | |
495 | // TODO(wesm): Refactor to share more code with ReadTable |
496 | |
497 | auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, this](int i) { |
498 | int column_index = indices[i]; |
499 | |
500 | std::shared_ptr<ChunkedArray> array; |
501 | RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array)); |
502 | columns[i] = std::make_shared<Column>(schema->field(i), array); |
503 | return Status::OK(); |
504 | }; |
505 | |
506 | if (use_threads_) { |
507 | std::vector<std::future<Status>> futures; |
508 | auto pool = ::arrow::internal::GetCpuThreadPool(); |
509 | for (int i = 0; i < num_columns; i++) { |
510 | futures.push_back(pool->Submit(ReadColumnFunc, i)); |
511 | } |
512 | Status final_status = Status::OK(); |
513 | for (auto& fut : futures) { |
514 | Status st = fut.get(); |
515 | if (!st.ok()) { |
516 | final_status = std::move(st); |
517 | } |
518 | } |
519 | RETURN_NOT_OK(final_status); |
520 | } else { |
521 | for (int i = 0; i < num_columns; i++) { |
522 | RETURN_NOT_OK(ReadColumnFunc(i)); |
523 | } |
524 | } |
525 | |
526 | *out = Table::Make(schema, columns); |
527 | return Status::OK(); |
528 | } |
529 | |
530 | Status FileReader::Impl::ReadTable(const std::vector<int>& indices, |
531 | std::shared_ptr<Table>* out) { |
532 | std::shared_ptr<::arrow::Schema> schema; |
533 | RETURN_NOT_OK(GetSchema(indices, &schema)); |
534 | |
535 | // We only need to read schema fields which have columns indicated |
536 | // in the indices vector |
537 | std::vector<int> field_indices; |
538 | if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices, |
539 | &field_indices)) { |
540 | return Status::Invalid("Invalid column index" ); |
541 | } |
542 | |
543 | int num_fields = static_cast<int>(field_indices.size()); |
544 | std::vector<std::shared_ptr<Column>> columns(num_fields); |
545 | |
546 | auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) { |
547 | std::shared_ptr<ChunkedArray> array; |
548 | RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array)); |
549 | columns[i] = std::make_shared<Column>(schema->field(i), array); |
550 | return Status::OK(); |
551 | }; |
552 | |
553 | if (use_threads_) { |
554 | std::vector<std::future<Status>> futures; |
555 | auto pool = ::arrow::internal::GetCpuThreadPool(); |
556 | for (int i = 0; i < num_fields; i++) { |
557 | futures.push_back(pool->Submit(ReadColumnFunc, i)); |
558 | } |
559 | Status final_status = Status::OK(); |
560 | for (auto& fut : futures) { |
561 | Status st = fut.get(); |
562 | if (!st.ok()) { |
563 | final_status = std::move(st); |
564 | } |
565 | } |
566 | RETURN_NOT_OK(final_status); |
567 | } else { |
568 | for (int i = 0; i < num_fields; i++) { |
569 | RETURN_NOT_OK(ReadColumnFunc(i)); |
570 | } |
571 | } |
572 | |
573 | std::shared_ptr<Table> table = Table::Make(schema, columns); |
574 | RETURN_NOT_OK(table->Validate()); |
575 | *out = table; |
576 | return Status::OK(); |
577 | } |
578 | |
579 | Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) { |
580 | std::vector<int> indices(reader_->metadata()->num_columns()); |
581 | |
582 | for (size_t i = 0; i < indices.size(); ++i) { |
583 | indices[i] = static_cast<int>(i); |
584 | } |
585 | return ReadTable(indices, table); |
586 | } |
587 | |
588 | Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups, |
589 | const std::vector<int>& indices, |
590 | std::shared_ptr<Table>* table) { |
591 | std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr); |
592 | |
593 | for (size_t i = 0; i < row_groups.size(); ++i) { |
594 | RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &tables[i])); |
595 | } |
596 | return ConcatenateTables(tables, table); |
597 | } |
598 | |
599 | Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups, |
600 | std::shared_ptr<Table>* table) { |
601 | std::vector<int> indices(reader_->metadata()->num_columns()); |
602 | |
603 | for (size_t i = 0; i < indices.size(); ++i) { |
604 | indices[i] = static_cast<int>(i); |
605 | } |
606 | return ReadRowGroups(row_groups, indices, table); |
607 | } |
608 | |
609 | Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) { |
610 | std::vector<int> indices(reader_->metadata()->num_columns()); |
611 | |
612 | for (size_t i = 0; i < indices.size(); ++i) { |
613 | indices[i] = static_cast<int>(i); |
614 | } |
615 | return ReadRowGroup(i, indices, table); |
616 | } |
617 | |
// Static ctor
//
// Wrap an Arrow readable file in a parquet RandomAccessSource, open the
// low-level ParquetFileReader (translating thrown ParquetExceptions into a
// Status via PARQUET_CATCH_NOT_OK), and hand it to a new FileReader.
// `metadata` may be nullptr, in which case it is parsed from the file.
Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
                MemoryPool* allocator, const ReaderProperties& props,
                const std::shared_ptr<FileMetaData>& metadata,
                std::unique_ptr<FileReader>* reader) {
  std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(file));
  std::unique_ptr<ParquetReader> pq_reader;
  PARQUET_CATCH_NOT_OK(pq_reader =
                           ParquetReader::Open(std::move(io_wrapper), props, metadata));
  reader->reset(new FileReader(allocator, std::move(pq_reader)));
  return Status::OK();
}

// Convenience overload using default reader properties and no preloaded
// file metadata.
Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
                MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
  return OpenFile(file, allocator, ::parquet::default_reader_properties(), nullptr,
                  reader);
}
636 | |
// ----------------------------------------------------------------------
// Thin public wrappers delegating to FileReader::Impl. The try/catch forms
// translate ParquetException (thrown by the low-level reader) into
// Status::IOError so the public API is exception-free.

Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
  return impl_->GetColumn(i, out);
}

Status FileReader::GetSchema(const std::vector<int>& indices,
                             std::shared_ptr<::arrow::Schema>* out) {
  return impl_->GetSchema(indices, out);
}

Status FileReader::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
  try {
    return impl_->ReadColumn(i, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
  try {
    return impl_->ReadSchemaField(i, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

// Single-Array variants: read as chunked, then require exactly one chunk.
Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(ReadColumn(i, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}

Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(ReadSchemaField(i, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}
673 | |
674 | Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices, |
675 | std::shared_ptr<RecordBatchReader>* out) { |
676 | std::vector<int> indices(impl_->num_columns()); |
677 | |
678 | for (size_t j = 0; j < indices.size(); ++j) { |
679 | indices[j] = static_cast<int>(j); |
680 | } |
681 | |
682 | return GetRecordBatchReader(row_group_indices, indices, out); |
683 | } |
684 | |
// Get a record batch reader over the given row groups and columns. Validates
// both index lists up front so errors surface here rather than mid-stream.
Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                        const std::vector<int>& column_indices,
                                        std::shared_ptr<RecordBatchReader>* out) {
  // column indices check (GetSchema fails on out-of-range columns)
  std::shared_ptr<::arrow::Schema> schema;
  RETURN_NOT_OK(GetSchema(column_indices, &schema));

  // row group indices check
  int max_num = num_row_groups();
  for (auto row_group_index : row_group_indices) {
    if (row_group_index < 0 || row_group_index >= max_num) {
      return Status::Invalid("Some index in row_group_indices is ", row_group_index,
                             ", which is either < 0 or >= num_row_groups(", max_num, ")");
    }
  }

  *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
                                                     schema, this);
  return Status::OK();
}
705 | |
// Public table-reading wrappers: delegate to Impl and translate
// ParquetException into Status::IOError.

Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadTable(out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadTable(const std::vector<int>& indices,
                             std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadTable(indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroup(i, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroup(int i, const std::vector<int>& indices,
                                std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroup(i, indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
                                 std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroups(row_groups, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
                                 const std::vector<int>& indices,
                                 std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadRowGroups(row_groups, indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}
758 | |
// Accessor and utility wrappers.

std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
  return std::shared_ptr<RowGroupReader>(
      new RowGroupReader(impl_.get(), row_group_index));
}

int FileReader::num_row_groups() const { return impl_->num_row_groups(); }

// Intentionally a no-op (kept for API compatibility); use set_use_threads
// to control parallel reads instead.
void FileReader::set_num_threads(int num_threads) {}

void FileReader::set_use_threads(bool use_threads) {
  impl_->set_use_threads(use_threads);
}

// Scan the given columns in batches, counting rows; exceptions from the
// low-level scan are translated into Status::IOError.
Status FileReader::ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                int64_t* num_rows) {
  try {
    *num_rows = ScanFileContents(columns, column_batch_size, impl_->reader());
    return Status::OK();
  } catch (const ::parquet::ParquetException& e) {
    return Status::IOError(e.what());
  }
}

const ParquetFileReader* FileReader::parquet_reader() const {
  return impl_->parquet_reader();
}
785 | |
// Rebuild the List nesting structure around the flat values decoded for this
// column, using the definition/repetition levels buffered in record_reader_.
//
// Only single-child group nesting consisting purely of Lists is supported,
// and multi-chunk inputs are rejected (see ARROW-3762).
//
// \param inout_array in: flat leaf values (ARRAY, or CHUNKED_ARRAY with a
//        single chunk); out: those values wrapped in ListArrays matching the
//        column's nesting depth.
template <typename ParquetType>
Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
  if (descr_->max_repetition_level() == 0) {
    // Flat, no action
    return Status::OK();
  }

  std::shared_ptr<Array> flat_array;

  // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
  // not yet implemented
  if (inout_array->kind() == Datum::CHUNKED_ARRAY) {
    if (inout_array->chunked_array()->num_chunks() > 1) {
      return Status::NotImplemented(
          "Nested data conversions not implemented for "
          "chunked array outputs" );
    }
    flat_array = inout_array->chunked_array()->chunk(0);
  } else {
    DCHECK_EQ(Datum::ARRAY, inout_array->kind());
    flat_array = inout_array->make_array();
  }

  const int16_t* def_levels = record_reader_->def_levels();
  const int16_t* rep_levels = record_reader_->rep_levels();
  const int64_t total_levels_read = record_reader_->levels_position();

  // Recover the (nested) Arrow field for this single column so its List
  // structure can be walked below.
  std::shared_ptr<::arrow::Schema> arrow_schema;
  RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
                                  input_->metadata()->key_value_metadata(),
                                  &arrow_schema));
  std::shared_ptr<Field> current_field = arrow_schema->field(0);

  // Walk downwards to extract nullability; one offsets builder and one
  // validity builder is created per nesting level.
  std::vector<bool> nullable;
  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
  nullable.push_back(current_field->nullable());
  while (current_field->type()->num_children() > 0) {
    if (current_field->type()->num_children() > 1) {
      return Status::NotImplemented("Fields with more than one child are not supported." );
    } else {
      if (current_field->type()->id() != ::arrow::Type::LIST) {
        return Status::NotImplemented("Currently only nesting with Lists is supported." );
      }
      current_field = current_field->type()->child(0);
    }
    offset_builders.emplace_back(
        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
    valid_bits_builders.emplace_back(
        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
    nullable.push_back(current_field->nullable());
  }

  int64_t list_depth = offset_builders.size();
  // This describes the minimal definition that describes a level that
  // reflects a value in the primitive values array.
  int16_t values_def_level = descr_->max_definition_level();
  if (nullable[nullable.size() - 1]) {
    values_def_level--;
  }

  // The definition levels that are needed so that a list is declared
  // as empty and not null.
  std::vector<int16_t> empty_def_level(list_depth);
  int def_level = 0;
  for (int i = 0; i < list_depth; i++) {
    if (nullable[i]) {
      def_level++;
    }
    empty_def_level[i] = static_cast<int16_t>(def_level);
    def_level++;
  }

  // Walk the levels. A repetition level lower than the maximum closes all
  // deeper lists and opens new ones, so an offset is emitted for every
  // level in [rep_level, list_depth).
  int32_t values_offset = 0;
  std::vector<int64_t> null_counts(list_depth, 0);
  for (int64_t i = 0; i < total_levels_read; i++) {
    int16_t rep_level = rep_levels[i];
    if (rep_level < descr_->max_repetition_level()) {
      for (int64_t j = rep_level; j < list_depth; j++) {
        if (j == (list_depth - 1)) {
          // Innermost list indexes directly into the values array.
          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
        } else {
          // Outer lists index into the offsets of the next level down.
          RETURN_NOT_OK(offset_builders[j]->Append(
              static_cast<int32_t>(offset_builders[j + 1]->length())));
        }

        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
          // Definition level one below "empty" => the list itself is null.
          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
          null_counts[j]++;
          break;
        } else {
          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
          if (empty_def_level[j] == def_levels[i]) {
            // List is present but empty; deeper levels have no entries.
            break;
          }
        }
      }
    }
    // Any level at or above values_def_level contributes a leaf value.
    if (def_levels[i] >= values_def_level) {
      values_offset++;
    }
  }
  // Add the final offset to all lists
  for (int64_t j = 0; j < list_depth; j++) {
    if (j == (list_depth - 1)) {
      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
    } else {
      RETURN_NOT_OK(offset_builders[j]->Append(
          static_cast<int32_t>(offset_builders[j + 1]->length())));
    }
  }

  // Finish the builders, keeping only the raw offset/validity buffers.
  std::vector<std::shared_ptr<Buffer>> offsets;
  std::vector<std::shared_ptr<Buffer>> valid_bits;
  std::vector<int64_t> list_lengths;
  for (int64_t j = 0; j < list_depth; j++) {
    list_lengths.push_back(offset_builders[j]->length() - 1);
    std::shared_ptr<Array> array;
    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
  }

  // Wrap the flat values from the innermost list outwards.
  std::shared_ptr<Array> output = flat_array;
  for (int64_t j = list_depth - 1; j >= 0; j--) {
    auto list_type =
        ::arrow::list(::arrow::field("item" , output->type(), nullable[j + 1]));
    output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
                                                  output, valid_bits[j], null_counts[j]);
  }
  *inout_array = output;
  return Status::OK();
}
921 | |
// Type trait: a Parquet physical type can be transferred to an Arrow type
// without element-wise conversion exactly when their in-memory C types are
// identical (the decoded buffer can then be handed over as-is).
template <typename ArrowType, typename ParquetType>
struct supports_fast_path_impl {
  using ArrowCType = typename ArrowType::c_type;
  using ParquetCType = typename ParquetType::c_type;
  static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
};
928 | |
// Variable-length BYTE_ARRAY values never qualify for the fast path.
template <typename ArrowType>
struct supports_fast_path_impl<ArrowType, ByteArrayType> {
  static constexpr bool value = false;
};

// Fixed-length byte arrays never qualify for the fast path either.
template <typename ArrowType>
struct supports_fast_path_impl<ArrowType, FLBAType> {
  static constexpr bool value = false;
};

// SFINAE helper: resolves to void only when the fast path applies, used to
// select the zero-copy TransferFunctor partial specialization below.
template <typename ArrowType, typename ParquetType>
using supports_fast_path =
    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
942 | |
943 | template <typename ArrowType, typename ParquetType, typename Enable = void> |
944 | struct TransferFunctor { |
945 | using ArrowCType = typename ArrowType::c_type; |
946 | using ParquetCType = typename ParquetType::c_type; |
947 | |
948 | Status operator()(RecordReader* reader, MemoryPool* pool, |
949 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
950 | static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value, |
951 | "The fast path transfer functor should be used " |
952 | "for primitive values" ); |
953 | |
954 | int64_t length = reader->values_written(); |
955 | std::shared_ptr<Buffer> data; |
956 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data)); |
957 | |
958 | auto values = reinterpret_cast<const ParquetCType*>(reader->values()); |
959 | auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data()); |
960 | std::copy(values, values + length, out_ptr); |
961 | |
962 | if (reader->nullable_values()) { |
963 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); |
964 | *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid, |
965 | reader->null_count()); |
966 | } else { |
967 | *out = std::make_shared<ArrayType<ArrowType>>(type, length, data); |
968 | } |
969 | return Status::OK(); |
970 | } |
971 | }; |
972 | |
973 | template <typename ArrowType, typename ParquetType> |
974 | struct TransferFunctor<ArrowType, ParquetType, |
975 | supports_fast_path<ArrowType, ParquetType>> { |
976 | Status operator()(RecordReader* reader, MemoryPool* pool, |
977 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
978 | int64_t length = reader->values_written(); |
979 | std::shared_ptr<ResizableBuffer> values = reader->ReleaseValues(); |
980 | |
981 | if (reader->nullable_values()) { |
982 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); |
983 | *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid, |
984 | reader->null_count()); |
985 | } else { |
986 | *out = std::make_shared<ArrayType<ArrowType>>(type, length, values); |
987 | } |
988 | return Status::OK(); |
989 | } |
990 | }; |
991 | |
992 | template <> |
993 | struct TransferFunctor<::arrow::BooleanType, BooleanType> { |
994 | Status operator()(RecordReader* reader, MemoryPool* pool, |
995 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
996 | int64_t length = reader->values_written(); |
997 | std::shared_ptr<Buffer> data; |
998 | |
999 | const int64_t buffer_size = BytesForBits(length); |
1000 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data)); |
1001 | |
1002 | // Transfer boolean values to packed bitmap |
1003 | auto values = reinterpret_cast<const bool*>(reader->values()); |
1004 | uint8_t* data_ptr = data->mutable_data(); |
1005 | memset(data_ptr, 0, buffer_size); |
1006 | |
1007 | for (int64_t i = 0; i < length; i++) { |
1008 | if (values[i]) { |
1009 | ::arrow::BitUtil::SetBit(data_ptr, i); |
1010 | } |
1011 | } |
1012 | |
1013 | if (reader->nullable_values()) { |
1014 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); |
1015 | RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false)); |
1016 | *out = std::make_shared<BooleanArray>(type, length, data, is_valid, |
1017 | reader->null_count()); |
1018 | } else { |
1019 | *out = std::make_shared<BooleanArray>(type, length, data); |
1020 | } |
1021 | return Status::OK(); |
1022 | } |
1023 | }; |
1024 | |
1025 | template <> |
1026 | struct TransferFunctor<::arrow::TimestampType, Int96Type> { |
1027 | Status operator()(RecordReader* reader, MemoryPool* pool, |
1028 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
1029 | int64_t length = reader->values_written(); |
1030 | auto values = reinterpret_cast<const Int96*>(reader->values()); |
1031 | |
1032 | std::shared_ptr<Buffer> data; |
1033 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data)); |
1034 | |
1035 | auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data()); |
1036 | for (int64_t i = 0; i < length; i++) { |
1037 | *data_ptr++ = Int96GetNanoSeconds(values[i]); |
1038 | } |
1039 | |
1040 | if (reader->nullable_values()) { |
1041 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); |
1042 | *out = std::make_shared<TimestampArray>(type, length, data, is_valid, |
1043 | reader->null_count()); |
1044 | } else { |
1045 | *out = std::make_shared<TimestampArray>(type, length, data); |
1046 | } |
1047 | |
1048 | return Status::OK(); |
1049 | } |
1050 | }; |
1051 | |
1052 | template <> |
1053 | struct TransferFunctor<::arrow::Date64Type, Int32Type> { |
1054 | Status operator()(RecordReader* reader, MemoryPool* pool, |
1055 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
1056 | int64_t length = reader->values_written(); |
1057 | auto values = reinterpret_cast<const int32_t*>(reader->values()); |
1058 | |
1059 | std::shared_ptr<Buffer> data; |
1060 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data)); |
1061 | auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data()); |
1062 | |
1063 | for (int64_t i = 0; i < length; i++) { |
1064 | *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay; |
1065 | } |
1066 | |
1067 | if (reader->nullable_values()) { |
1068 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); |
1069 | *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid, |
1070 | reader->null_count()); |
1071 | } else { |
1072 | *out = std::make_shared<::arrow::Date64Array>(type, length, data); |
1073 | } |
1074 | return Status::OK(); |
1075 | } |
1076 | }; |
1077 | |
1078 | template <typename ArrowType, typename ParquetType> |
1079 | struct TransferFunctor< |
1080 | ArrowType, ParquetType, |
1081 | typename std::enable_if< |
1082 | (std::is_base_of<::arrow::BinaryType, ArrowType>::value || |
1083 | std::is_same<::arrow::FixedSizeBinaryType, ArrowType>::value) && |
1084 | (std::is_same<ParquetType, ByteArrayType>::value || |
1085 | std::is_same<ParquetType, FLBAType>::value)>::type> { |
1086 | Status operator()(RecordReader* reader, MemoryPool* pool, |
1087 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
1088 | std::vector<std::shared_ptr<Array>> chunks = reader->GetBuilderChunks(); |
1089 | |
1090 | if (type->id() == ::arrow::Type::STRING) { |
1091 | // Convert from BINARY type to STRING |
1092 | for (size_t i = 0; i < chunks.size(); ++i) { |
1093 | auto new_data = chunks[i]->data()->Copy(); |
1094 | new_data->type = type; |
1095 | chunks[i] = ::arrow::MakeArray(new_data); |
1096 | } |
1097 | } |
1098 | *out = std::make_shared<ChunkedArray>(chunks); |
1099 | return Status::OK(); |
1100 | } |
1101 | }; |
1102 | |
1103 | static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) { |
1104 | const int32_t length = stop - start; |
1105 | |
1106 | DCHECK_GE(length, 0); |
1107 | DCHECK_LE(length, 8); |
1108 | |
1109 | switch (length) { |
1110 | case 0: |
1111 | return 0; |
1112 | case 1: |
1113 | return bytes[start]; |
1114 | case 2: |
1115 | return FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start)); |
1116 | case 3: { |
1117 | const uint64_t first_two_bytes = |
1118 | FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start)); |
1119 | const uint64_t last_byte = bytes[stop - 1]; |
1120 | return first_two_bytes << 8 | last_byte; |
1121 | } |
1122 | case 4: |
1123 | return FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start)); |
1124 | case 5: { |
1125 | const uint64_t first_four_bytes = |
1126 | FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start)); |
1127 | const uint64_t last_byte = bytes[stop - 1]; |
1128 | return first_four_bytes << 8 | last_byte; |
1129 | } |
1130 | case 6: { |
1131 | const uint64_t first_four_bytes = |
1132 | FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start)); |
1133 | const uint64_t last_two_bytes = |
1134 | FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4)); |
1135 | return first_four_bytes << 16 | last_two_bytes; |
1136 | } |
1137 | case 7: { |
1138 | const uint64_t first_four_bytes = |
1139 | FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start)); |
1140 | const uint64_t second_two_bytes = |
1141 | FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4)); |
1142 | const uint64_t last_byte = bytes[stop - 1]; |
1143 | return first_four_bytes << 24 | second_two_bytes << 8 | last_byte; |
1144 | } |
1145 | case 8: |
1146 | return FromBigEndian(*reinterpret_cast<const uint64_t*>(bytes + start)); |
1147 | default: { |
1148 | DCHECK(false); |
1149 | return UINT64_MAX; |
1150 | } |
1151 | } |
1152 | } |
1153 | |
// Minimum/maximum number of bytes a Parquet-encoded Decimal128 may occupy.
static constexpr int32_t kMinDecimalBytes = 1;
static constexpr int32_t kMaxDecimalBytes = 16;

/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
/// uint64_t (low bits).
///
/// The input is a two's-complement value of 1..16 bytes; shorter inputs are
/// sign-extended into the full 128-bit (high, low) pair.
static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
                               int64_t* out_high, uint64_t* out_low) {
  DCHECK_GE(length, kMinDecimalBytes);
  DCHECK_LE(length, kMaxDecimalBytes);

  // XXX This code is copied from Decimal::FromBigEndian

  int64_t high, low;

  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
  // sign bit.
  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;

  // 1. Extract the high bytes
  // Stop byte of the high bytes
  const int32_t high_bits_offset = std::max(0, length - 8);
  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);

  if (high_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    high = high_bits;
  } else {
    // Seed with all-ones when the value is negative and shorter than 16
    // bytes, so the missing high bytes are sign-extended.
    high = -1 * (is_negative && length < kMaxDecimalBytes);
    // Shift left enough bits to make room for the incoming int64_t
    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    high |= high_bits;
  }

  // 2. Extract the low bytes
  // Stop byte of the low bytes
  const int32_t low_bits_offset = std::min(length, 8);
  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);

  if (low_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    low = low_bits;
  } else {
    // Sign extend the low bits if necessary
    low = -1 * (is_negative && length < 8);
    // Shift left enough bits to make room for the incoming int64_t
    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    low |= low_bits;
  }

  *out_high = high;
  *out_low = static_cast<uint64_t>(low);
}
1208 | |
1209 | static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, |
1210 | uint8_t* out_buf) { |
1211 | // view the first 8 bytes as an unsigned 64-bit integer |
1212 | auto low = reinterpret_cast<uint64_t*>(out_buf); |
1213 | |
1214 | // view the second 8 bytes as a signed 64-bit integer |
1215 | auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t)); |
1216 | |
1217 | // Convert the fixed size binary array bytes into a Decimal128 compatible layout |
1218 | BytesToIntegerPair(value, byte_width, high, low); |
1219 | } |
1220 | |
1221 | // ---------------------------------------------------------------------- |
1222 | // BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 |
1223 | |
// Primary template: only the ByteArrayType and FLBAType specializations
// below are meaningful; any other physical type cannot carry decimal bytes.
template <typename T>
Status ConvertToDecimal128(const Array& array, const std::shared_ptr<::arrow::DataType>&,
                           MemoryPool* pool, std::shared_ptr<Array>*) {
  return Status::NotImplemented("not implemented" );
}
1229 | |
1230 | template <> |
1231 | Status ConvertToDecimal128<FLBAType>(const Array& array, |
1232 | const std::shared_ptr<::arrow::DataType>& type, |
1233 | MemoryPool* pool, std::shared_ptr<Array>* out) { |
1234 | const auto& fixed_size_binary_array = |
1235 | static_cast<const ::arrow::FixedSizeBinaryArray&>(array); |
1236 | |
1237 | // The byte width of each decimal value |
1238 | const int32_t type_length = |
1239 | static_cast<const ::arrow::Decimal128Type&>(*type).byte_width(); |
1240 | |
1241 | // number of elements in the entire array |
1242 | const int64_t length = fixed_size_binary_array.length(); |
1243 | |
1244 | // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time |
1245 | // this will be different from the decimal array width because we write the minimum |
1246 | // number of bytes necessary to represent a given precision |
1247 | const int32_t byte_width = |
1248 | static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type()) |
1249 | .byte_width(); |
1250 | |
1251 | // allocate memory for the decimal array |
1252 | std::shared_ptr<Buffer> data; |
1253 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); |
1254 | |
1255 | // raw bytes that we can write to |
1256 | uint8_t* out_ptr = data->mutable_data(); |
1257 | |
1258 | // convert each FixedSizeBinary value to valid decimal bytes |
1259 | const int64_t null_count = fixed_size_binary_array.null_count(); |
1260 | if (null_count > 0) { |
1261 | for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { |
1262 | if (!fixed_size_binary_array.IsNull(i)) { |
1263 | RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr); |
1264 | } |
1265 | } |
1266 | } else { |
1267 | for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { |
1268 | RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr); |
1269 | } |
1270 | } |
1271 | |
1272 | *out = std::make_shared<::arrow::Decimal128Array>( |
1273 | type, length, data, fixed_size_binary_array.null_bitmap(), null_count); |
1274 | |
1275 | return Status::OK(); |
1276 | } |
1277 | |
1278 | template <> |
1279 | Status ConvertToDecimal128<ByteArrayType>(const Array& array, |
1280 | const std::shared_ptr<::arrow::DataType>& type, |
1281 | MemoryPool* pool, std::shared_ptr<Array>* out) { |
1282 | const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array); |
1283 | const int64_t length = binary_array.length(); |
1284 | |
1285 | const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type); |
1286 | const int64_t type_length = decimal_type.byte_width(); |
1287 | |
1288 | std::shared_ptr<Buffer> data; |
1289 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); |
1290 | |
1291 | // raw bytes that we can write to |
1292 | uint8_t* out_ptr = data->mutable_data(); |
1293 | |
1294 | const int64_t null_count = binary_array.null_count(); |
1295 | |
1296 | // convert each BinaryArray value to valid decimal bytes |
1297 | for (int64_t i = 0; i < length; i++, out_ptr += type_length) { |
1298 | int32_t record_len = 0; |
1299 | const uint8_t* record_loc = binary_array.GetValue(i, &record_len); |
1300 | |
1301 | if ((record_len < 0) || (record_len > type_length)) { |
1302 | return Status::Invalid("Invalid BYTE_ARRAY size" ); |
1303 | } |
1304 | |
1305 | auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr); |
1306 | out_ptr_view[0] = 0; |
1307 | out_ptr_view[1] = 0; |
1308 | |
1309 | // only convert rows that are not null if there are nulls, or |
1310 | // all rows, if there are not |
1311 | if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) { |
1312 | RawBytesToDecimalBytes(record_loc, record_len, out_ptr); |
1313 | } |
1314 | } |
1315 | |
1316 | *out = std::make_shared<::arrow::Decimal128Array>( |
1317 | type, length, data, binary_array.null_bitmap(), null_count); |
1318 | return Status::OK(); |
1319 | } |
1320 | |
1321 | /// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array |
1322 | /// We do this by: |
1323 | /// 1. Creating an arrow::BinaryArray from the RecordReader's builder |
1324 | /// 2. Allocating a buffer for the arrow::Decimal128Array |
1325 | /// 3. Converting the big-endian bytes in each BinaryArray entry to two integers |
1326 | /// representing the high and low bits of each decimal value. |
1327 | template <typename ArrowType, typename ParquetType> |
1328 | struct TransferFunctor< |
1329 | ArrowType, ParquetType, |
1330 | typename std::enable_if<std::is_same<ArrowType, ::arrow::Decimal128Type>::value && |
1331 | (std::is_same<ParquetType, ByteArrayType>::value || |
1332 | std::is_same<ParquetType, FLBAType>::value)>::type> { |
1333 | Status operator()(RecordReader* reader, MemoryPool* pool, |
1334 | const std::shared_ptr<::arrow::DataType>& type, Datum* out) { |
1335 | DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); |
1336 | |
1337 | ::arrow::ArrayVector chunks = reader->GetBuilderChunks(); |
1338 | |
1339 | for (size_t i = 0; i < chunks.size(); ++i) { |
1340 | std::shared_ptr<Array> chunk_as_decimal; |
1341 | RETURN_NOT_OK( |
1342 | ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal)); |
1343 | |
1344 | // Replace the chunk, which will hopefully also free memory as we go |
1345 | chunks[i] = chunk_as_decimal; |
1346 | } |
1347 | *out = std::make_shared<ChunkedArray>(chunks); |
1348 | return Status::OK(); |
1349 | } |
1350 | }; |
1351 | |
1352 | /// \brief Convert an Int32 or Int64 array into a Decimal128Array |
1353 | /// The parquet spec allows systems to write decimals in int32, int64 if the values are |
1354 | /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. |
1355 | /// This function implements the conversion from int32 and int64 arrays to decimal arrays. |
1356 | template <typename ParquetIntegerType, |
1357 | typename = typename std::enable_if< |
1358 | std::is_same<ParquetIntegerType, Int32Type>::value || |
1359 | std::is_same<ParquetIntegerType, Int64Type>::value>::type> |
1360 | static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, |
1361 | const std::shared_ptr<::arrow::DataType>& type, |
1362 | Datum* out) { |
1363 | DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); |
1364 | |
1365 | const int64_t length = reader->values_written(); |
1366 | |
1367 | using ElementType = typename ParquetIntegerType::c_type; |
1368 | static_assert(std::is_same<ElementType, int32_t>::value || |
1369 | std::is_same<ElementType, int64_t>::value, |
1370 | "ElementType must be int32_t or int64_t" ); |
1371 | |
1372 | const auto values = reinterpret_cast<const ElementType*>(reader->values()); |
1373 | |
1374 | const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type); |
1375 | const int64_t type_length = decimal_type.byte_width(); |
1376 | |
1377 | std::shared_ptr<Buffer> data; |
1378 | RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); |
1379 | uint8_t* out_ptr = data->mutable_data(); |
1380 | |
1381 | using ::arrow::BitUtil::FromLittleEndian; |
1382 | |
1383 | for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { |
1384 | // sign/zero extend int32_t values, otherwise a no-op |
1385 | const auto value = static_cast<int64_t>(values[i]); |
1386 | |
1387 | auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr); |
1388 | |
1389 | // No-op on little endian machines, byteswap on big endian |
1390 | out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value)); |
1391 | |
1392 | // no need to byteswap here because we're sign/zero extending exactly 8 bytes |
1393 | out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0); |
1394 | } |
1395 | |
1396 | if (reader->nullable_values()) { |
1397 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); |
1398 | *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid, |
1399 | reader->null_count()); |
1400 | } else { |
1401 | *out = std::make_shared<::arrow::Decimal128Array>(type, length, data); |
1402 | } |
1403 | return Status::OK(); |
1404 | } |
1405 | |
// Decimals persisted as INT32 (values small enough to fit in 4 bytes);
// delegate to the shared integer-to-decimal path.
template <>
struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
  Status operator()(RecordReader* reader, MemoryPool* pool,
                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
    return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
  }
};

// Decimals persisted as INT64 (values small enough to fit in 8 bytes);
// delegate to the shared integer-to-decimal path.
template <>
struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
  Status operator()(RecordReader* reader, MemoryPool* pool,
                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
    return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
  }
};
1421 | |
// Run the TransferFunctor for the given (Arrow, Parquet) type pair, then
// wrap the result in ListArrays if the column is nested. Requires `result`,
// `record_reader_`, `pool_` and `field_` to be in the enclosing scope
// (i.e. inside PrimitiveImpl::NextBatch).
#define TRANSFER_DATA(ArrowType, ParquetType)                                  \
  TransferFunctor<ArrowType, ParquetType> func;                                \
  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), &result));   \
  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(&result))

// Case-label wrapper used by the big type switch in NextBatch; the braces
// scope the functor local so consecutive cases do not collide.
#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
  case ::arrow::Type::ENUM: {                       \
    TRANSFER_DATA(ArrowType, ParquetType);          \
  } break;
1431 | |
// Read up to `records_to_read` records from this column and convert them to
// an Arrow ChunkedArray whose type matches field_->type().
//
// \param records_to_read maximum number of records to materialize; fewer are
//        returned when the column is exhausted.
// \param out receives the converted values; a single chunk unless the
//        transfer itself produced chunked output (e.g. binary columns).
// \return IOError on Parquet decode failure, NotImplemented/Invalid for
//         unsupported type mappings.
Status PrimitiveImpl::NextBatch(int64_t records_to_read,
                                std::shared_ptr<ChunkedArray>* out) {
  try {
    // Pre-allocation gives much better performance for flat columns
    record_reader_->Reserve(records_to_read);

    record_reader_->Reset();
    while (records_to_read > 0) {
      if (!record_reader_->HasMoreData()) {
        break;
      }
      int64_t records_read = record_reader_->ReadRecords(records_to_read);
      records_to_read -= records_read;
      // Zero records read means the current row group is exhausted;
      // advance to the next one and continue reading.
      if (records_read == 0) {
        NextRowGroup();
      }
    }
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }

  // Dispatch on the target Arrow type; the Parquet type in each pairing is
  // the physical storage type the values were decoded from.
  Datum result;
  switch (field_->type()->id()) {
    TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
    TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
    TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
    TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
    TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
    TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
    TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
    TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
    TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
    TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
    TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
    TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
    TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
    TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
    // DATE64 is stored as int32 days; the Date64/Int32 functor widens to ms.
    TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
    TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
    case ::arrow::Type::NA: {
      result = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
      RETURN_NOT_OK(WrapIntoListArray<Int32Type>(&result));
      break;
    }
    case ::arrow::Type::DECIMAL: {
      // Decimals have four legal physical encodings; pick by storage type.
      switch (descr_->physical_type()) {
        case ::parquet::Type::INT32: {
          TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
        } break;
        case ::parquet::Type::INT64: {
          TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          TRANSFER_DATA(::arrow::Decimal128Type, ByteArrayType);
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal must be int32, int64, byte array, or fixed "
              "length binary" );
      }
    } break;
    case ::arrow::Type::TIMESTAMP: {
      ::arrow::TimestampType* timestamp_type =
          static_cast<::arrow::TimestampType*>(field_->type().get());
      switch (timestamp_type->unit()) {
        // Milli/micro timestamps are stored as plain int64 values.
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO: {
          TRANSFER_DATA(::arrow::TimestampType, Int64Type);
        } break;
        // Nanosecond timestamps come from legacy 12-byte INT96 values.
        case ::arrow::TimeUnit::NANO: {
          TRANSFER_DATA(::arrow::TimestampType, Int96Type);
        } break;
        default:
          return Status::NotImplemented("TimeUnit not supported" );
      }
    } break;
    TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
    TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
    default:
      return Status::NotImplemented("No support for reading columns of type " ,
                                    field_->type()->ToString());
  }

  DCHECK_NE(result.kind(), Datum::NONE);

  // Normalize the transfer result to a ChunkedArray for the caller.
  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
  } else {
    DCHECK(false) << "Should be impossible" ;
  }
  return Status::OK();
}
1529 | |
1530 | void PrimitiveImpl::NextRowGroup() { |
1531 | std::unique_ptr<PageReader> page_reader = input_->NextChunk(); |
1532 | record_reader_->SetPageReader(std::move(page_reader)); |
1533 | } |
1534 | |
// Expose the raw definition levels buffered by the record reader; the
// pointer stays owned by record_reader_.
Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
  *data = record_reader_->def_levels();
  *length = record_reader_->levels_written();
  return Status::OK();
}

// Expose the raw repetition levels buffered by the record reader; same
// ownership as GetDefLevels.
Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
  *data = record_reader_->rep_levels();
  *length = record_reader_->levels_written();
  return Status::OK();
}
1546 | |
// ColumnReader is a thin pimpl wrapper; all work is delegated to the impl.
ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
    : impl_(std::move(impl)) {}

// Out-of-line destructor so unique_ptr's deleter sees the complete
// ColumnReaderImpl type in this translation unit.
ColumnReader::~ColumnReader() {}

// Read the next batch of records as a (possibly multi-chunk) ChunkedArray.
Status ColumnReader::NextBatch(int64_t records_to_read,
                               std::shared_ptr<ChunkedArray>* out) {
  return impl_->NextBatch(records_to_read, out);
}

// Convenience overload producing a single Array: reads a chunked batch and
// collapses it through GetSingleChunk.
Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(impl_->NextBatch(records_to_read, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}
1562 | |
1563 | // StructImpl methods |
1564 | |
1565 | Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out, |
1566 | int64_t* null_count_out) { |
1567 | std::shared_ptr<Buffer> null_bitmap; |
1568 | auto null_count = 0; |
1569 | const int16_t* def_levels_data; |
1570 | size_t def_levels_length; |
1571 | RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length)); |
1572 | RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap)); |
1573 | uint8_t* null_bitmap_ptr = null_bitmap->mutable_data(); |
1574 | for (size_t i = 0; i < def_levels_length; i++) { |
1575 | if (def_levels_data[i] < struct_def_level_) { |
1576 | // Mark null |
1577 | null_count += 1; |
1578 | } else { |
1579 | DCHECK_EQ(def_levels_data[i], struct_def_level_); |
1580 | ::arrow::BitUtil::SetBit(null_bitmap_ptr, i); |
1581 | } |
1582 | } |
1583 | |
1584 | *null_count_out = null_count; |
1585 | *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap; |
1586 | return Status::OK(); |
1587 | } |
1588 | |
1589 | // TODO(itaiin): Consider caching the results of this calculation - |
1590 | // note that this is only used once for each read for now |
1591 | Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) { |
1592 | *data = nullptr; |
1593 | if (children_.size() == 0) { |
1594 | // Empty struct |
1595 | *length = 0; |
1596 | return Status::OK(); |
1597 | } |
1598 | |
1599 | // We have at least one child |
1600 | const int16_t* child_def_levels; |
1601 | size_t child_length; |
1602 | RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length)); |
1603 | auto size = child_length * sizeof(int16_t); |
1604 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_)); |
1605 | // Initialize with the minimal def level |
1606 | std::memset(def_levels_buffer_->mutable_data(), -1, size); |
1607 | auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data()); |
1608 | |
1609 | // When a struct is defined, all of its children def levels are at least at |
1610 | // nesting level, and def level equals nesting level. |
1611 | // When a struct is not defined, all of its children def levels are less than |
1612 | // the nesting level, and the def level equals max(children def levels) |
1613 | // All other possibilities are malformed definition data. |
1614 | for (auto& child : children_) { |
1615 | size_t current_child_length; |
1616 | RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, ¤t_child_length)); |
1617 | DCHECK_EQ(child_length, current_child_length); |
1618 | for (size_t i = 0; i < child_length; i++) { |
1619 | // Check that value is either uninitialized, or current |
1620 | // and previous children def levels agree on the struct level |
1621 | DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) == |
1622 | (child_def_levels[i] >= struct_def_level_))); |
1623 | result_levels[i] = |
1624 | std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_)); |
1625 | } |
1626 | } |
1627 | *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data()); |
1628 | *length = child_length; |
1629 | return Status::OK(); |
1630 | } |
1631 | |
1632 | void StructImpl::InitField( |
1633 | const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) { |
1634 | // Make a shallow node to field conversion from the children fields |
1635 | std::vector<std::shared_ptr<::arrow::Field>> fields(children.size()); |
1636 | for (size_t i = 0; i < children.size(); i++) { |
1637 | fields[i] = children[i]->field(); |
1638 | } |
1639 | auto type = ::arrow::struct_(fields); |
1640 | field_ = ::arrow::field(node->name(), type); |
1641 | } |
1642 | |
// Repetition levels are not produced for struct readers; only definition
// levels are combined (see GetDefLevels). Callers needing rep levels for
// nested repeated types must use a different code path.
Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
  return Status::NotImplemented("GetRepLevels is not implemented for struct");
}
1646 | |
1647 | Status StructImpl::NextBatch(int64_t records_to_read, |
1648 | std::shared_ptr<ChunkedArray>* out) { |
1649 | std::vector<std::shared_ptr<Array>> children_arrays; |
1650 | std::shared_ptr<Buffer> null_bitmap; |
1651 | int64_t null_count; |
1652 | |
1653 | // Gather children arrays and def levels |
1654 | for (auto& child : children_) { |
1655 | std::shared_ptr<ChunkedArray> field; |
1656 | RETURN_NOT_OK(child->NextBatch(records_to_read, &field)); |
1657 | |
1658 | if (field->num_chunks() > 1) { |
1659 | return Status::Invalid("Chunked field reads not yet supported with StructArray" ); |
1660 | } |
1661 | children_arrays.push_back(field->chunk(0)); |
1662 | } |
1663 | |
1664 | RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count)); |
1665 | |
1666 | int64_t struct_length = children_arrays[0]->length(); |
1667 | for (size_t i = 1; i < children_arrays.size(); ++i) { |
1668 | if (children_arrays[i]->length() != struct_length) { |
1669 | // TODO(wesm): This should really only occur if the Parquet file is |
1670 | // malformed. Should this be a DCHECK? |
1671 | return Status::Invalid("Struct children had different lengths" ); |
1672 | } |
1673 | } |
1674 | |
1675 | auto result = std::make_shared<StructArray>(field()->type(), struct_length, |
1676 | children_arrays, null_bitmap, null_count); |
1677 | *out = std::make_shared<ChunkedArray>(result); |
1678 | return Status::OK(); |
1679 | } |
1680 | |
// Create a reader for one column chunk within this row group. Uses `new`
// rather than make_shared, presumably because the ColumnChunkReader
// constructor is not publicly accessible — confirm before changing.
std::shared_ptr<ColumnChunkReader> RowGroupReader::Column(int column_index) {
  return std::shared_ptr<ColumnChunkReader>(
      new ColumnChunkReader(impl_, row_group_index_, column_index));
}
1685 | |
// Read only the given column indices of this row group into a Table.
Status RowGroupReader::ReadTable(const std::vector<int>& column_indices,
                                 std::shared_ptr<::arrow::Table>* out) {
  return impl_->ReadRowGroup(row_group_index_, column_indices, out);
}

// Read all columns of this row group into a Table.
Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) {
  return impl_->ReadRowGroup(row_group_index_, out);
}
1694 | |
RowGroupReader::~RowGroupReader() {}

// Non-owning view over one row group; `impl` must outlive this reader.
RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
    : impl_(impl), row_group_index_(row_group_index) {}
1699 | |
// Read this column chunk in full as a ChunkedArray.
Status ColumnChunkReader::Read(std::shared_ptr<::arrow::ChunkedArray>* out) {
  return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
}
1703 | |
// Convenience overload returning a single Array, collapsing the chunked
// result via GetSingleChunk (helper defined elsewhere in this file).
Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
  std::shared_ptr<ChunkedArray> chunked_out;
  RETURN_NOT_OK(impl_->ReadColumnChunk(column_index_, row_group_index_, &chunked_out));
  return GetSingleChunk(*chunked_out, out);
}
1709 | |
ColumnChunkReader::~ColumnChunkReader() {}

// Non-owning view over one column chunk; `impl` must outlive this reader.
// NOTE(review): the init-list order differs from (impl_, column_index_,
// row_group_index_) textual order — members are initialized in declaration
// order regardless, which is harmless here since they are independent.
ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index,
                                     int column_index)
    : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
1715 | |
1716 | } // namespace arrow |
1717 | } // namespace parquet |
1718 | |