1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/arrow/schema.h" |
19 | |
20 | #include <string> |
21 | #include <unordered_set> |
22 | #include <vector> |
23 | |
24 | #include "parquet/api/schema.h" |
25 | #include "parquet/util/schema-util.h" |
26 | |
27 | #include "arrow/api.h" |
28 | #include "arrow/util/logging.h" |
29 | |
30 | using arrow::Field; |
31 | using arrow::Status; |
32 | |
33 | using ArrowType = arrow::DataType; |
34 | using ArrowTypeId = arrow::Type; |
35 | |
36 | using parquet::Repetition; |
37 | using parquet::schema::GroupNode; |
38 | using parquet::schema::Node; |
39 | using parquet::schema::NodePtr; |
40 | using parquet::schema::PrimitiveNode; |
41 | |
42 | using ParquetType = parquet::Type; |
43 | using parquet::LogicalType; |
44 | |
45 | namespace parquet { |
46 | |
47 | namespace arrow { |
48 | |
49 | const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); |
50 | const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); |
51 | const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); |
52 | |
53 | std::shared_ptr<ArrowType> MakeDecimal128Type(const PrimitiveNode& node) { |
54 | const auto& metadata = node.decimal_metadata(); |
55 | return ::arrow::decimal(metadata.precision, metadata.scale); |
56 | } |
57 | |
58 | static Status FromByteArray(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) { |
59 | switch (node.logical_type()) { |
60 | case LogicalType::UTF8: |
61 | *out = ::arrow::utf8(); |
62 | break; |
63 | case LogicalType::DECIMAL: |
64 | *out = MakeDecimal128Type(node); |
65 | break; |
66 | default: |
67 | // BINARY |
68 | *out = ::arrow::binary(); |
69 | break; |
70 | } |
71 | return Status::OK(); |
72 | } |
73 | |
74 | static Status FromFLBA(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) { |
75 | switch (node.logical_type()) { |
76 | case LogicalType::NONE: |
77 | *out = ::arrow::fixed_size_binary(node.type_length()); |
78 | break; |
79 | case LogicalType::DECIMAL: |
80 | *out = MakeDecimal128Type(node); |
81 | break; |
82 | default: |
83 | return Status::NotImplemented("Unhandled logical type " , |
84 | LogicalTypeToString(node.logical_type()), |
85 | " for fixed-length binary array" ); |
86 | } |
87 | |
88 | return Status::OK(); |
89 | } |
90 | |
91 | static Status FromInt32(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) { |
92 | switch (node.logical_type()) { |
93 | case LogicalType::NONE: |
94 | *out = ::arrow::int32(); |
95 | break; |
96 | case LogicalType::UINT_8: |
97 | *out = ::arrow::uint8(); |
98 | break; |
99 | case LogicalType::INT_8: |
100 | *out = ::arrow::int8(); |
101 | break; |
102 | case LogicalType::UINT_16: |
103 | *out = ::arrow::uint16(); |
104 | break; |
105 | case LogicalType::INT_16: |
106 | *out = ::arrow::int16(); |
107 | break; |
108 | case LogicalType::INT_32: |
109 | *out = ::arrow::int32(); |
110 | break; |
111 | case LogicalType::UINT_32: |
112 | *out = ::arrow::uint32(); |
113 | break; |
114 | case LogicalType::DATE: |
115 | *out = ::arrow::date32(); |
116 | break; |
117 | case LogicalType::TIME_MILLIS: |
118 | *out = ::arrow::time32(::arrow::TimeUnit::MILLI); |
119 | break; |
120 | case LogicalType::DECIMAL: |
121 | *out = MakeDecimal128Type(node); |
122 | break; |
123 | default: |
124 | return Status::NotImplemented("Unhandled logical type " , |
125 | LogicalTypeToString(node.logical_type()), |
126 | " for INT32" ); |
127 | } |
128 | return Status::OK(); |
129 | } |
130 | |
131 | static Status FromInt64(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) { |
132 | switch (node.logical_type()) { |
133 | case LogicalType::NONE: |
134 | *out = ::arrow::int64(); |
135 | break; |
136 | case LogicalType::INT_64: |
137 | *out = ::arrow::int64(); |
138 | break; |
139 | case LogicalType::UINT_64: |
140 | *out = ::arrow::uint64(); |
141 | break; |
142 | case LogicalType::DECIMAL: |
143 | *out = MakeDecimal128Type(node); |
144 | break; |
145 | case LogicalType::TIMESTAMP_MILLIS: |
146 | *out = TIMESTAMP_MS; |
147 | break; |
148 | case LogicalType::TIMESTAMP_MICROS: |
149 | *out = TIMESTAMP_US; |
150 | break; |
151 | case LogicalType::TIME_MICROS: |
152 | *out = ::arrow::time64(::arrow::TimeUnit::MICRO); |
153 | break; |
154 | default: |
155 | return Status::NotImplemented("Unhandled logical type " , |
156 | LogicalTypeToString(node.logical_type()), |
157 | " for INT64" ); |
158 | } |
159 | return Status::OK(); |
160 | } |
161 | |
162 | Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>* out) { |
163 | if (primitive.logical_type() == LogicalType::NA) { |
164 | *out = ::arrow::null(); |
165 | return Status::OK(); |
166 | } |
167 | |
168 | switch (primitive.physical_type()) { |
169 | case ParquetType::BOOLEAN: |
170 | *out = ::arrow::boolean(); |
171 | break; |
172 | case ParquetType::INT32: |
173 | RETURN_NOT_OK(FromInt32(primitive, out)); |
174 | break; |
175 | case ParquetType::INT64: |
176 | RETURN_NOT_OK(FromInt64(primitive, out)); |
177 | break; |
178 | case ParquetType::INT96: |
179 | *out = TIMESTAMP_NS; |
180 | break; |
181 | case ParquetType::FLOAT: |
182 | *out = ::arrow::float32(); |
183 | break; |
184 | case ParquetType::DOUBLE: |
185 | *out = ::arrow::float64(); |
186 | break; |
187 | case ParquetType::BYTE_ARRAY: |
188 | RETURN_NOT_OK(FromByteArray(primitive, out)); |
189 | break; |
190 | case ParquetType::FIXED_LEN_BYTE_ARRAY: |
191 | RETURN_NOT_OK(FromFLBA(primitive, out)); |
192 | break; |
193 | } |
194 | return Status::OK(); |
195 | } |
196 | |
// Forward declaration: NodeToFieldInternal is mutually recursive with
// StructFromGroup and NodeToList (it calls them, and they call it back for
// child nodes), and those two are defined before it.
Status NodeToFieldInternal(const Node& node,
                           const std::unordered_set<const Node*>* included_leaf_nodes,
                           std::shared_ptr<Field>* out);
