1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <limits>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
23
24#include "arrow/util/logging.h"
25
26#include "parquet/exception.h"
27#include "parquet/metadata.h"
28#include "parquet/schema.h"
29#include "parquet/schema_internal.h"
30#include "parquet/statistics.h"
31#include "parquet/thrift_internal.h"
32
33// ARROW-6096: The boost regex library must be used when compiling with gcc < 4.9
34#if defined(PARQUET_USE_BOOST_REGEX)
35#include <boost/regex.hpp> // IWYU pragma: keep
36using ::boost::regex;
37using ::boost::regex_match;
38using ::boost::smatch;
39#else
40#include <regex>
41using ::std::regex;
42using ::std::regex_match;
43using ::std::smatch;
44#endif
45
46namespace parquet {
47
48const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() {
49 static ApplicationVersion version("parquet-mr", 1, 8, 0);
50 return version;
51}
52
53const ApplicationVersion& ApplicationVersion::PARQUET_816_FIXED_VERSION() {
54 static ApplicationVersion version("parquet-mr", 1, 2, 9);
55 return version;
56}
57
58const ApplicationVersion& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() {
59 static ApplicationVersion version("parquet-cpp", 1, 3, 0);
60 return version;
61}
62
63const ApplicationVersion& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() {
64 static ApplicationVersion version("parquet-mr", 1, 10, 0);
65 return version;
66}
67
68std::string ParquetVersionToString(ParquetVersion::type ver) {
69 switch (ver) {
70 case ParquetVersion::PARQUET_1_0:
71 return "1.0";
72 case ParquetVersion::PARQUET_2_0:
73 return "2.0";
74 }
75
76 // This should be unreachable
77 return "UNKNOWN";
78}
79
80template <typename DType>
81static std::shared_ptr<Statistics> MakeTypedColumnStats(
82 const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
83 // If ColumnOrder is defined, return max_value and min_value
84 if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
85 return MakeStatistics<DType>(
86 descr, metadata.statistics.min_value, metadata.statistics.max_value,
87 metadata.num_values - metadata.statistics.null_count,
88 metadata.statistics.null_count, metadata.statistics.distinct_count,
89 metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value);
90 }
91 // Default behavior
92 return MakeStatistics<DType>(
93 descr, metadata.statistics.min, metadata.statistics.max,
94 metadata.num_values - metadata.statistics.null_count,
95 metadata.statistics.null_count, metadata.statistics.distinct_count,
96 metadata.statistics.__isset.max || metadata.statistics.__isset.min);
97}
98
99std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_data,
100 const ColumnDescriptor* descr) {
101 switch (static_cast<Type::type>(meta_data.type)) {
102 case Type::BOOLEAN:
103 return MakeTypedColumnStats<BooleanType>(meta_data, descr);
104 case Type::INT32:
105 return MakeTypedColumnStats<Int32Type>(meta_data, descr);
106 case Type::INT64:
107 return MakeTypedColumnStats<Int64Type>(meta_data, descr);
108 case Type::INT96:
109 return MakeTypedColumnStats<Int96Type>(meta_data, descr);
110 case Type::DOUBLE:
111 return MakeTypedColumnStats<DoubleType>(meta_data, descr);
112 case Type::FLOAT:
113 return MakeTypedColumnStats<FloatType>(meta_data, descr);
114 case Type::BYTE_ARRAY:
115 return MakeTypedColumnStats<ByteArrayType>(meta_data, descr);
116 case Type::FIXED_LEN_BYTE_ARRAY:
117 return MakeTypedColumnStats<FLBAType>(meta_data, descr);
118 case Type::UNDEFINED:
119 break;
120 }
121 throw ParquetException("Can't decode page statistics for selected column type");
122}
123
124// MetaData Accessor
125// ColumnChunk metadata
126class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
127 public:
128 explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column,
129 const ColumnDescriptor* descr,
130 const ApplicationVersion* writer_version)
131 : column_(column), descr_(descr), writer_version_(writer_version) {
132 const format::ColumnMetaData& meta_data = column->meta_data;
133 for (auto encoding : meta_data.encodings) {
134 encodings_.push_back(FromThrift(encoding));
135 }
136 possible_stats_ = nullptr;
137 }
138
139 // column chunk
140 inline int64_t file_offset() const { return column_->file_offset; }
141 inline const std::string& file_path() const { return column_->file_path; }
142
143 // column metadata
144 inline Type::type type() const { return FromThrift(column_->meta_data.type); }
145
146 inline int64_t num_values() const { return column_->meta_data.num_values; }
147
148 std::shared_ptr<schema::ColumnPath> path_in_schema() {
149 return std::make_shared<schema::ColumnPath>(column_->meta_data.path_in_schema);
150 }
151
152 // Check if statistics are set and are valid
153 // 1) Must be set in the metadata
154 // 2) Statistics must not be corrupted
155 inline bool is_stats_set() const {
156 DCHECK(writer_version_ != nullptr);
157 // If the column statistics don't exist or column sort order is unknown
158 // we cannot use the column stats
159 if (!column_->meta_data.__isset.statistics ||
160 descr_->sort_order() == SortOrder::UNKNOWN) {
161 return false;
162 }
163 if (possible_stats_ == nullptr) {
164 possible_stats_ = MakeColumnStats(column_->meta_data, descr_);
165 }
166 EncodedStatistics encodedStatistics = possible_stats_->Encode();
167 return writer_version_->HasCorrectStatistics(type(), encodedStatistics,
168 descr_->sort_order());
169 }
170
171 inline std::shared_ptr<Statistics> statistics() const {
172 return is_stats_set() ? possible_stats_ : nullptr;
173 }
174
175 inline Compression::type compression() const {
176 return FromThrift(column_->meta_data.codec);
177 }
178
179 const std::vector<Encoding::type>& encodings() const { return encodings_; }
180
181 inline bool has_dictionary_page() const {
182 return column_->meta_data.__isset.dictionary_page_offset;
183 }
184
185 inline int64_t dictionary_page_offset() const {
186 return column_->meta_data.dictionary_page_offset;
187 }
188
189 inline int64_t data_page_offset() const { return column_->meta_data.data_page_offset; }
190
191 inline bool has_index_page() const {
192 return column_->meta_data.__isset.index_page_offset;
193 }
194
195 inline int64_t index_page_offset() const {
196 return column_->meta_data.index_page_offset;
197 }
198
199 inline int64_t total_compressed_size() const {
200 return column_->meta_data.total_compressed_size;
201 }
202
203 inline int64_t total_uncompressed_size() const {
204 return column_->meta_data.total_uncompressed_size;
205 }
206
207 private:
208 mutable std::shared_ptr<Statistics> possible_stats_;
209 std::vector<Encoding::type> encodings_;
210 const format::ColumnChunk* column_;
211 const ColumnDescriptor* descr_;
212 const ApplicationVersion* writer_version_;
213};
214
215std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
216 const void* metadata, const ColumnDescriptor* descr,
217 const ApplicationVersion* writer_version) {
218 return std::unique_ptr<ColumnChunkMetaData>(
219 new ColumnChunkMetaData(metadata, descr, writer_version));
220}
221
222ColumnChunkMetaData::ColumnChunkMetaData(const void* metadata,
223 const ColumnDescriptor* descr,
224 const ApplicationVersion* writer_version)
225 : impl_{std::unique_ptr<ColumnChunkMetaDataImpl>(new ColumnChunkMetaDataImpl(
226 reinterpret_cast<const format::ColumnChunk*>(metadata), descr,
227 writer_version))} {}
228ColumnChunkMetaData::~ColumnChunkMetaData() {}
229
230// column chunk
231int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); }
232
233const std::string& ColumnChunkMetaData::file_path() const { return impl_->file_path(); }
234
235// column metadata
236Type::type ColumnChunkMetaData::type() const { return impl_->type(); }
237
238int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); }
239
240std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const {
241 return impl_->path_in_schema();
242}
243
244std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const {
245 return impl_->statistics();
246}
247
248bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); }
249
250bool ColumnChunkMetaData::has_dictionary_page() const {
251 return impl_->has_dictionary_page();
252}
253
254int64_t ColumnChunkMetaData::dictionary_page_offset() const {
255 return impl_->dictionary_page_offset();
256}
257
258int64_t ColumnChunkMetaData::data_page_offset() const {
259 return impl_->data_page_offset();
260}
261
262bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); }
263
264int64_t ColumnChunkMetaData::index_page_offset() const {
265 return impl_->index_page_offset();
266}
267
268Compression::type ColumnChunkMetaData::compression() const {
269 return impl_->compression();
270}
271
272const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const {
273 return impl_->encodings();
274}
275
276int64_t ColumnChunkMetaData::total_uncompressed_size() const {
277 return impl_->total_uncompressed_size();
278}
279
280int64_t ColumnChunkMetaData::total_compressed_size() const {
281 return impl_->total_compressed_size();
282}
283
284// row-group metadata
285class RowGroupMetaData::RowGroupMetaDataImpl {
286 public:
287 explicit RowGroupMetaDataImpl(const format::RowGroup* row_group,
288 const SchemaDescriptor* schema,
289 const ApplicationVersion* writer_version)
290 : row_group_(row_group), schema_(schema), writer_version_(writer_version) {}
291
292 inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); }
293
294 inline int64_t num_rows() const { return row_group_->num_rows; }
295
296 inline int64_t total_byte_size() const { return row_group_->total_byte_size; }
297
298 inline const SchemaDescriptor* schema() const { return schema_; }
299
300 std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
301 if (!(i < num_columns())) {
302 std::stringstream ss;
303 ss << "The file only has " << num_columns()
304 << " columns, requested metadata for column: " << i;
305 throw ParquetException(ss.str());
306 }
307 return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
308 writer_version_);
309 }
310
311 private:
312 const format::RowGroup* row_group_;
313 const SchemaDescriptor* schema_;
314 const ApplicationVersion* writer_version_;
315};
316
317std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
318 const void* metadata, const SchemaDescriptor* schema,
319 const ApplicationVersion* writer_version) {
320 return std::unique_ptr<RowGroupMetaData>(
321 new RowGroupMetaData(metadata, schema, writer_version));
322}
323
324RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
325 const ApplicationVersion* writer_version)
326 : impl_{std::unique_ptr<RowGroupMetaDataImpl>(new RowGroupMetaDataImpl(
327 reinterpret_cast<const format::RowGroup*>(metadata), schema, writer_version))} {
328}
329RowGroupMetaData::~RowGroupMetaData() {}
330
331int RowGroupMetaData::num_columns() const { return impl_->num_columns(); }
332
333int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); }
334
335int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); }
336
337const SchemaDescriptor* RowGroupMetaData::schema() const { return impl_->schema(); }
338
339std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const {
340 return impl_->ColumnChunk(i);
341}
342
343// file metadata
344class FileMetaData::FileMetaDataImpl {
345 public:
346 FileMetaDataImpl() : metadata_len_(0) {}
347
348 explicit FileMetaDataImpl(const void* metadata, uint32_t* metadata_len)
349 : metadata_len_(0) {
350 metadata_.reset(new format::FileMetaData);
351 DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len,
352 metadata_.get());
353 metadata_len_ = *metadata_len;
354
355 if (metadata_->__isset.created_by) {
356 writer_version_ = ApplicationVersion(metadata_->created_by);
357 } else {
358 writer_version_ = ApplicationVersion("unknown 0.0.0");
359 }
360
361 InitSchema();
362 InitColumnOrders();
363 InitKeyValueMetadata();
364 }
365
366 inline uint32_t size() const { return metadata_len_; }
367 inline int num_columns() const { return schema_.num_columns(); }
368 inline int64_t num_rows() const { return metadata_->num_rows; }
369 inline int num_row_groups() const {
370 return static_cast<int>(metadata_->row_groups.size());
371 }
372 inline int32_t version() const { return metadata_->version; }
373 inline const std::string& created_by() const { return metadata_->created_by; }
374 inline int num_schema_elements() const {
375 return static_cast<int>(metadata_->schema.size());
376 }
377
378 const ApplicationVersion& writer_version() const { return writer_version_; }
379
380 void WriteTo(::arrow::io::OutputStream* dst) const {
381 ThriftSerializer serializer;
382 serializer.Serialize(metadata_.get(), dst);
383 }
384
385 std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
386 if (!(i < num_row_groups())) {
387 std::stringstream ss;
388 ss << "The file only has " << num_row_groups()
389 << " row groups, requested metadata for row group: " << i;
390 throw ParquetException(ss.str());
391 }
392 return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_);
393 }
394
395 const SchemaDescriptor* schema() const { return &schema_; }
396
397 std::shared_ptr<const KeyValueMetadata> key_value_metadata() const {
398 return key_value_metadata_;
399 }
400
401 void set_file_path(const std::string& path) {
402 for (format::RowGroup& row_group : metadata_->row_groups) {
403 for (format::ColumnChunk& chunk : row_group.columns) {
404 chunk.__set_file_path(path);
405 }
406 }
407 }
408
409 format::RowGroup& row_group(int i) {
410 DCHECK_LT(i, num_row_groups());
411 return metadata_->row_groups[i];
412 }
413
414 void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) {
415 format::RowGroup other_rg;
416 for (int i = 0; i < other->num_row_groups(); i++) {
417 other_rg = other->row_group(i);
418 metadata_->row_groups.push_back(other_rg);
419 metadata_->num_rows += other_rg.num_rows;
420 }
421 }
422
423 private:
424 friend FileMetaDataBuilder;
425 uint32_t metadata_len_;
426 std::unique_ptr<format::FileMetaData> metadata_;
427 void InitSchema() {
428 schema::FlatSchemaConverter converter(&metadata_->schema[0],
429 static_cast<int>(metadata_->schema.size()));
430 schema_.Init(converter.Convert());
431 }
432 void InitColumnOrders() {
433 // update ColumnOrder
434 std::vector<parquet::ColumnOrder> column_orders;
435 if (metadata_->__isset.column_orders) {
436 for (auto column_order : metadata_->column_orders) {
437 if (column_order.__isset.TYPE_ORDER) {
438 column_orders.push_back(ColumnOrder::type_defined_);
439 } else {
440 column_orders.push_back(ColumnOrder::undefined_);
441 }
442 }
443 } else {
444 column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_);
445 }
446
447 schema_.updateColumnOrders(column_orders);
448 }
449 SchemaDescriptor schema_;
450 ApplicationVersion writer_version_;
451
452 void InitKeyValueMetadata() {
453 std::shared_ptr<KeyValueMetadata> metadata = nullptr;
454 if (metadata_->__isset.key_value_metadata) {
455 metadata = std::make_shared<KeyValueMetadata>();
456 for (const auto& it : metadata_->key_value_metadata) {
457 metadata->Append(it.key, it.value);
458 }
459 }
460 key_value_metadata_ = metadata;
461 }
462
463 std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
464};
465
466std::shared_ptr<FileMetaData> FileMetaData::Make(const void* metadata,
467 uint32_t* metadata_len) {
468 // This FileMetaData ctor is private, not compatible with std::make_shared
469 return std::shared_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len));
470}
471
472FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len)
473 : impl_{std::unique_ptr<FileMetaDataImpl>(
474 new FileMetaDataImpl(metadata, metadata_len))} {}
475
476FileMetaData::FileMetaData()
477 : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {}
478
479FileMetaData::~FileMetaData() {}
480
481std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const {
482 return impl_->RowGroup(i);
483}
484
485uint32_t FileMetaData::size() const { return impl_->size(); }
486
487int FileMetaData::num_columns() const { return impl_->num_columns(); }
488
489int64_t FileMetaData::num_rows() const { return impl_->num_rows(); }
490
491int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); }
492
493ParquetVersion::type FileMetaData::version() const {
494 switch (impl_->version()) {
495 case 1:
496 return ParquetVersion::PARQUET_1_0;
497 case 2:
498 return ParquetVersion::PARQUET_2_0;
499 default:
500 // Improperly set version, assuming Parquet 1.0
501 break;
502 }
503 return ParquetVersion::PARQUET_1_0;
504}
505
506const ApplicationVersion& FileMetaData::writer_version() const {
507 return impl_->writer_version();
508}
509
510const std::string& FileMetaData::created_by() const { return impl_->created_by(); }
511
512int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); }
513
514const SchemaDescriptor* FileMetaData::schema() const { return impl_->schema(); }
515
516std::shared_ptr<const KeyValueMetadata> FileMetaData::key_value_metadata() const {
517 return impl_->key_value_metadata();
518}
519
520void FileMetaData::set_file_path(const std::string& path) { impl_->set_file_path(path); }
521
522void FileMetaData::AppendRowGroups(const FileMetaData& other) {
523 impl_->AppendRowGroups(other.impl_);
524}
525
526void FileMetaData::WriteTo(::arrow::io::OutputStream* dst) const {
527 return impl_->WriteTo(dst);
528}
529
530ApplicationVersion::ApplicationVersion(const std::string& application, int major,
531 int minor, int patch)
532 : application_(application), version{major, minor, patch, "", "", ""} {}
533
534ApplicationVersion::ApplicationVersion(const std::string& created_by) {
535 regex app_regex{ApplicationVersion::APPLICATION_FORMAT};
536 regex ver_regex{ApplicationVersion::VERSION_FORMAT};
537 smatch app_matches;
538 smatch ver_matches;
539
540 std::string created_by_lower = created_by;
541 std::transform(created_by_lower.begin(), created_by_lower.end(),
542 created_by_lower.begin(), ::tolower);
543
544 bool app_success = regex_match(created_by_lower, app_matches, app_regex);
545 bool ver_success = false;
546 std::string version_str;
547
548 if (app_success && app_matches.size() >= 4) {
549 // first match is the entire string. sub-matches start from second.
550 application_ = app_matches[1];
551 version_str = app_matches[3];
552 build_ = app_matches[4];
553 ver_success = regex_match(version_str, ver_matches, ver_regex);
554 } else {
555 application_ = "unknown";
556 }
557
558 if (ver_success && ver_matches.size() >= 7) {
559 version.major = atoi(ver_matches[1].str().c_str());
560 version.minor = atoi(ver_matches[2].str().c_str());
561 version.patch = atoi(ver_matches[3].str().c_str());
562 version.unknown = ver_matches[4].str();
563 version.pre_release = ver_matches[5].str();
564 version.build_info = ver_matches[6].str();
565 } else {
566 version.major = 0;
567 version.minor = 0;
568 version.patch = 0;
569 }
570}
571
572bool ApplicationVersion::VersionLt(const ApplicationVersion& other_version) const {
573 if (application_ != other_version.application_) return false;
574
575 if (version.major < other_version.version.major) return true;
576 if (version.major > other_version.version.major) return false;
577 DCHECK_EQ(version.major, other_version.version.major);
578 if (version.minor < other_version.version.minor) return true;
579 if (version.minor > other_version.version.minor) return false;
580 DCHECK_EQ(version.minor, other_version.version.minor);
581 return version.patch < other_version.version.patch;
582}
583
584bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) const {
585 return application_ == other_version.application_ &&
586 version.major == other_version.version.major &&
587 version.minor == other_version.version.minor &&
588 version.patch == other_version.version.patch;
589}
590
591// Reference:
592// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
593// PARQUET-686 has more disussion on statistics
594bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
595 EncodedStatistics& statistics,
596 SortOrder::type sort_order) const {
597 // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed
598 // correctly for all types
599 if ((application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) ||
600 (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) {
601 // Only SIGNED are valid unless max and min are the same
602 // (in which case the sort order does not matter)
603 bool max_equals_min = statistics.has_min && statistics.has_max
604 ? statistics.min() == statistics.max()
605 : false;
606 if (SortOrder::SIGNED != sort_order && !max_equals_min) {
607 return false;
608 }
609
610 // Statistics of other types are OK
611 if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
612 return true;
613 }
614 }
615 // created_by is not populated, which could have been caused by
616 // parquet-mr during the same time as PARQUET-251, see PARQUET-297
617 if (application_ == "unknown") {
618 return true;
619 }
620
621 // Unknown sort order has incorrect stats
622 if (SortOrder::UNKNOWN == sort_order) {
623 return false;
624 }
625
626 // PARQUET-251
627 if (VersionLt(PARQUET_251_FIXED_VERSION())) {
628 return false;
629 }
630
631 return true;
632}
633
634// MetaData Builders
// column chunk metadata
636class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
637 public:
638 explicit ColumnChunkMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
639 const ColumnDescriptor* column)
640 : owned_column_chunk_(new format::ColumnChunk),
641 properties_(props),
642 column_(column) {
643 Init(owned_column_chunk_.get());
644 }
645
646 explicit ColumnChunkMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
647 const ColumnDescriptor* column,
648 format::ColumnChunk* column_chunk)
649 : properties_(props), column_(column) {
650 Init(column_chunk);
651 }
652
653 const void* contents() const { return column_chunk_; }
654
655 // column chunk
656 void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }
657
658 // column metadata
659 void SetStatistics(const EncodedStatistics& val) {
660 column_chunk_->meta_data.__set_statistics(ToThrift(val));
661 }
662
663 void Finish(int64_t num_values, int64_t dictionary_page_offset,
664 int64_t index_page_offset, int64_t data_page_offset,
665 int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
666 bool dictionary_fallback) {
667 if (dictionary_page_offset > 0) {
668 column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
669 column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
670 } else {
671 column_chunk_->__set_file_offset(data_page_offset + compressed_size);
672 }
673 column_chunk_->__isset.meta_data = true;
674 column_chunk_->meta_data.__set_num_values(num_values);
675 if (index_page_offset >= 0) {
676 column_chunk_->meta_data.__set_index_page_offset(index_page_offset);
677 }
678 column_chunk_->meta_data.__set_data_page_offset(data_page_offset);
679 column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);
680 column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
681 std::vector<format::Encoding::type> thrift_encodings;
682 if (has_dictionary) {
683 thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
684 if (properties_->version() == ParquetVersion::PARQUET_1_0) {
685 thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
686 } else {
687 thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
688 }
689 } else { // Dictionary not enabled
690 thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
691 }
692 thrift_encodings.push_back(ToThrift(Encoding::RLE));
693 // Only PLAIN encoding is supported for fallback in V1
694 // TODO(majetideepak): Use user specified encoding for V2
695 if (dictionary_fallback) {
696 thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
697 }
698 column_chunk_->meta_data.__set_encodings(thrift_encodings);
699 }
700
701 void WriteTo(::arrow::io::OutputStream* sink) {
702 ThriftSerializer serializer;
703 serializer.Serialize(column_chunk_, sink);
704 }
705
706 const ColumnDescriptor* descr() const { return column_; }
707
708 private:
709 void Init(format::ColumnChunk* column_chunk) {
710 column_chunk_ = column_chunk;
711 column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type()));
712 column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector());
713 column_chunk_->meta_data.__set_codec(
714 ToThrift(properties_->compression(column_->path())));
715 }
716
717 format::ColumnChunk* column_chunk_;
718 std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
719 const std::shared_ptr<WriterProperties> properties_;
720 const ColumnDescriptor* column_;
721};
722
723std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
724 const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
725 void* contents) {
726 return std::unique_ptr<ColumnChunkMetaDataBuilder>(
727 new ColumnChunkMetaDataBuilder(props, column, contents));
728}
729
730std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
731 const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column) {
732 return std::unique_ptr<ColumnChunkMetaDataBuilder>(
733 new ColumnChunkMetaDataBuilder(props, column));
734}
735
736ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
737 const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column)
738 : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>(
739 new ColumnChunkMetaDataBuilderImpl(props, column))} {}
740
741ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
742 const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column,
743 void* contents)
744 : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>(
745 new ColumnChunkMetaDataBuilderImpl(
746 props, column, reinterpret_cast<format::ColumnChunk*>(contents)))} {}
747
748ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() {}
749
750const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }
751
752void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
753 impl_->set_file_path(path);
754}
755
756void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
757 int64_t dictionary_page_offset,
758 int64_t index_page_offset,
759 int64_t data_page_offset, int64_t compressed_size,
760 int64_t uncompressed_size, bool has_dictionary,
761 bool dictionary_fallback) {
762 impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
763 compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
764}
765
766void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) {
767 impl_->WriteTo(sink);
768}
769
770const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
771 return impl_->descr();
772}
773
774void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) {
775 impl_->SetStatistics(result);
776}
777
778class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
779 public:
780 explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
781 const SchemaDescriptor* schema, void* contents)
782 : properties_(props), schema_(schema), next_column_(0) {
783 row_group_ = reinterpret_cast<format::RowGroup*>(contents);
784 InitializeColumns(schema->num_columns());
785 }
786
787 ColumnChunkMetaDataBuilder* NextColumnChunk() {
788 if (!(next_column_ < num_columns())) {
789 std::stringstream ss;
790 ss << "The schema only has " << num_columns()
791 << " columns, requested metadata for column: " << next_column_;
792 throw ParquetException(ss.str());
793 }
794 auto column = schema_->Column(next_column_);
795 auto column_builder = ColumnChunkMetaDataBuilder::Make(
796 properties_, column, &row_group_->columns[next_column_++]);
797 auto column_builder_ptr = column_builder.get();
798 column_builders_.push_back(std::move(column_builder));
799 return column_builder_ptr;
800 }
801
802 int current_column() { return next_column_ - 1; }
803
804 void Finish(int64_t total_bytes_written) {
805 if (!(next_column_ == schema_->num_columns())) {
806 std::stringstream ss;
807 ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns()
808 << " columns are initialized";
809 throw ParquetException(ss.str());
810 }
811 int64_t total_byte_size = 0;
812
813 for (int i = 0; i < schema_->num_columns(); i++) {
814 if (!(row_group_->columns[i].file_offset >= 0)) {
815 std::stringstream ss;
816 ss << "Column " << i << " is not complete.";
817 throw ParquetException(ss.str());
818 }
819 total_byte_size += row_group_->columns[i].meta_data.total_compressed_size;
820 }
821 DCHECK(total_bytes_written == total_byte_size)
822 << "Total bytes in this RowGroup does not match with compressed sizes of columns";
823
824 row_group_->__set_total_byte_size(total_byte_size);
825 }
826
827 void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; }
828
829 int num_columns() { return static_cast<int>(row_group_->columns.size()); }
830
831 int64_t num_rows() { return row_group_->num_rows; }
832
833 private:
834 void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }
835
836 format::RowGroup* row_group_;
837 const std::shared_ptr<WriterProperties> properties_;
838 const SchemaDescriptor* schema_;
839 std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_;
840 int next_column_;
841};
842
843std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
844 const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
845 void* contents) {
846 return std::unique_ptr<RowGroupMetaDataBuilder>(
847 new RowGroupMetaDataBuilder(props, schema_, contents));
848}
849
850RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
851 const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
852 void* contents)
853 : impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
854 new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
855
856RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {}
857
858ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
859 return impl_->NextColumnChunk();
860}
861
862int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); }
863
864int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); }
865
866int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); }
867
868void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) {
869 impl_->set_num_rows(num_rows);
870}
871
872void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) {
873 impl_->Finish(total_bytes_written);
874}
875
876// file metadata
877// TODO(PARQUET-595) Support key_value_metadata
878class FileMetaDataBuilder::FileMetaDataBuilderImpl {
879 public:
880 explicit FileMetaDataBuilderImpl(
881 const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props,
882 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
883 : properties_(props), schema_(schema), key_value_metadata_(key_value_metadata) {
884 metadata_.reset(new format::FileMetaData());
885 }
886
887 RowGroupMetaDataBuilder* AppendRowGroup() {
888 row_groups_.emplace_back();
889 current_row_group_builder_ =
890 RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back());
891 return current_row_group_builder_.get();
892 }
893
894 std::unique_ptr<FileMetaData> Finish() {
895 int64_t total_rows = 0;
896 for (auto row_group : row_groups_) {
897 total_rows += row_group.num_rows;
898 }
899 metadata_->__set_num_rows(total_rows);
900 metadata_->__set_row_groups(row_groups_);
901
902 if (key_value_metadata_) {
903 metadata_->key_value_metadata.clear();
904 metadata_->key_value_metadata.reserve(key_value_metadata_->size());
905 for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
906 format::KeyValue kv_pair;
907 kv_pair.__set_key(key_value_metadata_->key(i));
908 kv_pair.__set_value(key_value_metadata_->value(i));
909 metadata_->key_value_metadata.push_back(kv_pair);
910 }
911 metadata_->__isset.key_value_metadata = true;
912 }
913
914 int32_t file_version = 0;
915 switch (properties_->version()) {
916 case ParquetVersion::PARQUET_1_0:
917 file_version = 1;
918 break;
919 case ParquetVersion::PARQUET_2_0:
920 file_version = 2;
921 break;
922 default:
923 break;
924 }
925 metadata_->__set_version(file_version);
926 metadata_->__set_created_by(properties_->created_by());
927
928 // Users cannot set the `ColumnOrder` since we donot not have user defined sort order
929 // in the spec yet.
930 // We always default to `TYPE_DEFINED_ORDER`. We can expose it in
931 // the API once we have user defined sort orders in the Parquet format.
932 // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType
933 format::TypeDefinedOrder type_defined_order;
934 format::ColumnOrder column_order;
935 column_order.__set_TYPE_ORDER(type_defined_order);
936 column_order.__isset.TYPE_ORDER = true;
937 metadata_->column_orders.resize(schema_->num_columns(), column_order);
938 metadata_->__isset.column_orders = true;
939
940 parquet::schema::SchemaFlattener flattener(
941 static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
942 &metadata_->schema);
943 flattener.Flatten();
944 auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData());
945 file_meta_data->impl_->metadata_ = std::move(metadata_);
946 file_meta_data->impl_->InitSchema();
947 return file_meta_data;
948 }
949
950 protected:
951 std::unique_ptr<format::FileMetaData> metadata_;
952
953 private:
954 const std::shared_ptr<WriterProperties> properties_;
955 std::vector<format::RowGroup> row_groups_;
956
957 std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
958 const SchemaDescriptor* schema_;
959 std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
960};
961
962std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
963 const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props,
964 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
965 return std::unique_ptr<FileMetaDataBuilder>(
966 new FileMetaDataBuilder(schema, props, key_value_metadata));
967}
968
969FileMetaDataBuilder::FileMetaDataBuilder(
970 const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props,
971 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
972 : impl_{std::unique_ptr<FileMetaDataBuilderImpl>(
973 new FileMetaDataBuilderImpl(schema, props, key_value_metadata))} {}
974
975FileMetaDataBuilder::~FileMetaDataBuilder() {}
976
977RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
978 return impl_->AppendRowGroup();
979}
980
981std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
982
983} // namespace parquet
984