1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/schema.h"
19
20#include <algorithm>
21#include <cstring>
22#include <memory>
23#include <string>
24#include <type_traits>
25#include <utility>
26
27#include "arrow/util/logging.h"
28
29#include "parquet/exception.h"
30#include "parquet/schema_internal.h"
31#include "parquet/thrift_internal.h"
32
33using parquet::format::SchemaElement;
34
35namespace parquet {
36
37namespace schema {
38
39// ----------------------------------------------------------------------
40// ColumnPath
41
42std::shared_ptr<ColumnPath> ColumnPath::FromDotString(const std::string& dotstring) {
43 std::stringstream ss(dotstring);
44 std::string item;
45 std::vector<std::string> path;
46 while (std::getline(ss, item, '.')) {
47 path.push_back(item);
48 }
49 return std::shared_ptr<ColumnPath>(new ColumnPath(std::move(path)));
50}
51
52std::shared_ptr<ColumnPath> ColumnPath::FromNode(const Node& node) {
53 // Build the path in reverse order as we traverse the nodes to the top
54 std::vector<std::string> rpath_;
55 const Node* cursor = &node;
56 // The schema node is not part of the ColumnPath
57 while (cursor->parent()) {
58 rpath_.push_back(cursor->name());
59 cursor = cursor->parent();
60 }
61
62 // Build ColumnPath in correct order
63 std::vector<std::string> path(rpath_.crbegin(), rpath_.crend());
64 return std::make_shared<ColumnPath>(std::move(path));
65}
66
67std::shared_ptr<ColumnPath> ColumnPath::extend(const std::string& node_name) const {
68 std::vector<std::string> path;
69 path.reserve(path_.size() + 1);
70 path.resize(path_.size() + 1);
71 std::copy(path_.cbegin(), path_.cend(), path.begin());
72 path[path_.size()] = node_name;
73
74 return std::shared_ptr<ColumnPath>(new ColumnPath(std::move(path)));
75}
76
77std::string ColumnPath::ToDotString() const {
78 std::stringstream ss;
79 for (auto it = path_.cbegin(); it != path_.cend(); ++it) {
80 if (it != path_.cbegin()) {
81 ss << ".";
82 }
83 ss << *it;
84 }
85 return ss.str();
86}
87
88const std::vector<std::string>& ColumnPath::ToDotVector() const { return path_; }
89
90// ----------------------------------------------------------------------
91// Base node
92
93const std::shared_ptr<ColumnPath> Node::path() const {
94 // TODO(itaiin): Cache the result, or more precisely, cache ->ToDotString()
95 // since it is being used to access the leaf nodes
96 return ColumnPath::FromNode(*this);
97}
98
99bool Node::EqualsInternal(const Node* other) const {
100 return type_ == other->type_ && name_ == other->name_ &&
101 repetition_ == other->repetition_ && converted_type_ == other->converted_type_ &&
102 logical_type_->Equals(*(other->logical_type()));
103}
104
105void Node::SetParent(const Node* parent) { parent_ = parent; }
106
107// ----------------------------------------------------------------------
108// Primitive node
109
110PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetition,
111 Type::type type, ConvertedType::type converted_type,
112 int length, int precision, int scale, int id)
113 : Node(Node::PRIMITIVE, name, repetition, converted_type, id),
114 physical_type_(type),
115 type_length_(length) {
116 std::stringstream ss;
117
118 // PARQUET-842: In an earlier revision, decimal_metadata_.isset was being
119 // set to true, but Impala will raise an incompatible metadata in such cases
120 memset(&decimal_metadata_, 0, sizeof(decimal_metadata_));
121
122 // Check if the physical and logical types match
123 // Mapping referred from Apache parquet-mr as on 2016-02-22
124 switch (converted_type) {
125 case ConvertedType::NONE:
126 // Logical type not set
127 break;
128 case ConvertedType::UTF8:
129 case ConvertedType::JSON:
130 case ConvertedType::BSON:
131 if (type != Type::BYTE_ARRAY) {
132 ss << ConvertedTypeToString(converted_type);
133 ss << " can only annotate BYTE_ARRAY fields";
134 throw ParquetException(ss.str());
135 }
136 break;
137 case ConvertedType::DECIMAL:
138 if ((type != Type::INT32) && (type != Type::INT64) && (type != Type::BYTE_ARRAY) &&
139 (type != Type::FIXED_LEN_BYTE_ARRAY)) {
140 ss << "DECIMAL can only annotate INT32, INT64, BYTE_ARRAY, and FIXED";
141 throw ParquetException(ss.str());
142 }
143 if (precision <= 0) {
144 ss << "Invalid DECIMAL precision: " << precision
145 << ". Precision must be a number between 1 and 38 inclusive";
146 throw ParquetException(ss.str());
147 }
148 if (scale < 0) {
149 ss << "Invalid DECIMAL scale: " << scale
150 << ". Scale must be a number between 0 and precision inclusive";
151 throw ParquetException(ss.str());
152 }
153 if (scale > precision) {
154 ss << "Invalid DECIMAL scale " << scale;
155 ss << " cannot be greater than precision " << precision;
156 throw ParquetException(ss.str());
157 }
158 decimal_metadata_.isset = true;
159 decimal_metadata_.precision = precision;
160 decimal_metadata_.scale = scale;
161 break;
162 case ConvertedType::DATE:
163 case ConvertedType::TIME_MILLIS:
164 case ConvertedType::UINT_8:
165 case ConvertedType::UINT_16:
166 case ConvertedType::UINT_32:
167 case ConvertedType::INT_8:
168 case ConvertedType::INT_16:
169 case ConvertedType::INT_32:
170 if (type != Type::INT32) {
171 ss << ConvertedTypeToString(converted_type);
172 ss << " can only annotate INT32";
173 throw ParquetException(ss.str());
174 }
175 break;
176 case ConvertedType::TIME_MICROS:
177 case ConvertedType::TIMESTAMP_MILLIS:
178 case ConvertedType::TIMESTAMP_MICROS:
179 case ConvertedType::UINT_64:
180 case ConvertedType::INT_64:
181 if (type != Type::INT64) {
182 ss << ConvertedTypeToString(converted_type);
183 ss << " can only annotate INT64";
184 throw ParquetException(ss.str());
185 }
186 break;
187 case ConvertedType::INTERVAL:
188 if ((type != Type::FIXED_LEN_BYTE_ARRAY) || (length != 12)) {
189 ss << "INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)";
190 throw ParquetException(ss.str());
191 }
192 break;
193 case ConvertedType::ENUM:
194 if (type != Type::BYTE_ARRAY) {
195 ss << "ENUM can only annotate BYTE_ARRAY fields";
196 throw ParquetException(ss.str());
197 }
198 break;
199 case ConvertedType::NA:
200 // NA can annotate any type
201 break;
202 default:
203 ss << ConvertedTypeToString(converted_type);
204 ss << " can not be applied to a primitive type";
205 throw ParquetException(ss.str());
206 }
207 // For forward compatibility, create an equivalent logical type
208 logical_type_ = LogicalType::FromConvertedType(converted_type_, decimal_metadata_);
209 DCHECK(logical_type_ && !logical_type_->is_nested() &&
210 logical_type_->is_compatible(converted_type_, decimal_metadata_));
211
212 if (type == Type::FIXED_LEN_BYTE_ARRAY) {
213 if (length <= 0) {
214 ss << "Invalid FIXED_LEN_BYTE_ARRAY length: " << length;
215 throw ParquetException(ss.str());
216 }
217 type_length_ = length;
218 }
219}
220
221PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetition,
222 std::shared_ptr<const LogicalType> logical_type,
223 Type::type physical_type, int physical_length, int id)
224 : Node(Node::PRIMITIVE, name, repetition, logical_type, id),
225 physical_type_(physical_type),
226 type_length_(physical_length) {
227 std::stringstream error;
228 if (logical_type_) {
229 // Check for logical type <=> node type consistency
230 if (!logical_type_->is_nested()) {
231 // Check for logical type <=> physical type consistency
232 if (logical_type_->is_applicable(physical_type, physical_length)) {
233 // For backward compatibility, assign equivalent legacy
234 // converted type (if possible)
235 converted_type_ = logical_type_->ToConvertedType(&decimal_metadata_);
236 } else {
237 error << logical_type_->ToString();
238 error << " can not be applied to primitive type ";
239 error << TypeToString(physical_type);
240 throw ParquetException(error.str());
241 }
242 } else {
243 error << "Nested logical type ";
244 error << logical_type_->ToString();
245 error << " can not be applied to non-group node";
246 throw ParquetException(error.str());
247 }
248 } else {
249 logical_type_ = NoLogicalType::Make();
250 converted_type_ = logical_type_->ToConvertedType(&decimal_metadata_);
251 }
252 DCHECK(logical_type_ && !logical_type_->is_nested() &&
253 logical_type_->is_compatible(converted_type_, decimal_metadata_));
254
255 if (physical_type == Type::FIXED_LEN_BYTE_ARRAY) {
256 if (physical_length <= 0) {
257 error << "Invalid FIXED_LEN_BYTE_ARRAY length: " << physical_length;
258 throw ParquetException(error.str());
259 }
260 }
261}
262
263bool PrimitiveNode::EqualsInternal(const PrimitiveNode* other) const {
264 bool is_equal = true;
265 if (physical_type_ != other->physical_type_) {
266 return false;
267 }
268 if (converted_type_ == ConvertedType::DECIMAL) {
269 is_equal &= (decimal_metadata_.precision == other->decimal_metadata_.precision) &&
270 (decimal_metadata_.scale == other->decimal_metadata_.scale);
271 }
272 if (physical_type_ == Type::FIXED_LEN_BYTE_ARRAY) {
273 is_equal &= (type_length_ == other->type_length_);
274 }
275 return is_equal;
276}
277
278bool PrimitiveNode::Equals(const Node* other) const {
279 if (!Node::EqualsInternal(other)) {
280 return false;
281 }
282 return EqualsInternal(static_cast<const PrimitiveNode*>(other));
283}
284
285void PrimitiveNode::Visit(Node::Visitor* visitor) { visitor->Visit(this); }
286
287void PrimitiveNode::VisitConst(Node::ConstVisitor* visitor) const {
288 visitor->Visit(this);
289}
290
291// ----------------------------------------------------------------------
292// Group node
293
294GroupNode::GroupNode(const std::string& name, Repetition::type repetition,
295 const NodeVector& fields, ConvertedType::type converted_type, int id)
296 : Node(Node::GROUP, name, repetition, converted_type, id), fields_(fields) {
297 // For forward compatibility, create an equivalent logical type
298 logical_type_ = LogicalType::FromConvertedType(converted_type_);
299 DCHECK(logical_type_ && (logical_type_->is_nested() || logical_type_->is_none()) &&
300 logical_type_->is_compatible(converted_type_));
301
302 field_name_to_idx_.clear();
303 auto field_idx = 0;
304 for (NodePtr& field : fields_) {
305 field->SetParent(this);
306 field_name_to_idx_.emplace(field->name(), field_idx++);
307 }
308}
309
310GroupNode::GroupNode(const std::string& name, Repetition::type repetition,
311 const NodeVector& fields,
312 std::shared_ptr<const LogicalType> logical_type, int id)
313 : Node(Node::GROUP, name, repetition, logical_type, id), fields_(fields) {
314 if (logical_type_) {
315 // Check for logical type <=> node type consistency
316 if (logical_type_->is_nested()) {
317 // For backward compatibility, assign equivalent legacy converted type (if possible)
318 converted_type_ = logical_type_->ToConvertedType(nullptr);
319 } else {
320 std::stringstream error;
321 error << "Logical type ";
322 error << logical_type_->ToString();
323 error << " can not be applied to group node";
324 throw ParquetException(error.str());
325 }
326 } else {
327 logical_type_ = NoLogicalType::Make();
328 converted_type_ = logical_type_->ToConvertedType(nullptr);
329 }
330 DCHECK(logical_type_ && (logical_type_->is_nested() || logical_type_->is_none()) &&
331 logical_type_->is_compatible(converted_type_));
332
333 field_name_to_idx_.clear();
334 auto field_idx = 0;
335 for (NodePtr& field : fields_) {
336 field->SetParent(this);
337 field_name_to_idx_.emplace(field->name(), field_idx++);
338 }
339}
340
341bool GroupNode::EqualsInternal(const GroupNode* other) const {
342 if (this == other) {
343 return true;
344 }
345 if (this->field_count() != other->field_count()) {
346 return false;
347 }
348 for (int i = 0; i < this->field_count(); ++i) {
349 if (!this->field(i)->Equals(other->field(i).get())) {
350 return false;
351 }
352 }
353 return true;
354}
355
356bool GroupNode::Equals(const Node* other) const {
357 if (!Node::EqualsInternal(other)) {
358 return false;
359 }
360 return EqualsInternal(static_cast<const GroupNode*>(other));
361}
362
363int GroupNode::FieldIndex(const std::string& name) const {
364 auto search = field_name_to_idx_.find(name);
365 if (search == field_name_to_idx_.end()) {
366 // Not found
367 return -1;
368 }
369 return search->second;
370}
371
372int GroupNode::FieldIndex(const Node& node) const {
373 auto search = field_name_to_idx_.equal_range(node.name());
374 for (auto it = search.first; it != search.second; ++it) {
375 const int idx = it->second;
376 if (&node == field(idx).get()) {
377 return idx;
378 }
379 }
380 return -1;
381}
382
383void GroupNode::Visit(Node::Visitor* visitor) { visitor->Visit(this); }
384
385void GroupNode::VisitConst(Node::ConstVisitor* visitor) const { visitor->Visit(this); }
386
387// ----------------------------------------------------------------------
388// Node construction from Parquet metadata
389
390std::unique_ptr<Node> GroupNode::FromParquet(const void* opaque_element, int node_id,
391 const NodeVector& fields) {
392 const format::SchemaElement* element =
393 static_cast<const format::SchemaElement*>(opaque_element);
394
395 std::unique_ptr<GroupNode> group_node;
396 if (element->__isset.logicalType) {
397 // updated writer with logical type present
398 group_node = std::unique_ptr<GroupNode>(
399 new GroupNode(element->name, FromThrift(element->repetition_type), fields,
400 LogicalType::FromThrift(element->logicalType), node_id));
401 } else {
402 group_node = std::unique_ptr<GroupNode>(new GroupNode(
403 element->name, FromThrift(element->repetition_type), fields,
404 (element->__isset.converted_type ? FromThrift(element->converted_type)
405 : ConvertedType::NONE),
406 node_id));
407 }
408
409 return std::unique_ptr<Node>(group_node.release());
410}
411
412namespace {
413
414// If the parquet file is corrupted it is possible the type value decoded
415// will not be in the range of format::Type::type, which is undefined behavior.
416// This method prevents this by loading the value as the underlying type and checking
417// to make sure it is in range.
418template <typename ApiType>
419struct SafeLoader {
420 using ApiTypeEnum = typename ApiType::type;
421 using ApiTypeRawEnum = typename std::underlying_type<ApiTypeEnum>::type;
422
423 template <typename ThriftType>
424 inline static ApiTypeRawEnum LoadRaw(ThriftType* in) {
425 static_assert(
426 sizeof(ApiTypeEnum) >= sizeof(ThriftType),
427 "parquet type should always be the same size of larger then thrift type");
428 typename std::underlying_type<ThriftType>::type raw_value;
429 memcpy(&raw_value, in, sizeof(ThriftType));
430 return static_cast<ApiTypeRawEnum>(raw_value);
431 }
432
433 template <typename ThriftType, bool IsUnsigned = true>
434 inline static ApiTypeEnum LoadChecked(
435 typename std::enable_if<IsUnsigned, ThriftType>::type* in) {
436 auto raw_value = LoadRaw(in);
437 if (ARROW_PREDICT_FALSE(raw_value >=
438 static_cast<ApiTypeRawEnum>(ApiType::UNDEFINED))) {
439 return ApiType::UNDEFINED;
440 }
441 return FromThrift(static_cast<ThriftType>(raw_value));
442 }
443
444 template <typename ThriftType, bool IsUnsigned = false>
445 inline static ApiTypeEnum LoadChecked(
446 typename std::enable_if<!IsUnsigned, ThriftType>::type* in) {
447 auto raw_value = LoadRaw(in);
448 if (ARROW_PREDICT_FALSE(raw_value >=
449 static_cast<ApiTypeRawEnum>(ApiType::UNDEFINED) ||
450 raw_value < 0)) {
451 return ApiType::UNDEFINED;
452 }
453 return FromThrift(static_cast<ThriftType>(raw_value));
454 }
455
456 template <typename ThriftType>
457 inline static ApiTypeEnum Load(ThriftType* in) {
458 return LoadChecked<ThriftType, std::is_unsigned<ApiTypeRawEnum>::value>(in);
459 }
460};
461
462} // namespace
463
464std::unique_ptr<Node> PrimitiveNode::FromParquet(const void* opaque_element,
465 int node_id) {
466 const format::SchemaElement* element =
467 static_cast<const format::SchemaElement*>(opaque_element);
468
469 std::unique_ptr<PrimitiveNode> primitive_node;
470 if (element->__isset.logicalType) {
471 // updated writer with logical type present
472 primitive_node = std::unique_ptr<PrimitiveNode>(new PrimitiveNode(
473 element->name, SafeLoader<Repetition>::Load(&(element->repetition_type)),
474 LogicalType::FromThrift(element->logicalType),
475 SafeLoader<Type>::Load(&(element->type)), element->type_length, node_id));
476 } else if (element->__isset.converted_type) {
477 // legacy writer with logical type present
478 primitive_node = std::unique_ptr<PrimitiveNode>(new PrimitiveNode(
479 element->name, SafeLoader<Repetition>::Load(&(element->repetition_type)),
480 SafeLoader<Type>::Load(&(element->type)),
481 SafeLoader<ConvertedType>::Load(&(element->converted_type)), element->type_length,
482 element->precision, element->scale, node_id));
483 } else {
484 // logical type not present
485 primitive_node = std::unique_ptr<PrimitiveNode>(new PrimitiveNode(
486 element->name, SafeLoader<Repetition>::Load(&(element->repetition_type)),
487 NoLogicalType::Make(), SafeLoader<Type>::Load(&(element->type)),
488 element->type_length, node_id));
489 }
490
491 // Return as unique_ptr to the base type
492 return std::unique_ptr<Node>(primitive_node.release());
493}
494
495void GroupNode::ToParquet(void* opaque_element) const {
496 format::SchemaElement* element = static_cast<format::SchemaElement*>(opaque_element);
497 element->__set_name(name_);
498 element->__set_num_children(field_count());
499 element->__set_repetition_type(ToThrift(repetition_));
500 if (converted_type_ != ConvertedType::NONE) {
501 element->__set_converted_type(ToThrift(converted_type_));
502 }
503 if (logical_type_ && logical_type_->is_serialized()) {
504 element->__set_logicalType(logical_type_->ToThrift());
505 }
506 return;
507}
508
509void PrimitiveNode::ToParquet(void* opaque_element) const {
510 format::SchemaElement* element = static_cast<format::SchemaElement*>(opaque_element);
511 element->__set_name(name_);
512 element->__set_repetition_type(ToThrift(repetition_));
513 if (converted_type_ != ConvertedType::NONE) {
514 element->__set_converted_type(ToThrift(converted_type_));
515 }
516 if (logical_type_ && logical_type_->is_serialized() &&
517 // TODO(tpboudreau): remove the following conjunct to enable serialization
518 // of IntervalTypes after parquet.thrift recognizes them
519 !logical_type_->is_interval()) {
520 element->__set_logicalType(logical_type_->ToThrift());
521 }
522 element->__set_type(ToThrift(physical_type_));
523 if (physical_type_ == Type::FIXED_LEN_BYTE_ARRAY) {
524 element->__set_type_length(type_length_);
525 }
526 if (decimal_metadata_.isset) {
527 element->__set_precision(decimal_metadata_.precision);
528 element->__set_scale(decimal_metadata_.scale);
529 }
530 return;
531}
532
533// ----------------------------------------------------------------------
534// Schema converters
535
536std::unique_ptr<Node> FlatSchemaConverter::Convert() {
537 const SchemaElement& root = elements_[0];
538
539 if (root.num_children == 0) {
540 if (length_ == 1) {
541 // Degenerate case of Parquet file with no columns
542 return GroupNode::FromParquet(static_cast<const void*>(&root), next_id(), {});
543 } else {
544 throw ParquetException(
545 "Parquet schema had multiple nodes but root had no children");
546 }
547 }
548
549 // Relaxing this restriction as some implementations don't set this
550 // if (root.repetition_type != FieldRepetitionType::REPEATED) {
551 // throw ParquetException("Root node was not FieldRepetitionType::REPEATED");
552 // }
553
554 return NextNode();
555}
556
557std::unique_ptr<Node> FlatSchemaConverter::NextNode() {
558 const SchemaElement& element = Next();
559
560 int node_id = next_id();
561
562 const void* opaque_element = static_cast<const void*>(&element);
563
564 if (element.num_children == 0) {
565 // Leaf (primitive) node
566 return PrimitiveNode::FromParquet(opaque_element, node_id);
567 } else {
568 // Group
569 NodeVector fields;
570 for (int i = 0; i < element.num_children; ++i) {
571 std::unique_ptr<Node> field = NextNode();
572 fields.push_back(NodePtr(field.release()));
573 }
574 return GroupNode::FromParquet(opaque_element, node_id, fields);
575 }
576}
577
578const format::SchemaElement& FlatSchemaConverter::Next() {
579 if (pos_ == length_) {
580 throw ParquetException("Malformed schema: not enough SchemaElement values");
581 }
582 return elements_[pos_++];
583}
584
585std::shared_ptr<SchemaDescriptor> FromParquet(const std::vector<SchemaElement>& schema) {
586 FlatSchemaConverter converter(&schema[0], static_cast<int>(schema.size()));
587 std::unique_ptr<Node> root = converter.Convert();
588
589 std::shared_ptr<SchemaDescriptor> descr = std::make_shared<SchemaDescriptor>();
590 descr->Init(std::shared_ptr<GroupNode>(static_cast<GroupNode*>(root.release())));
591
592 return descr;
593}
594
595void ToParquet(const GroupNode* schema, std::vector<format::SchemaElement>* out) {
596 SchemaFlattener flattener(schema, out);
597 flattener.Flatten();
598}
599
600class SchemaVisitor : public Node::ConstVisitor {
601 public:
602 explicit SchemaVisitor(std::vector<format::SchemaElement>* elements)
603 : elements_(elements) {}
604
605 void Visit(const Node* node) override {
606 format::SchemaElement element;
607 node->ToParquet(&element);
608 elements_->push_back(element);
609
610 if (node->is_group()) {
611 const GroupNode* group_node = static_cast<const GroupNode*>(node);
612 for (int i = 0; i < group_node->field_count(); ++i) {
613 group_node->field(i)->VisitConst(this);
614 }
615 }
616 }
617
618 private:
619 std::vector<format::SchemaElement>* elements_;
620};
621
622SchemaFlattener::SchemaFlattener(const GroupNode* schema,
623 std::vector<format::SchemaElement>* out)
624 : root_(schema), elements_(out) {}
625
626void SchemaFlattener::Flatten() {
627 SchemaVisitor visitor(elements_);
628 root_->VisitConst(&visitor);
629}
630
631// ----------------------------------------------------------------------
632// Schema printing
633
634class SchemaPrinter : public Node::ConstVisitor {
635 public:
636 explicit SchemaPrinter(std::ostream& stream, int indent_width)
637 : stream_(stream), indent_(0), indent_width_(2) {}
638
639 void Visit(const Node* node) override;
640
641 private:
642 void Visit(const PrimitiveNode* node);
643 void Visit(const GroupNode* node);
644
645 void Indent();
646
647 std::ostream& stream_;
648
649 int indent_;
650 int indent_width_;
651};
652
653static void PrintRepLevel(Repetition::type repetition, std::ostream& stream) {
654 switch (repetition) {
655 case Repetition::REQUIRED:
656 stream << "required";
657 break;
658 case Repetition::OPTIONAL:
659 stream << "optional";
660 break;
661 case Repetition::REPEATED:
662 stream << "repeated";
663 break;
664 default:
665 break;
666 }
667}
668
669static void PrintType(const PrimitiveNode* node, std::ostream& stream) {
670 switch (node->physical_type()) {
671 case Type::BOOLEAN:
672 stream << "boolean";
673 break;
674 case Type::INT32:
675 stream << "int32";
676 break;
677 case Type::INT64:
678 stream << "int64";
679 break;
680 case Type::INT96:
681 stream << "int96";
682 break;
683 case Type::FLOAT:
684 stream << "float";
685 break;
686 case Type::DOUBLE:
687 stream << "double";
688 break;
689 case Type::BYTE_ARRAY:
690 stream << "binary";
691 break;
692 case Type::FIXED_LEN_BYTE_ARRAY:
693 stream << "fixed_len_byte_array(" << node->type_length() << ")";
694 break;
695 default:
696 break;
697 }
698}
699
700static void PrintConvertedType(const PrimitiveNode* node, std::ostream& stream) {
701 auto lt = node->converted_type();
702 auto la = node->logical_type();
703 if (la && la->is_valid() && !la->is_none()) {
704 stream << " (" << la->ToString() << ")";
705 } else if (lt == ConvertedType::DECIMAL) {
706 stream << " (" << ConvertedTypeToString(lt) << "("
707 << node->decimal_metadata().precision << "," << node->decimal_metadata().scale
708 << "))";
709 } else if (lt != ConvertedType::NONE) {
710 stream << " (" << ConvertedTypeToString(lt) << ")";
711 }
712}
713
714void SchemaPrinter::Visit(const PrimitiveNode* node) {
715 PrintRepLevel(node->repetition(), stream_);
716 stream_ << " ";
717 PrintType(node, stream_);
718 stream_ << " " << node->name();
719 PrintConvertedType(node, stream_);
720 stream_ << ";" << std::endl;
721}
722
723void SchemaPrinter::Visit(const GroupNode* node) {
724 if (!node->parent()) {
725 stream_ << "message " << node->name() << " {" << std::endl;
726 } else {
727 PrintRepLevel(node->repetition(), stream_);
728 stream_ << " group " << node->name();
729 auto lt = node->converted_type();
730 auto la = node->logical_type();
731 if (la && la->is_valid() && !la->is_none()) {
732 stream_ << " (" << la->ToString() << ")";
733 } else if (lt != ConvertedType::NONE) {
734 stream_ << " (" << ConvertedTypeToString(lt) << ")";
735 }
736 stream_ << " {" << std::endl;
737 }
738
739 indent_ += indent_width_;
740 for (int i = 0; i < node->field_count(); ++i) {
741 node->field(i)->VisitConst(this);
742 }
743 indent_ -= indent_width_;
744 Indent();
745 stream_ << "}" << std::endl;
746}
747
748void SchemaPrinter::Indent() {
749 if (indent_ > 0) {
750 std::string spaces(indent_, ' ');
751 stream_ << spaces;
752 }
753}
754
755void SchemaPrinter::Visit(const Node* node) {
756 Indent();
757 if (node->is_group()) {
758 Visit(static_cast<const GroupNode*>(node));
759 } else {
760 // Primitive
761 Visit(static_cast<const PrimitiveNode*>(node));
762 }
763}
764
765void PrintSchema(const Node* schema, std::ostream& stream, int indent_width) {
766 SchemaPrinter printer(stream, indent_width);
767 printer.Visit(schema);
768}
769
770} // namespace schema
771
772using schema::ColumnPath;
773using schema::GroupNode;
774using schema::Node;
775using schema::NodePtr;
776using schema::PrimitiveNode;
777
778void SchemaDescriptor::Init(std::unique_ptr<schema::Node> schema) {
779 Init(NodePtr(schema.release()));
780}
781
782class SchemaUpdater : public Node::Visitor {
783 public:
784 explicit SchemaUpdater(const std::vector<ColumnOrder>& column_orders)
785 : column_orders_(column_orders), leaf_count_(0) {}
786
787 void Visit(Node* node) override {
788 if (node->is_group()) {
789 GroupNode* group_node = static_cast<GroupNode*>(node);
790 for (int i = 0; i < group_node->field_count(); ++i) {
791 group_node->field(i)->Visit(this);
792 }
793 } else { // leaf node
794 PrimitiveNode* leaf_node = static_cast<PrimitiveNode*>(node);
795 leaf_node->SetColumnOrder(column_orders_[leaf_count_++]);
796 }
797 }
798
799 private:
800 const std::vector<ColumnOrder>& column_orders_;
801 int leaf_count_;
802};
803
804void SchemaDescriptor::updateColumnOrders(const std::vector<ColumnOrder>& column_orders) {
805 if (static_cast<int>(column_orders.size()) != num_columns()) {
806 throw ParquetException("Malformed schema: not enough ColumnOrder values");
807 }
808 SchemaUpdater visitor(column_orders);
809 const_cast<GroupNode*>(group_node_)->Visit(&visitor);
810}
811
812void SchemaDescriptor::Init(const NodePtr& schema) {
813 schema_ = schema;
814
815 if (!schema_->is_group()) {
816 throw ParquetException("Must initialize with a schema group");
817 }
818
819 group_node_ = static_cast<const GroupNode*>(schema_.get());
820 leaves_.clear();
821
822 for (int i = 0; i < group_node_->field_count(); ++i) {
823 BuildTree(group_node_->field(i), 0, 0, group_node_->field(i));
824 }
825}
826
827bool SchemaDescriptor::Equals(const SchemaDescriptor& other) const {
828 if (this->num_columns() != other.num_columns()) {
829 return false;
830 }
831
832 for (int i = 0; i < this->num_columns(); ++i) {
833 if (!this->Column(i)->Equals(*other.Column(i))) {
834 return false;
835 }
836 }
837
838 return true;
839}
840
841void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
842 int16_t max_rep_level, const NodePtr& base) {
843 if (node->is_optional()) {
844 ++max_def_level;
845 } else if (node->is_repeated()) {
846 // Repeated fields add a definition level. This is used to distinguish
847 // between an empty list and a list with an item in it.
848 ++max_rep_level;
849 ++max_def_level;
850 }
851
852 // Now, walk the schema and create a ColumnDescriptor for each leaf node
853 if (node->is_group()) {
854 const GroupNode* group = static_cast<const GroupNode*>(node.get());
855 for (int i = 0; i < group->field_count(); ++i) {
856 BuildTree(group->field(i), max_def_level, max_rep_level, base);
857 }
858 } else {
859 node_to_leaf_index_[static_cast<const PrimitiveNode*>(node.get())] =
860 static_cast<int>(leaves_.size());
861
862 // Primitive node, append to leaves
863 leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this));
864 leaf_to_base_.emplace(static_cast<int>(leaves_.size()) - 1, base);
865 leaf_to_idx_.emplace(node->path()->ToDotString(),
866 static_cast<int>(leaves_.size()) - 1);
867 }
868}
869
870int SchemaDescriptor::GetColumnIndex(const PrimitiveNode& node) const {
871 auto it = node_to_leaf_index_.find(&node);
872 if (it == node_to_leaf_index_.end()) {
873 return -1;
874 }
875 return it->second;
876}
877
878ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node,
879 int16_t max_definition_level,
880 int16_t max_repetition_level,
881 const SchemaDescriptor* schema_descr)
882 : node_(node),
883 max_definition_level_(max_definition_level),
884 max_repetition_level_(max_repetition_level) {
885 if (!node_->is_primitive()) {
886 throw ParquetException("Must be a primitive type");
887 }
888 primitive_node_ = static_cast<const PrimitiveNode*>(node_.get());
889}
890
891bool ColumnDescriptor::Equals(const ColumnDescriptor& other) const {
892 return primitive_node_->Equals(other.primitive_node_) &&
893 max_repetition_level() == other.max_repetition_level() &&
894 max_definition_level() == other.max_definition_level();
895}
896
897const ColumnDescriptor* SchemaDescriptor::Column(int i) const {
898 DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
899 return &leaves_[i];
900}
901
902int SchemaDescriptor::ColumnIndex(const std::string& node_path) const {
903 auto search = leaf_to_idx_.find(node_path);
904 if (search == leaf_to_idx_.end()) {
905 // Not found
906 return -1;
907 }
908 return search->second;
909}
910
911int SchemaDescriptor::ColumnIndex(const Node& node) const {
912 auto search = leaf_to_idx_.equal_range(node.path()->ToDotString());
913 for (auto it = search.first; it != search.second; ++it) {
914 const int idx = it->second;
915 if (&node == Column(idx)->schema_node().get()) {
916 return idx;
917 }
918 }
919 return -1;
920}
921
922const schema::Node* SchemaDescriptor::GetColumnRoot(int i) const {
923 DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
924 return leaf_to_base_.find(i)->second.get();
925}
926
927std::string SchemaDescriptor::ToString() const {
928 std::ostringstream ss;
929 PrintSchema(schema_.get(), ss);
930 return ss.str();
931}
932
933std::string ColumnDescriptor::ToString() const {
934 std::ostringstream ss;
935 ss << "column descriptor = {" << std::endl
936 << " name: " << name() << "," << std::endl
937 << " path: " << path()->ToDotString() << "," << std::endl
938 << " physical_type: " << TypeToString(physical_type()) << "," << std::endl
939 << " converted_type: " << ConvertedTypeToString(converted_type()) << ","
940 << std::endl
941 << " logical_type: " << logical_type()->ToString() << "," << std::endl
942 << " max_definition_level: " << max_definition_level() << "," << std::endl
943 << " max_repetition_level: " << max_repetition_level() << "," << std::endl;
944
945 if (physical_type() == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
946 ss << " length: " << type_length() << "," << std::endl;
947 }
948
949 if (converted_type() == parquet::ConvertedType::DECIMAL) {
950 ss << " precision: " << type_precision() << "," << std::endl
951 << " scale: " << type_scale() << "," << std::endl;
952 }
953
954 ss << "}";
955 return ss.str();
956}
957
958int ColumnDescriptor::type_scale() const {
959 return primitive_node_->decimal_metadata().scale;
960}
961
962int ColumnDescriptor::type_precision() const {
963 return primitive_node_->decimal_metadata().precision;
964}
965
966int ColumnDescriptor::type_length() const { return primitive_node_->type_length(); }
967
968const std::shared_ptr<ColumnPath> ColumnDescriptor::path() const {
969 return primitive_node_->path();
970}
971
972} // namespace parquet
973