201 | |
202 | /* |
203 | * Auxilary function to test if a parquet schema node is a leaf node |
204 | * that should be included in a resulting arrow schema |
205 | */ |
206 | inline bool IsIncludedLeaf(const Node& node, |
207 | const std::unordered_set<const Node*>* included_leaf_nodes) { |
208 | if (included_leaf_nodes == nullptr) { |
209 | return true; |
210 | } |
211 | auto search = included_leaf_nodes->find(&node); |
212 | return (search != included_leaf_nodes->end()); |
213 | } |
214 | |
215 | Status StructFromGroup(const GroupNode& group, |
216 | const std::unordered_set<const Node*>* included_leaf_nodes, |
217 | std::shared_ptr<ArrowType>* out) { |
218 | std::vector<std::shared_ptr<Field>> fields; |
219 | std::shared_ptr<Field> field; |
220 | |
221 | *out = nullptr; |
222 | |
223 | for (int i = 0; i < group.field_count(); i++) { |
224 | RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field)); |
225 | if (field != nullptr) { |
226 | fields.push_back(field); |
227 | } |
228 | } |
229 | if (fields.size() > 0) { |
230 | *out = std::make_shared<::arrow::StructType>(fields); |
231 | } |
232 | return Status::OK(); |
233 | } |
234 | |
235 | Status NodeToList(const GroupNode& group, |
236 | const std::unordered_set<const Node*>* included_leaf_nodes, |
237 | std::shared_ptr<ArrowType>* out) { |
238 | *out = nullptr; |
239 | if (group.field_count() == 1) { |
240 | // This attempts to resolve the preferred 3-level list encoding. |
241 | const Node& list_node = *group.field(0); |
242 | if (list_node.is_group() && list_node.is_repeated()) { |
243 | const auto& list_group = static_cast<const GroupNode&>(list_node); |
244 | // Special case mentioned in the format spec: |
245 | // If the name is array or ends in _tuple, this should be a list of struct |
246 | // even for single child elements. |
247 | if (list_group.field_count() == 1 && !HasStructListName(list_group)) { |
248 | // List of primitive type |
249 | std::shared_ptr<Field> item_field; |
250 | RETURN_NOT_OK( |
251 | NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field)); |
252 | |
253 | if (item_field != nullptr) { |
254 | *out = ::arrow::list(item_field); |
255 | } |
256 | } else { |
257 | // List of struct |
258 | std::shared_ptr<ArrowType> inner_type; |
259 | RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type)); |
260 | if (inner_type != nullptr) { |
261 | auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false); |
262 | *out = ::arrow::list(item_field); |
263 | } |
264 | } |
265 | } else if (list_node.is_repeated()) { |
266 | // repeated primitive node |
267 | std::shared_ptr<ArrowType> inner_type; |
268 | if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) { |
269 | RETURN_NOT_OK( |
270 | FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type)); |
271 | auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false); |
272 | *out = ::arrow::list(item_field); |
273 | } |
274 | } else { |
275 | return Status::NotImplemented( |
276 | "Non-repeated groups in a LIST-annotated group are not supported." ); |
277 | } |
278 | } else { |
279 | return Status::NotImplemented( |
280 | "Only LIST-annotated groups with a single child can be handled." ); |
281 | } |
282 | return Status::OK(); |
283 | } |
284 | |
285 | Status NodeToField(const Node& node, std::shared_ptr<Field>* out) { |
286 | return NodeToFieldInternal(node, nullptr, out); |
287 | } |
288 | |
289 | Status NodeToFieldInternal(const Node& node, |
290 | const std::unordered_set<const Node*>* included_leaf_nodes, |
291 | std::shared_ptr<Field>* out) { |
292 | std::shared_ptr<ArrowType> type = nullptr; |
293 | bool nullable = !node.is_required(); |
294 | |
295 | *out = nullptr; |
296 | |
297 | if (node.is_repeated()) { |
298 | // 1-level LIST encoding fields are required |
299 | std::shared_ptr<ArrowType> inner_type; |
300 | if (node.is_group()) { |
301 | RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node), |
302 | included_leaf_nodes, &inner_type)); |
303 | } else if (IsIncludedLeaf(node, included_leaf_nodes)) { |
304 | RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type)); |
305 | } |
306 | if (inner_type != nullptr) { |
307 | auto item_field = std::make_shared<Field>(node.name(), inner_type, false); |
308 | type = ::arrow::list(item_field); |
309 | nullable = false; |
310 | } |
311 | } else if (node.is_group()) { |
312 | const auto& group = static_cast<const GroupNode&>(node); |
313 | if (node.logical_type() == LogicalType::LIST) { |
314 | RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type)); |
315 | } else { |
316 | RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type)); |
317 | } |
318 | } else { |
319 | // Primitive (leaf) node |
320 | if (IsIncludedLeaf(node, included_leaf_nodes)) { |
321 | RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type)); |
322 | } |
323 | } |
324 | if (type != nullptr) { |
325 | *out = std::make_shared<Field>(node.name(), type, nullable); |
326 | } |
327 | return Status::OK(); |
328 | } |
329 | |
330 | Status FromParquetSchema( |
331 | const SchemaDescriptor* parquet_schema, |
332 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata, |
333 | std::shared_ptr<::arrow::Schema>* out) { |
334 | const GroupNode& schema_node = *parquet_schema->group_node(); |
335 | |
336 | int num_fields = static_cast<int>(schema_node.field_count()); |
337 | std::vector<std::shared_ptr<Field>> fields(num_fields); |
338 | for (int i = 0; i < num_fields; i++) { |
339 | RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i])); |
340 | } |
341 | |
342 | *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata); |
343 | return Status::OK(); |
344 | } |
345 | |
346 | Status FromParquetSchema( |
347 | const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices, |
348 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata, |
349 | std::shared_ptr<::arrow::Schema>* out) { |
350 | // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes |
351 | // from the root Parquet node |
352 | |
353 | // Put the right leaf nodes in an unordered set |
354 | // Index in column_indices should be unique, duplicate indices are merged into one and |
355 | // ordering by its first appearing. |
356 | int num_columns = static_cast<int>(column_indices.size()); |
357 | std::unordered_set<const Node*> top_nodes; // to deduplicate the top nodes |
358 | std::vector<const Node*> base_nodes; // to keep the ordering |
359 | std::unordered_set<const Node*> included_leaf_nodes(num_columns); |
360 | for (int i = 0; i < num_columns; i++) { |
361 | const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]); |
362 | included_leaf_nodes.insert(column_desc->schema_node().get()); |
363 | const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]); |
364 | auto insertion = top_nodes.insert(column_root); |
365 | if (insertion.second) { |
366 | base_nodes.push_back(column_root); |
367 | } |
368 | } |
369 | |
370 | std::vector<std::shared_ptr<Field>> fields; |
371 | std::shared_ptr<Field> field; |
372 | for (auto node : base_nodes) { |
373 | RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field)); |
374 | if (field != nullptr) { |
375 | fields.push_back(field); |
376 | } |
377 | } |
378 | |
379 | *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata); |
380 | return Status::OK(); |
381 | } |
382 | |
383 | Status FromParquetSchema(const SchemaDescriptor* parquet_schema, |
384 | const std::vector<int>& column_indices, |
385 | std::shared_ptr<::arrow::Schema>* out) { |
386 | return FromParquetSchema(parquet_schema, column_indices, nullptr, out); |
387 | } |
388 | |
389 | Status FromParquetSchema(const SchemaDescriptor* parquet_schema, |
390 | std::shared_ptr<::arrow::Schema>* out) { |
391 | return FromParquetSchema(parquet_schema, nullptr, out); |
392 | } |
393 | |
394 | Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name, |
395 | bool nullable, const WriterProperties& properties, |
396 | const ArrowWriterProperties& arrow_properties, NodePtr* out) { |
397 | Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED; |
398 | |
399 | NodePtr element; |
400 | RETURN_NOT_OK(FieldToNode(type->value_field(), properties, arrow_properties, &element)); |
401 | |
402 | NodePtr list = GroupNode::Make("list" , Repetition::REPEATED, {element}); |
403 | *out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST); |
404 | return Status::OK(); |
405 | } |
406 | |
407 | Status StructToNode(const std::shared_ptr<::arrow::StructType>& type, |
408 | const std::string& name, bool nullable, |
409 | const WriterProperties& properties, |
410 | const ArrowWriterProperties& arrow_properties, NodePtr* out) { |
411 | Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED; |
412 | |
413 | std::vector<NodePtr> children(type->num_children()); |
414 | for (int i = 0; i < type->num_children(); i++) { |
415 | RETURN_NOT_OK( |
416 | FieldToNode(type->child(i), properties, arrow_properties, &children[i])); |
417 | } |
418 | |
419 | *out = GroupNode::Make(name, repetition, children); |
420 | return Status::OK(); |
421 | } |
422 | |
423 | static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) { |
424 | switch (time_unit) { |
425 | case ::arrow::TimeUnit::MILLI: |
426 | return LogicalType::TIMESTAMP_MILLIS; |
427 | case ::arrow::TimeUnit::MICRO: |
428 | return LogicalType::TIMESTAMP_MICROS; |
429 | case ::arrow::TimeUnit::SECOND: |
430 | case ::arrow::TimeUnit::NANO: |
431 | // No equivalent parquet logical type. |
432 | break; |
433 | } |
434 | |
435 | return LogicalType::NONE; |
436 | } |
437 | |
438 | static Status GetTimestampMetadata(const ::arrow::TimestampType& type, |
439 | const ArrowWriterProperties& properties, |
440 | ParquetType::type* physical_type, |
441 | LogicalType::type* logical_type) { |
442 | const bool coerce = properties.coerce_timestamps_enabled(); |
443 | const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit(); |
444 | |
445 | // The user is explicitly asking for Impala int96 encoding, there is no |
446 | // logical type. |
447 | if (properties.support_deprecated_int96_timestamps()) { |
448 | *physical_type = ParquetType::INT96; |
449 | return Status::OK(); |
450 | } |
451 | |
452 | *physical_type = ParquetType::INT64; |
453 | *logical_type = LogicalTypeFromArrowTimeUnit(unit); |
454 | |
455 | // The user is requesting that all timestamp columns are casted to a specific |
456 | // type. Only 2 TimeUnit are supported by arrow-parquet. |
457 | if (coerce) { |
458 | switch (unit) { |
459 | case ::arrow::TimeUnit::MILLI: |
460 | case ::arrow::TimeUnit::MICRO: |
461 | break; |
462 | case ::arrow::TimeUnit::NANO: |
463 | case ::arrow::TimeUnit::SECOND: |
464 | return Status::NotImplemented( |
465 | "Can only coerce Arrow timestamps to milliseconds" |
466 | " or microseconds" ); |
467 | } |
468 | |
469 | return Status::OK(); |
470 | } |
471 | |
472 | // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to |
473 | // int64 microseconds when deprecated int96 is not requested. |
474 | if (type.unit() == ::arrow::TimeUnit::NANO) |
475 | *logical_type = LogicalType::TIMESTAMP_MICROS; |
476 | else if (type.unit() == ::arrow::TimeUnit::SECOND) |
477 | return Status::NotImplemented( |
478 | "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with " |
479 | "Parquet." ); |
480 | |
481 | return Status::OK(); |
482 | } // namespace arrow |
483 | |
484 | Status FieldToNode(const std::shared_ptr<Field>& field, |
485 | const WriterProperties& properties, |
486 | const ArrowWriterProperties& arrow_properties, NodePtr* out) { |
487 | LogicalType::type logical_type = LogicalType::NONE; |
488 | ParquetType::type type; |
489 | Repetition::type repetition = |
490 | field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED; |
491 | |
492 | int length = -1; |
493 | int precision = -1; |
494 | int scale = -1; |
495 | |
496 | switch (field->type()->id()) { |
497 | case ArrowTypeId::NA: |
498 | type = ParquetType::INT32; |
499 | logical_type = LogicalType::NA; |
500 | break; |
501 | case ArrowTypeId::BOOL: |
502 | type = ParquetType::BOOLEAN; |
503 | break; |
504 | case ArrowTypeId::UINT8: |
505 | type = ParquetType::INT32; |
506 | logical_type = LogicalType::UINT_8; |
507 | break; |
508 | case ArrowTypeId::INT8: |
509 | type = ParquetType::INT32; |
510 | logical_type = LogicalType::INT_8; |
511 | break; |
512 | case ArrowTypeId::UINT16: |
513 | type = ParquetType::INT32; |
514 | logical_type = LogicalType::UINT_16; |
515 | break; |
516 | case ArrowTypeId::INT16: |
517 | type = ParquetType::INT32; |
518 | logical_type = LogicalType::INT_16; |
519 | break; |
520 | case ArrowTypeId::UINT32: |
521 | if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) { |
522 | type = ParquetType::INT64; |
523 | } else { |
524 | type = ParquetType::INT32; |
525 | logical_type = LogicalType::UINT_32; |
526 | } |
527 | break; |
528 | case ArrowTypeId::INT32: |
529 | type = ParquetType::INT32; |
530 | break; |
531 | case ArrowTypeId::UINT64: |
532 | type = ParquetType::INT64; |
533 | logical_type = LogicalType::UINT_64; |
534 | break; |
535 | case ArrowTypeId::INT64: |
536 | type = ParquetType::INT64; |
537 | break; |
538 | case ArrowTypeId::FLOAT: |
539 | type = ParquetType::FLOAT; |
540 | break; |
541 | case ArrowTypeId::DOUBLE: |
542 | type = ParquetType::DOUBLE; |
543 | break; |
544 | case ArrowTypeId::STRING: |
545 | type = ParquetType::BYTE_ARRAY; |
546 | logical_type = LogicalType::UTF8; |
547 | break; |
548 | case ArrowTypeId::BINARY: |
549 | type = ParquetType::BYTE_ARRAY; |
550 | break; |
551 | case ArrowTypeId::FIXED_SIZE_BINARY: { |
552 | type = ParquetType::FIXED_LEN_BYTE_ARRAY; |
553 | const auto& fixed_size_binary_type = |
554 | static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type()); |
555 | length = fixed_size_binary_type.byte_width(); |
556 | } break; |
557 | case ArrowTypeId::DECIMAL: { |
558 | type = ParquetType::FIXED_LEN_BYTE_ARRAY; |
559 | logical_type = LogicalType::DECIMAL; |
560 | const auto& decimal_type = |
561 | static_cast<const ::arrow::Decimal128Type&>(*field->type()); |
562 | precision = decimal_type.precision(); |
563 | scale = decimal_type.scale(); |
564 | length = DecimalSize(precision); |
565 | } break; |
566 | case ArrowTypeId::DATE32: |
567 | type = ParquetType::INT32; |
568 | logical_type = LogicalType::DATE; |
569 | break; |
570 | case ArrowTypeId::DATE64: |
571 | type = ParquetType::INT32; |
572 | logical_type = LogicalType::DATE; |
573 | break; |
574 | case ArrowTypeId::TIMESTAMP: |
575 | RETURN_NOT_OK( |
576 | GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()), |
577 | arrow_properties, &type, &logical_type)); |
578 | break; |
579 | case ArrowTypeId::TIME32: |
580 | type = ParquetType::INT32; |
581 | logical_type = LogicalType::TIME_MILLIS; |
582 | break; |
583 | case ArrowTypeId::TIME64: { |
584 | auto time_type = static_cast<::arrow::Time64Type*>(field->type().get()); |
585 | if (time_type->unit() == ::arrow::TimeUnit::NANO) { |
586 | return Status::NotImplemented("Nanosecond time not supported in Parquet." ); |
587 | } |
588 | type = ParquetType::INT64; |
589 | logical_type = LogicalType::TIME_MICROS; |
590 | } break; |
591 | case ArrowTypeId::STRUCT: { |
592 | auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type()); |
593 | return StructToNode(struct_type, field->name(), field->nullable(), properties, |
594 | arrow_properties, out); |
595 | } |
596 | case ArrowTypeId::LIST: { |
597 | auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type()); |
598 | return ListToNode(list_type, field->name(), field->nullable(), properties, |
599 | arrow_properties, out); |
600 | } |
601 | case ArrowTypeId::DICTIONARY: { |
602 | // Parquet has no Dictionary type, dictionary-encoded is handled on |
603 | // the encoding, not the schema level. |
604 | const ::arrow::DictionaryType& dict_type = |
605 | static_cast<const ::arrow::DictionaryType&>(*field->type()); |
606 | std::shared_ptr<::arrow::Field> unpacked_field = |
607 | ::arrow::field(field->name(), dict_type.dictionary()->type(), field->nullable(), |
608 | field->metadata()); |
609 | return FieldToNode(unpacked_field, properties, arrow_properties, out); |
610 | } |
611 | default: { |
612 | // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR |
613 | return Status::NotImplemented( |
614 | "Unhandled type for Arrow to Parquet schema conversion: " , |
615 | field->type()->ToString()); |
616 | } |
617 | } |
618 | PARQUET_CATCH_NOT_OK(*out = |
619 | PrimitiveNode::Make(field->name(), repetition, type, |
620 | logical_type, length, precision, scale)); |
621 | return Status::OK(); |
622 | } |
623 | |
624 | Status ToParquetSchema(const ::arrow::Schema* arrow_schema, |
625 | const WriterProperties& properties, |
626 | const ArrowWriterProperties& arrow_properties, |
627 | std::shared_ptr<SchemaDescriptor>* out) { |
628 | std::vector<NodePtr> nodes(arrow_schema->num_fields()); |
629 | for (int i = 0; i < arrow_schema->num_fields(); i++) { |
630 | RETURN_NOT_OK( |
631 | FieldToNode(arrow_schema->field(i), properties, arrow_properties, &nodes[i])); |
632 | } |
633 | |
634 | NodePtr schema = GroupNode::Make("schema" , Repetition::REQUIRED, nodes); |
635 | *out = std::make_shared<::parquet::SchemaDescriptor>(); |
636 | PARQUET_CATCH_NOT_OK((*out)->Init(schema)); |
637 | |
638 | return Status::OK(); |
639 | } |
640 | |
641 | Status ToParquetSchema(const ::arrow::Schema* arrow_schema, |
642 | const WriterProperties& properties, |
643 | std::shared_ptr<SchemaDescriptor>* out) { |
644 | return ToParquetSchema(arrow_schema, properties, *default_arrow_writer_properties(), |
645 | out); |
646 | } |
647 | |
648 | /// \brief Compute the number of bytes required to represent a decimal of a |
649 | /// given precision. Taken from the Apache Impala codebase. The comments next |
650 | /// to the return values are the maximum value that can be represented in 2's |
651 | /// complement with the returned number of bytes. |
652 | int32_t DecimalSize(int32_t precision) { |
653 | DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " |
654 | << precision; |
655 | DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got " |
656 | << precision; |
657 | |
658 | switch (precision) { |
659 | case 1: |
660 | case 2: |
661 | return 1; // 127 |
662 | case 3: |
663 | case 4: |
664 | return 2; // 32,767 |
665 | case 5: |
666 | case 6: |
667 | return 3; // 8,388,607 |
668 | case 7: |
669 | case 8: |
670 | case 9: |
671 | return 4; // 2,147,483,427 |
672 | case 10: |
673 | case 11: |
674 | return 5; // 549,755,813,887 |
675 | case 12: |
676 | case 13: |
677 | case 14: |
678 | return 6; // 140,737,488,355,327 |
679 | case 15: |
680 | case 16: |
681 | return 7; // 36,028,797,018,963,967 |
682 | case 17: |
683 | case 18: |
684 | return 8; // 9,223,372,036,854,775,807 |
685 | case 19: |
686 | case 20: |
687 | case 21: |
688 | return 9; // 2,361,183,241,434,822,606,847 |
689 | case 22: |
690 | case 23: |
691 | return 10; // 604,462,909,807,314,587,353,087 |
692 | case 24: |
693 | case 25: |
694 | case 26: |
695 | return 11; // 154,742,504,910,672,534,362,390,527 |
696 | case 27: |
697 | case 28: |
698 | return 12; // 39,614,081,257,132,168,796,771,975,167 |
699 | case 29: |
700 | case 30: |
701 | case 31: |
702 | return 13; // 10,141,204,801,825,835,211,973,625,643,007 |
703 | case 32: |
704 | case 33: |
705 | return 14; // 2,596,148,429,267,413,814,265,248,164,610,047 |
706 | case 34: |
707 | case 35: |
708 | return 15; // 664,613,997,892,457,936,451,903,530,140,172,287 |
709 | case 36: |
710 | case 37: |
711 | case 38: |
712 | return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727 |
713 | default: |
714 | break; |
715 | } |
716 | DCHECK(false); |
717 | return -1; |
718 | } // namespace arrow |
719 | |
720 | } // namespace arrow |
721 | } // namespace parquet |
722 | |