1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <algorithm> |
19 | #include <ostream> |
20 | #include <string> |
21 | #include <utility> |
22 | #include <vector> |
23 | |
24 | #include "arrow/util/logging.h" |
25 | |
26 | #include "parquet/exception.h" |
27 | #include "parquet/metadata.h" |
28 | #include "parquet/schema.h" |
29 | #include "parquet/schema_internal.h" |
30 | #include "parquet/statistics.h" |
31 | #include "parquet/thrift_internal.h" |
32 | |
33 | // ARROW-6096: The boost regex library must be used when compiling with gcc < 4.9 |
34 | #if defined(PARQUET_USE_BOOST_REGEX) |
35 | #include <boost/regex.hpp> // IWYU pragma: keep |
36 | using ::boost::regex; |
37 | using ::boost::regex_match; |
38 | using ::boost::smatch; |
39 | #else |
40 | #include <regex> |
41 | using ::std::regex; |
42 | using ::std::regex_match; |
43 | using ::std::smatch; |
44 | #endif |
45 | |
46 | namespace parquet { |
47 | |
48 | const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() { |
49 | static ApplicationVersion version("parquet-mr" , 1, 8, 0); |
50 | return version; |
51 | } |
52 | |
53 | const ApplicationVersion& ApplicationVersion::PARQUET_816_FIXED_VERSION() { |
54 | static ApplicationVersion version("parquet-mr" , 1, 2, 9); |
55 | return version; |
56 | } |
57 | |
58 | const ApplicationVersion& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() { |
59 | static ApplicationVersion version("parquet-cpp" , 1, 3, 0); |
60 | return version; |
61 | } |
62 | |
63 | const ApplicationVersion& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() { |
64 | static ApplicationVersion version("parquet-mr" , 1, 10, 0); |
65 | return version; |
66 | } |
67 | |
68 | std::string ParquetVersionToString(ParquetVersion::type ver) { |
69 | switch (ver) { |
70 | case ParquetVersion::PARQUET_1_0: |
71 | return "1.0" ; |
72 | case ParquetVersion::PARQUET_2_0: |
73 | return "2.0" ; |
74 | } |
75 | |
76 | // This should be unreachable |
77 | return "UNKNOWN" ; |
78 | } |
79 | |
80 | template <typename DType> |
81 | static std::shared_ptr<Statistics> MakeTypedColumnStats( |
82 | const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) { |
83 | // If ColumnOrder is defined, return max_value and min_value |
84 | if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) { |
85 | return MakeStatistics<DType>( |
86 | descr, metadata.statistics.min_value, metadata.statistics.max_value, |
87 | metadata.num_values - metadata.statistics.null_count, |
88 | metadata.statistics.null_count, metadata.statistics.distinct_count, |
89 | metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value); |
90 | } |
91 | // Default behavior |
92 | return MakeStatistics<DType>( |
93 | descr, metadata.statistics.min, metadata.statistics.max, |
94 | metadata.num_values - metadata.statistics.null_count, |
95 | metadata.statistics.null_count, metadata.statistics.distinct_count, |
96 | metadata.statistics.__isset.max || metadata.statistics.__isset.min); |
97 | } |
98 | |
99 | std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_data, |
100 | const ColumnDescriptor* descr) { |
101 | switch (static_cast<Type::type>(meta_data.type)) { |
102 | case Type::BOOLEAN: |
103 | return MakeTypedColumnStats<BooleanType>(meta_data, descr); |
104 | case Type::INT32: |
105 | return MakeTypedColumnStats<Int32Type>(meta_data, descr); |
106 | case Type::INT64: |
107 | return MakeTypedColumnStats<Int64Type>(meta_data, descr); |
108 | case Type::INT96: |
109 | return MakeTypedColumnStats<Int96Type>(meta_data, descr); |
110 | case Type::DOUBLE: |
111 | return MakeTypedColumnStats<DoubleType>(meta_data, descr); |
112 | case Type::FLOAT: |
113 | return MakeTypedColumnStats<FloatType>(meta_data, descr); |
114 | case Type::BYTE_ARRAY: |
115 | return MakeTypedColumnStats<ByteArrayType>(meta_data, descr); |
116 | case Type::FIXED_LEN_BYTE_ARRAY: |
117 | return MakeTypedColumnStats<FLBAType>(meta_data, descr); |
118 | case Type::UNDEFINED: |
119 | break; |
120 | } |
121 | throw ParquetException("Can't decode page statistics for selected column type" ); |
122 | } |
123 | |
124 | // MetaData Accessor |
125 | // ColumnChunk metadata |
126 | class ColumnChunkMetaData::ColumnChunkMetaDataImpl { |
127 | public: |
128 | explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column, |
129 | const ColumnDescriptor* descr, |
130 | const ApplicationVersion* writer_version) |
131 | : column_(column), descr_(descr), writer_version_(writer_version) { |
132 | const format::ColumnMetaData& meta_data = column->meta_data; |
133 | for (auto encoding : meta_data.encodings) { |
134 | encodings_.push_back(FromThrift(encoding)); |
135 | } |
136 | possible_stats_ = nullptr; |
137 | } |
138 | |
139 | // column chunk |
140 | inline int64_t file_offset() const { return column_->file_offset; } |
141 | inline const std::string& file_path() const { return column_->file_path; } |
142 | |
143 | // column metadata |
144 | inline Type::type type() const { return FromThrift(column_->meta_data.type); } |
145 | |
146 | inline int64_t num_values() const { return column_->meta_data.num_values; } |
147 | |
148 | std::shared_ptr<schema::ColumnPath> path_in_schema() { |
149 | return std::make_shared<schema::ColumnPath>(column_->meta_data.path_in_schema); |
150 | } |
151 | |
152 | // Check if statistics are set and are valid |
153 | // 1) Must be set in the metadata |
154 | // 2) Statistics must not be corrupted |
155 | inline bool is_stats_set() const { |
156 | DCHECK(writer_version_ != nullptr); |
157 | // If the column statistics don't exist or column sort order is unknown |
158 | // we cannot use the column stats |
159 | if (!column_->meta_data.__isset.statistics || |
160 | descr_->sort_order() == SortOrder::UNKNOWN) { |
161 | return false; |
162 | } |
163 | if (possible_stats_ == nullptr) { |
164 | possible_stats_ = MakeColumnStats(column_->meta_data, descr_); |
165 | } |
166 | EncodedStatistics encodedStatistics = possible_stats_->Encode(); |
167 | return writer_version_->HasCorrectStatistics(type(), encodedStatistics, |
168 | descr_->sort_order()); |
169 | } |
170 | |
171 | inline std::shared_ptr<Statistics> statistics() const { |
172 | return is_stats_set() ? possible_stats_ : nullptr; |
173 | } |
174 | |
175 | inline Compression::type compression() const { |
176 | return FromThrift(column_->meta_data.codec); |
177 | } |
178 | |
179 | const std::vector<Encoding::type>& encodings() const { return encodings_; } |
180 | |
181 | inline bool has_dictionary_page() const { |
182 | return column_->meta_data.__isset.dictionary_page_offset; |
183 | } |
184 | |
185 | inline int64_t dictionary_page_offset() const { |
186 | return column_->meta_data.dictionary_page_offset; |
187 | } |
188 | |
189 | inline int64_t data_page_offset() const { return column_->meta_data.data_page_offset; } |
190 | |
191 | inline bool has_index_page() const { |
192 | return column_->meta_data.__isset.index_page_offset; |
193 | } |
194 | |
195 | inline int64_t index_page_offset() const { |
196 | return column_->meta_data.index_page_offset; |
197 | } |
198 | |
199 | inline int64_t total_compressed_size() const { |
200 | return column_->meta_data.total_compressed_size; |
201 | } |
202 | |
203 | inline int64_t total_uncompressed_size() const { |
204 | return column_->meta_data.total_uncompressed_size; |
205 | } |
206 | |
207 | private: |
208 | mutable std::shared_ptr<Statistics> possible_stats_; |
209 | std::vector<Encoding::type> encodings_; |
210 | const format::ColumnChunk* column_; |
211 | const ColumnDescriptor* descr_; |
212 | const ApplicationVersion* writer_version_; |
213 | }; |
214 | |
215 | std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make( |
216 | const void* metadata, const ColumnDescriptor* descr, |
217 | const ApplicationVersion* writer_version) { |
218 | return std::unique_ptr<ColumnChunkMetaData>( |
219 | new ColumnChunkMetaData(metadata, descr, writer_version)); |
220 | } |
221 | |
222 | ColumnChunkMetaData::ColumnChunkMetaData(const void* metadata, |
223 | const ColumnDescriptor* descr, |
224 | const ApplicationVersion* writer_version) |
225 | : impl_{std::unique_ptr<ColumnChunkMetaDataImpl>(new ColumnChunkMetaDataImpl( |
226 | reinterpret_cast<const format::ColumnChunk*>(metadata), descr, |
227 | writer_version))} {} |
228 | ColumnChunkMetaData::~ColumnChunkMetaData() {} |
229 | |
230 | // column chunk |
231 | int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); } |
232 | |
233 | const std::string& ColumnChunkMetaData::file_path() const { return impl_->file_path(); } |
234 | |
235 | // column metadata |
236 | Type::type ColumnChunkMetaData::type() const { return impl_->type(); } |
237 | |
238 | int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); } |
239 | |
240 | std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const { |
241 | return impl_->path_in_schema(); |
242 | } |
243 | |
244 | std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const { |
245 | return impl_->statistics(); |
246 | } |
247 | |
248 | bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); } |
249 | |
250 | bool ColumnChunkMetaData::has_dictionary_page() const { |
251 | return impl_->has_dictionary_page(); |
252 | } |
253 | |
254 | int64_t ColumnChunkMetaData::dictionary_page_offset() const { |
255 | return impl_->dictionary_page_offset(); |
256 | } |
257 | |
258 | int64_t ColumnChunkMetaData::data_page_offset() const { |
259 | return impl_->data_page_offset(); |
260 | } |
261 | |
262 | bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); } |
263 | |
264 | int64_t ColumnChunkMetaData::index_page_offset() const { |
265 | return impl_->index_page_offset(); |
266 | } |
267 | |
268 | Compression::type ColumnChunkMetaData::compression() const { |
269 | return impl_->compression(); |
270 | } |
271 | |
272 | const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const { |
273 | return impl_->encodings(); |
274 | } |
275 | |
276 | int64_t ColumnChunkMetaData::total_uncompressed_size() const { |
277 | return impl_->total_uncompressed_size(); |
278 | } |
279 | |
280 | int64_t ColumnChunkMetaData::total_compressed_size() const { |
281 | return impl_->total_compressed_size(); |
282 | } |
283 | |
284 | // row-group metadata |
285 | class RowGroupMetaData::RowGroupMetaDataImpl { |
286 | public: |
287 | explicit RowGroupMetaDataImpl(const format::RowGroup* row_group, |
288 | const SchemaDescriptor* schema, |
289 | const ApplicationVersion* writer_version) |
290 | : row_group_(row_group), schema_(schema), writer_version_(writer_version) {} |
291 | |
292 | inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); } |
293 | |
294 | inline int64_t num_rows() const { return row_group_->num_rows; } |
295 | |
296 | inline int64_t total_byte_size() const { return row_group_->total_byte_size; } |
297 | |
298 | inline const SchemaDescriptor* schema() const { return schema_; } |
299 | |
300 | std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) { |
301 | if (!(i < num_columns())) { |
302 | std::stringstream ss; |
303 | ss << "The file only has " << num_columns() |
304 | << " columns, requested metadata for column: " << i; |
305 | throw ParquetException(ss.str()); |
306 | } |
307 | return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i), |
308 | writer_version_); |
309 | } |
310 | |
311 | private: |
312 | const format::RowGroup* row_group_; |
313 | const SchemaDescriptor* schema_; |
314 | const ApplicationVersion* writer_version_; |
315 | }; |
316 | |
317 | std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make( |
318 | const void* metadata, const SchemaDescriptor* schema, |
319 | const ApplicationVersion* writer_version) { |
320 | return std::unique_ptr<RowGroupMetaData>( |
321 | new RowGroupMetaData(metadata, schema, writer_version)); |
322 | } |
323 | |
324 | RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema, |
325 | const ApplicationVersion* writer_version) |
326 | : impl_{std::unique_ptr<RowGroupMetaDataImpl>(new RowGroupMetaDataImpl( |
327 | reinterpret_cast<const format::RowGroup*>(metadata), schema, writer_version))} { |
328 | } |
329 | RowGroupMetaData::~RowGroupMetaData() {} |
330 | |
331 | int RowGroupMetaData::num_columns() const { return impl_->num_columns(); } |
332 | |
333 | int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); } |
334 | |
335 | int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); } |
336 | |
337 | const SchemaDescriptor* RowGroupMetaData::schema() const { return impl_->schema(); } |
338 | |
339 | std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const { |
340 | return impl_->ColumnChunk(i); |
341 | } |
342 | |
343 | // file metadata |
344 | class FileMetaData::FileMetaDataImpl { |
345 | public: |
346 | FileMetaDataImpl() : metadata_len_(0) {} |
347 | |
348 | explicit FileMetaDataImpl(const void* metadata, uint32_t* metadata_len) |
349 | : metadata_len_(0) { |
350 | metadata_.reset(new format::FileMetaData); |
351 | DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len, |
352 | metadata_.get()); |
353 | metadata_len_ = *metadata_len; |
354 | |
355 | if (metadata_->__isset.created_by) { |
356 | writer_version_ = ApplicationVersion(metadata_->created_by); |
357 | } else { |
358 | writer_version_ = ApplicationVersion("unknown 0.0.0" ); |
359 | } |
360 | |
361 | InitSchema(); |
362 | InitColumnOrders(); |
363 | InitKeyValueMetadata(); |
364 | } |
365 | |
366 | inline uint32_t size() const { return metadata_len_; } |
367 | inline int num_columns() const { return schema_.num_columns(); } |
368 | inline int64_t num_rows() const { return metadata_->num_rows; } |
369 | inline int num_row_groups() const { |
370 | return static_cast<int>(metadata_->row_groups.size()); |
371 | } |
372 | inline int32_t version() const { return metadata_->version; } |
373 | inline const std::string& created_by() const { return metadata_->created_by; } |
374 | inline int num_schema_elements() const { |
375 | return static_cast<int>(metadata_->schema.size()); |
376 | } |
377 | |
378 | const ApplicationVersion& writer_version() const { return writer_version_; } |
379 | |
380 | void WriteTo(::arrow::io::OutputStream* dst) const { |
381 | ThriftSerializer serializer; |
382 | serializer.Serialize(metadata_.get(), dst); |
383 | } |
384 | |
385 | std::unique_ptr<RowGroupMetaData> RowGroup(int i) { |
386 | if (!(i < num_row_groups())) { |
387 | std::stringstream ss; |
388 | ss << "The file only has " << num_row_groups() |
389 | << " row groups, requested metadata for row group: " << i; |
390 | throw ParquetException(ss.str()); |
391 | } |
392 | return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_); |
393 | } |
394 | |
395 | const SchemaDescriptor* schema() const { return &schema_; } |
396 | |
397 | std::shared_ptr<const KeyValueMetadata> key_value_metadata() const { |
398 | return key_value_metadata_; |
399 | } |
400 | |
401 | void set_file_path(const std::string& path) { |
402 | for (format::RowGroup& row_group : metadata_->row_groups) { |
403 | for (format::ColumnChunk& chunk : row_group.columns) { |
404 | chunk.__set_file_path(path); |
405 | } |
406 | } |
407 | } |
408 | |
409 | format::RowGroup& row_group(int i) { |
410 | DCHECK_LT(i, num_row_groups()); |
411 | return metadata_->row_groups[i]; |
412 | } |
413 | |
414 | void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) { |
415 | format::RowGroup other_rg; |
416 | for (int i = 0; i < other->num_row_groups(); i++) { |
417 | other_rg = other->row_group(i); |
418 | metadata_->row_groups.push_back(other_rg); |
419 | metadata_->num_rows += other_rg.num_rows; |
420 | } |
421 | } |
422 | |
423 | private: |
424 | friend FileMetaDataBuilder; |
425 | uint32_t metadata_len_; |
426 | std::unique_ptr<format::FileMetaData> metadata_; |
427 | void InitSchema() { |
428 | schema::FlatSchemaConverter converter(&metadata_->schema[0], |
429 | static_cast<int>(metadata_->schema.size())); |
430 | schema_.Init(converter.Convert()); |
431 | } |
432 | void InitColumnOrders() { |
433 | // update ColumnOrder |
434 | std::vector<parquet::ColumnOrder> column_orders; |
435 | if (metadata_->__isset.column_orders) { |
436 | for (auto column_order : metadata_->column_orders) { |
437 | if (column_order.__isset.TYPE_ORDER) { |
438 | column_orders.push_back(ColumnOrder::type_defined_); |
439 | } else { |
440 | column_orders.push_back(ColumnOrder::undefined_); |
441 | } |
442 | } |
443 | } else { |
444 | column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_); |
445 | } |
446 | |
447 | schema_.updateColumnOrders(column_orders); |
448 | } |
449 | SchemaDescriptor schema_; |
450 | ApplicationVersion writer_version_; |
451 | |
452 | void InitKeyValueMetadata() { |
453 | std::shared_ptr<KeyValueMetadata> metadata = nullptr; |
454 | if (metadata_->__isset.key_value_metadata) { |
455 | metadata = std::make_shared<KeyValueMetadata>(); |
456 | for (const auto& it : metadata_->key_value_metadata) { |
457 | metadata->Append(it.key, it.value); |
458 | } |
459 | } |
460 | key_value_metadata_ = metadata; |
461 | } |
462 | |
463 | std::shared_ptr<const KeyValueMetadata> key_value_metadata_; |
464 | }; |
465 | |
466 | std::shared_ptr<FileMetaData> FileMetaData::Make(const void* metadata, |
467 | uint32_t* metadata_len) { |
468 | // This FileMetaData ctor is private, not compatible with std::make_shared |
469 | return std::shared_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len)); |
470 | } |
471 | |
472 | FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len) |
473 | : impl_{std::unique_ptr<FileMetaDataImpl>( |
474 | new FileMetaDataImpl(metadata, metadata_len))} {} |
475 | |
476 | FileMetaData::FileMetaData() |
477 | : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {} |
478 | |
479 | FileMetaData::~FileMetaData() {} |
480 | |
481 | std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const { |
482 | return impl_->RowGroup(i); |
483 | } |
484 | |
485 | uint32_t FileMetaData::size() const { return impl_->size(); } |
486 | |
487 | int FileMetaData::num_columns() const { return impl_->num_columns(); } |
488 | |
489 | int64_t FileMetaData::num_rows() const { return impl_->num_rows(); } |
490 | |
491 | int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); } |
492 | |
493 | ParquetVersion::type FileMetaData::version() const { |
494 | switch (impl_->version()) { |
495 | case 1: |
496 | return ParquetVersion::PARQUET_1_0; |
497 | case 2: |
498 | return ParquetVersion::PARQUET_2_0; |
499 | default: |
500 | // Improperly set version, assuming Parquet 1.0 |
501 | break; |
502 | } |
503 | return ParquetVersion::PARQUET_1_0; |
504 | } |
505 | |
506 | const ApplicationVersion& FileMetaData::writer_version() const { |
507 | return impl_->writer_version(); |
508 | } |
509 | |
510 | const std::string& FileMetaData::created_by() const { return impl_->created_by(); } |
511 | |
512 | int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); } |
513 | |
514 | const SchemaDescriptor* FileMetaData::schema() const { return impl_->schema(); } |
515 | |
516 | std::shared_ptr<const KeyValueMetadata> FileMetaData::key_value_metadata() const { |
517 | return impl_->key_value_metadata(); |
518 | } |
519 | |
520 | void FileMetaData::set_file_path(const std::string& path) { impl_->set_file_path(path); } |
521 | |
522 | void FileMetaData::AppendRowGroups(const FileMetaData& other) { |
523 | impl_->AppendRowGroups(other.impl_); |
524 | } |
525 | |
526 | void FileMetaData::WriteTo(::arrow::io::OutputStream* dst) const { |
527 | return impl_->WriteTo(dst); |
528 | } |
529 | |
530 | ApplicationVersion::ApplicationVersion(const std::string& application, int major, |
531 | int minor, int patch) |
532 | : application_(application), version{major, minor, patch, "" , "" , "" } {} |
533 | |
534 | ApplicationVersion::ApplicationVersion(const std::string& created_by) { |
535 | regex app_regex{ApplicationVersion::APPLICATION_FORMAT}; |
536 | regex ver_regex{ApplicationVersion::VERSION_FORMAT}; |
537 | smatch app_matches; |
538 | smatch ver_matches; |
539 | |
540 | std::string created_by_lower = created_by; |
541 | std::transform(created_by_lower.begin(), created_by_lower.end(), |
542 | created_by_lower.begin(), ::tolower); |
543 | |
544 | bool app_success = regex_match(created_by_lower, app_matches, app_regex); |
545 | bool ver_success = false; |
546 | std::string version_str; |
547 | |
548 | if (app_success && app_matches.size() >= 4) { |
549 | // first match is the entire string. sub-matches start from second. |
550 | application_ = app_matches[1]; |
551 | version_str = app_matches[3]; |
552 | build_ = app_matches[4]; |
553 | ver_success = regex_match(version_str, ver_matches, ver_regex); |
554 | } else { |
555 | application_ = "unknown" ; |
556 | } |
557 | |
558 | if (ver_success && ver_matches.size() >= 7) { |
559 | version.major = atoi(ver_matches[1].str().c_str()); |
560 | version.minor = atoi(ver_matches[2].str().c_str()); |
561 | version.patch = atoi(ver_matches[3].str().c_str()); |
562 | version.unknown = ver_matches[4].str(); |
563 | version.pre_release = ver_matches[5].str(); |
564 | version.build_info = ver_matches[6].str(); |
565 | } else { |
566 | version.major = 0; |
567 | version.minor = 0; |
568 | version.patch = 0; |
569 | } |
570 | } |
571 | |
572 | bool ApplicationVersion::VersionLt(const ApplicationVersion& other_version) const { |
573 | if (application_ != other_version.application_) return false; |
574 | |
575 | if (version.major < other_version.version.major) return true; |
576 | if (version.major > other_version.version.major) return false; |
577 | DCHECK_EQ(version.major, other_version.version.major); |
578 | if (version.minor < other_version.version.minor) return true; |
579 | if (version.minor > other_version.version.minor) return false; |
580 | DCHECK_EQ(version.minor, other_version.version.minor); |
581 | return version.patch < other_version.version.patch; |
582 | } |
583 | |
584 | bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) const { |
585 | return application_ == other_version.application_ && |
586 | version.major == other_version.version.major && |
587 | version.minor == other_version.version.minor && |
588 | version.patch == other_version.version.patch; |
589 | } |
590 | |
591 | // Reference: |
592 | // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java |
// PARQUET-686 has more discussion on statistics
594 | bool ApplicationVersion::HasCorrectStatistics(Type::type col_type, |
595 | EncodedStatistics& statistics, |
596 | SortOrder::type sort_order) const { |
597 | // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed |
598 | // correctly for all types |
599 | if ((application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) || |
600 | (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) { |
601 | // Only SIGNED are valid unless max and min are the same |
602 | // (in which case the sort order does not matter) |
603 | bool max_equals_min = statistics.has_min && statistics.has_max |
604 | ? statistics.min() == statistics.max() |
605 | : false; |
606 | if (SortOrder::SIGNED != sort_order && !max_equals_min) { |
607 | return false; |
608 | } |
609 | |
610 | // Statistics of other types are OK |
611 | if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) { |
612 | return true; |
613 | } |
614 | } |
615 | // created_by is not populated, which could have been caused by |
616 | // parquet-mr during the same time as PARQUET-251, see PARQUET-297 |
617 | if (application_ == "unknown" ) { |
618 | return true; |
619 | } |
620 | |
621 | // Unknown sort order has incorrect stats |
622 | if (SortOrder::UNKNOWN == sort_order) { |
623 | return false; |
624 | } |
625 | |
626 | // PARQUET-251 |
627 | if (VersionLt(PARQUET_251_FIXED_VERSION())) { |
628 | return false; |
629 | } |
630 | |
631 | return true; |
632 | } |
633 | |
634 | // MetaData Builders |
635 | // row-group metadata |
636 | class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { |
637 | public: |
638 | explicit ColumnChunkMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props, |
639 | const ColumnDescriptor* column) |
640 | : owned_column_chunk_(new format::ColumnChunk), |
641 | properties_(props), |
642 | column_(column) { |
643 | Init(owned_column_chunk_.get()); |
644 | } |
645 | |
646 | explicit ColumnChunkMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props, |
647 | const ColumnDescriptor* column, |
648 | format::ColumnChunk* column_chunk) |
649 | : properties_(props), column_(column) { |
650 | Init(column_chunk); |
651 | } |
652 | |
653 | const void* contents() const { return column_chunk_; } |
654 | |
655 | // column chunk |
656 | void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); } |
657 | |
658 | // column metadata |
659 | void SetStatistics(const EncodedStatistics& val) { |
660 | column_chunk_->meta_data.__set_statistics(ToThrift(val)); |
661 | } |
662 | |
663 | void Finish(int64_t num_values, int64_t dictionary_page_offset, |
664 | int64_t index_page_offset, int64_t data_page_offset, |
665 | int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, |
666 | bool dictionary_fallback) { |
667 | if (dictionary_page_offset > 0) { |
668 | column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); |
669 | column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size); |
670 | } else { |
671 | column_chunk_->__set_file_offset(data_page_offset + compressed_size); |
672 | } |
673 | column_chunk_->__isset.meta_data = true; |
674 | column_chunk_->meta_data.__set_num_values(num_values); |
675 | if (index_page_offset >= 0) { |
676 | column_chunk_->meta_data.__set_index_page_offset(index_page_offset); |
677 | } |
678 | column_chunk_->meta_data.__set_data_page_offset(data_page_offset); |
679 | column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size); |
680 | column_chunk_->meta_data.__set_total_compressed_size(compressed_size); |
681 | std::vector<format::Encoding::type> thrift_encodings; |
682 | if (has_dictionary) { |
683 | thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding())); |
684 | if (properties_->version() == ParquetVersion::PARQUET_1_0) { |
685 | thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); |
686 | } else { |
687 | thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding())); |
688 | } |
689 | } else { // Dictionary not enabled |
690 | thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path()))); |
691 | } |
692 | thrift_encodings.push_back(ToThrift(Encoding::RLE)); |
693 | // Only PLAIN encoding is supported for fallback in V1 |
694 | // TODO(majetideepak): Use user specified encoding for V2 |
695 | if (dictionary_fallback) { |
696 | thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); |
697 | } |
698 | column_chunk_->meta_data.__set_encodings(thrift_encodings); |
699 | } |
700 | |
701 | void WriteTo(::arrow::io::OutputStream* sink) { |
702 | ThriftSerializer serializer; |
703 | serializer.Serialize(column_chunk_, sink); |
704 | } |
705 | |
706 | const ColumnDescriptor* descr() const { return column_; } |
707 | |
708 | private: |
709 | void Init(format::ColumnChunk* column_chunk) { |
710 | column_chunk_ = column_chunk; |
711 | column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type())); |
712 | column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector()); |
713 | column_chunk_->meta_data.__set_codec( |
714 | ToThrift(properties_->compression(column_->path()))); |
715 | } |
716 | |
717 | format::ColumnChunk* column_chunk_; |
718 | std::unique_ptr<format::ColumnChunk> owned_column_chunk_; |
719 | const std::shared_ptr<WriterProperties> properties_; |
720 | const ColumnDescriptor* column_; |
721 | }; |
722 | |
723 | std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make( |
724 | const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column, |
725 | void* contents) { |
726 | return std::unique_ptr<ColumnChunkMetaDataBuilder>( |
727 | new ColumnChunkMetaDataBuilder(props, column, contents)); |
728 | } |
729 | |
730 | std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make( |
731 | const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column) { |
732 | return std::unique_ptr<ColumnChunkMetaDataBuilder>( |
733 | new ColumnChunkMetaDataBuilder(props, column)); |
734 | } |
735 | |
736 | ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( |
737 | const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column) |
738 | : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>( |
739 | new ColumnChunkMetaDataBuilderImpl(props, column))} {} |
740 | |
741 | ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( |
742 | const std::shared_ptr<WriterProperties>& props, const ColumnDescriptor* column, |
743 | void* contents) |
744 | : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>( |
745 | new ColumnChunkMetaDataBuilderImpl( |
746 | props, column, reinterpret_cast<format::ColumnChunk*>(contents)))} {} |
747 | |
748 | ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() {} |
749 | |
750 | const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); } |
751 | |
752 | void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) { |
753 | impl_->set_file_path(path); |
754 | } |
755 | |
756 | void ColumnChunkMetaDataBuilder::Finish(int64_t num_values, |
757 | int64_t dictionary_page_offset, |
758 | int64_t index_page_offset, |
759 | int64_t data_page_offset, int64_t compressed_size, |
760 | int64_t uncompressed_size, bool has_dictionary, |
761 | bool dictionary_fallback) { |
762 | impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset, |
763 | compressed_size, uncompressed_size, has_dictionary, dictionary_fallback); |
764 | } |
765 | |
766 | void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) { |
767 | impl_->WriteTo(sink); |
768 | } |
769 | |
770 | const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { |
771 | return impl_->descr(); |
772 | } |
773 | |
774 | void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) { |
775 | impl_->SetStatistics(result); |
776 | } |
777 | |
778 | class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { |
779 | public: |
780 | explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props, |
781 | const SchemaDescriptor* schema, void* contents) |
782 | : properties_(props), schema_(schema), next_column_(0) { |
783 | row_group_ = reinterpret_cast<format::RowGroup*>(contents); |
784 | InitializeColumns(schema->num_columns()); |
785 | } |
786 | |
787 | ColumnChunkMetaDataBuilder* NextColumnChunk() { |
788 | if (!(next_column_ < num_columns())) { |
789 | std::stringstream ss; |
790 | ss << "The schema only has " << num_columns() |
791 | << " columns, requested metadata for column: " << next_column_; |
792 | throw ParquetException(ss.str()); |
793 | } |
794 | auto column = schema_->Column(next_column_); |
795 | auto column_builder = ColumnChunkMetaDataBuilder::Make( |
796 | properties_, column, &row_group_->columns[next_column_++]); |
797 | auto column_builder_ptr = column_builder.get(); |
798 | column_builders_.push_back(std::move(column_builder)); |
799 | return column_builder_ptr; |
800 | } |
801 | |
802 | int current_column() { return next_column_ - 1; } |
803 | |
804 | void Finish(int64_t total_bytes_written) { |
805 | if (!(next_column_ == schema_->num_columns())) { |
806 | std::stringstream ss; |
807 | ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns() |
808 | << " columns are initialized" ; |
809 | throw ParquetException(ss.str()); |
810 | } |
811 | int64_t total_byte_size = 0; |
812 | |
813 | for (int i = 0; i < schema_->num_columns(); i++) { |
814 | if (!(row_group_->columns[i].file_offset >= 0)) { |
815 | std::stringstream ss; |
816 | ss << "Column " << i << " is not complete." ; |
817 | throw ParquetException(ss.str()); |
818 | } |
819 | total_byte_size += row_group_->columns[i].meta_data.total_compressed_size; |
820 | } |
821 | DCHECK(total_bytes_written == total_byte_size) |
822 | << "Total bytes in this RowGroup does not match with compressed sizes of columns" ; |
823 | |
824 | row_group_->__set_total_byte_size(total_byte_size); |
825 | } |
826 | |
827 | void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; } |
828 | |
829 | int num_columns() { return static_cast<int>(row_group_->columns.size()); } |
830 | |
831 | int64_t num_rows() { return row_group_->num_rows; } |
832 | |
833 | private: |
834 | void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); } |
835 | |
836 | format::RowGroup* row_group_; |
837 | const std::shared_ptr<WriterProperties> properties_; |
838 | const SchemaDescriptor* schema_; |
839 | std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_; |
840 | int next_column_; |
841 | }; |
842 | |
843 | std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make( |
844 | const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_, |
845 | void* contents) { |
846 | return std::unique_ptr<RowGroupMetaDataBuilder>( |
847 | new RowGroupMetaDataBuilder(props, schema_, contents)); |
848 | } |
849 | |
850 | RowGroupMetaDataBuilder::RowGroupMetaDataBuilder( |
851 | const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_, |
852 | void* contents) |
853 | : impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>( |
854 | new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {} |
855 | |
856 | RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {} |
857 | |
858 | ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { |
859 | return impl_->NextColumnChunk(); |
860 | } |
861 | |
862 | int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); } |
863 | |
864 | int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } |
865 | |
866 | int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); } |
867 | |
868 | void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) { |
869 | impl_->set_num_rows(num_rows); |
870 | } |
871 | |
872 | void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) { |
873 | impl_->Finish(total_bytes_written); |
874 | } |
875 | |
876 | // file metadata |
877 | // TODO(PARQUET-595) Support key_value_metadata |
878 | class FileMetaDataBuilder::FileMetaDataBuilderImpl { |
879 | public: |
880 | explicit FileMetaDataBuilderImpl( |
881 | const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props, |
882 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) |
883 | : properties_(props), schema_(schema), key_value_metadata_(key_value_metadata) { |
884 | metadata_.reset(new format::FileMetaData()); |
885 | } |
886 | |
887 | RowGroupMetaDataBuilder* AppendRowGroup() { |
888 | row_groups_.emplace_back(); |
889 | current_row_group_builder_ = |
890 | RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back()); |
891 | return current_row_group_builder_.get(); |
892 | } |
893 | |
894 | std::unique_ptr<FileMetaData> Finish() { |
895 | int64_t total_rows = 0; |
896 | for (auto row_group : row_groups_) { |
897 | total_rows += row_group.num_rows; |
898 | } |
899 | metadata_->__set_num_rows(total_rows); |
900 | metadata_->__set_row_groups(row_groups_); |
901 | |
902 | if (key_value_metadata_) { |
903 | metadata_->key_value_metadata.clear(); |
904 | metadata_->key_value_metadata.reserve(key_value_metadata_->size()); |
905 | for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { |
906 | format::KeyValue kv_pair; |
907 | kv_pair.__set_key(key_value_metadata_->key(i)); |
908 | kv_pair.__set_value(key_value_metadata_->value(i)); |
909 | metadata_->key_value_metadata.push_back(kv_pair); |
910 | } |
911 | metadata_->__isset.key_value_metadata = true; |
912 | } |
913 | |
914 | int32_t file_version = 0; |
915 | switch (properties_->version()) { |
916 | case ParquetVersion::PARQUET_1_0: |
917 | file_version = 1; |
918 | break; |
919 | case ParquetVersion::PARQUET_2_0: |
920 | file_version = 2; |
921 | break; |
922 | default: |
923 | break; |
924 | } |
925 | metadata_->__set_version(file_version); |
926 | metadata_->__set_created_by(properties_->created_by()); |
927 | |
928 | // Users cannot set the `ColumnOrder` since we donot not have user defined sort order |
929 | // in the spec yet. |
930 | // We always default to `TYPE_DEFINED_ORDER`. We can expose it in |
931 | // the API once we have user defined sort orders in the Parquet format. |
932 | // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType |
933 | format::TypeDefinedOrder type_defined_order; |
934 | format::ColumnOrder column_order; |
935 | column_order.__set_TYPE_ORDER(type_defined_order); |
936 | column_order.__isset.TYPE_ORDER = true; |
937 | metadata_->column_orders.resize(schema_->num_columns(), column_order); |
938 | metadata_->__isset.column_orders = true; |
939 | |
940 | parquet::schema::SchemaFlattener flattener( |
941 | static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()), |
942 | &metadata_->schema); |
943 | flattener.Flatten(); |
944 | auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData()); |
945 | file_meta_data->impl_->metadata_ = std::move(metadata_); |
946 | file_meta_data->impl_->InitSchema(); |
947 | return file_meta_data; |
948 | } |
949 | |
950 | protected: |
951 | std::unique_ptr<format::FileMetaData> metadata_; |
952 | |
953 | private: |
954 | const std::shared_ptr<WriterProperties> properties_; |
955 | std::vector<format::RowGroup> row_groups_; |
956 | |
957 | std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_; |
958 | const SchemaDescriptor* schema_; |
959 | std::shared_ptr<const KeyValueMetadata> key_value_metadata_; |
960 | }; |
961 | |
962 | std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make( |
963 | const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props, |
964 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
965 | return std::unique_ptr<FileMetaDataBuilder>( |
966 | new FileMetaDataBuilder(schema, props, key_value_metadata)); |
967 | } |
968 | |
969 | FileMetaDataBuilder::FileMetaDataBuilder( |
970 | const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props, |
971 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) |
972 | : impl_{std::unique_ptr<FileMetaDataBuilderImpl>( |
973 | new FileMetaDataBuilderImpl(schema, props, key_value_metadata))} {} |
974 | |
975 | FileMetaDataBuilder::~FileMetaDataBuilder() {} |
976 | |
977 | RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { |
978 | return impl_->AppendRowGroup(); |
979 | } |
980 | |
981 | std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); } |
982 | |
983 | } // namespace parquet |
984 | |