// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/file_reader.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <ostream>
#include <string>
#include <utility>

#include "arrow/io/file.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"

#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/deprecated_io.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"

namespace parquet {

// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
static constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
static constexpr uint32_t kFooterSize = 8;
static constexpr uint8_t kParquetMagic[4] = {'P', 'A', 'R', '1'};

// For PARQUET-816
static constexpr int64_t kMaxDictHeaderSize = 100;

// ----------------------------------------------------------------------
// RowGroupReader public API

RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
    : contents_(std::move(contents)) {}

std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
  DCHECK(i < metadata()->num_columns())
      << "The RowGroup only has " << metadata()->num_columns()
      << " columns, requested column: " << i;
  const ColumnDescriptor* descr = metadata()->schema()->Column(i);

  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
  return ColumnReader::Make(
      descr, std::move(page_reader),
      const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}

std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
  DCHECK(i < metadata()->num_columns())
      << "The RowGroup only has " << metadata()->num_columns()
      << " columns, requested column: " << i;
  return contents_->GetColumnPageReader(i);
}

// Returns the rowgroup metadata
const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }

// RowGroupReader::Contents implementation for the Parquet file specification
class SerializedRowGroup : public RowGroupReader::Contents {
 public:
  SerializedRowGroup(const std::shared_ptr<ArrowInputFile>& source,
                     FileMetaData* file_metadata, int row_group_number,
                     const ReaderProperties& props)
      : source_(source), file_metadata_(file_metadata), properties_(props) {
    row_group_metadata_ = file_metadata->RowGroup(row_group_number);
  }

  const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }

  const ReaderProperties* properties() const override { return &properties_; }

  std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
    // Read column chunk from the file
    auto col = row_group_metadata_->ColumnChunk(i);

    int64_t col_start = col->data_page_offset();
    if (col->has_dictionary_page() && col->dictionary_page_offset() > 0 &&
        col_start > col->dictionary_page_offset()) {
      col_start = col->dictionary_page_offset();
    }

    int64_t col_length = col->total_compressed_size();

    // PARQUET-816 workaround for old files created by older parquet-mr
    const ApplicationVersion& version = file_metadata_->writer_version();
    if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) {
      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
      // dictionary page header size in total_compressed_size and total_uncompressed_size
      // (see IMPALA-694). We add padding to compensate.
      int64_t size = -1;
      PARQUET_THROW_NOT_OK(source_->GetSize(&size));
      int64_t bytes_remaining = size - (col_start + col_length);
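      // Cap the padding at the bytes actually left in the file so the widened
      // read below never runs past the end of the source.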
      int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
      col_length += padding;
    }

    std::shared_ptr<ArrowInputStream> stream =
        properties_.GetStream(source_, col_start, col_length);
    return PageReader::Open(stream, col->num_values(), col->compression(),
                            properties_.memory_pool());
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  FileMetaData* file_metadata_;
  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
  ReaderProperties properties_;
};

// ----------------------------------------------------------------------
// SerializedFile: An implementation of ParquetFileReader::Contents that deals
// with the Parquet file structure, Thrift deserialization, and other internal
// matters

// This class takes ownership of the provided data source
class SerializedFile : public ParquetFileReader::Contents {
 public:
  SerializedFile(const std::shared_ptr<ArrowInputFile>& source,
                 const ReaderProperties& props = default_reader_properties())
      : source_(source), properties_(props) {}

  void Close() override {}

  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
    std::unique_ptr<SerializedRowGroup> contents(
        new SerializedRowGroup(source_, file_metadata_.get(), i, properties_));
    return std::make_shared<RowGroupReader>(std::move(contents));
  }

  std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }

  void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
    file_metadata_ = metadata;
  }

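  // A Parquet file ends with the serialized Thrift FileMetaData, followed by a
  // 4-byte little-endian metadata length and the 4-byte magic "PAR1":
  //
  //   <column chunks / row groups> <FileMetaData> <metadata_len: 4 bytes> "PAR1"
  //
  // ParseMetaData reads up to kDefaultFooterReadSize bytes from the end of the
  // file in a single call (PARQUET-978), validates the magic, and only issues a
  // second read if the metadata did not fit inside that first chunk.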
  void ParseMetaData() {
    int64_t file_size = -1;
    PARQUET_THROW_NOT_OK(source_->GetSize(&file_size));

    if (file_size == 0) {
      throw ParquetException("Invalid Parquet file size is 0 bytes");
    } else if (file_size < kFooterSize) {
      std::stringstream ss;
      ss << "Invalid Parquet file size is " << file_size
         << " bytes, smaller than standard file footer (" << kFooterSize << " bytes)";
      throw ParquetException(ss.str());
    }

    std::shared_ptr<Buffer> footer_buffer;
    int64_t footer_read_size = std::min(file_size, kDefaultFooterReadSize);
    PARQUET_THROW_NOT_OK(
        source_->ReadAt(file_size - footer_read_size, footer_read_size, &footer_buffer));

    // Check that all bytes were read and that the last 4 bytes hold the magic
    if (footer_buffer->size() != footer_read_size ||
        memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0) {
      throw ParquetException("Invalid parquet file. Corrupt footer.");
    }

    uint32_t metadata_len = arrow::util::SafeLoadAs<uint32_t>(
        reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
        kFooterSize);
    int64_t metadata_start = file_size - kFooterSize - metadata_len;
    if (kFooterSize + metadata_len > file_size) {
      throw ParquetException(
          "Invalid parquet file. File is less than "
          "file metadata size.");
    }

    std::shared_ptr<Buffer> metadata_buffer;
    // Check whether footer_buffer already contains the entire metadata
    if (footer_read_size >= (metadata_len + kFooterSize)) {
      metadata_buffer = SliceBuffer(
          footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len);
    } else {
      PARQUET_THROW_NOT_OK(
          source_->ReadAt(metadata_start, metadata_len, &metadata_buffer));
      if (metadata_buffer->size() != metadata_len) {
        throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
      }
    }
    file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<FileMetaData> file_metadata_;
  ReaderProperties properties_;
};

// ----------------------------------------------------------------------
// ParquetFileReader public API

ParquetFileReader::ParquetFileReader() {}

ParquetFileReader::~ParquetFileReader() {
  try {
    Close();
  } catch (...) {
  }
}

// Open the file. If no metadata is passed, it is parsed from the footer of
// the file
std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
    const std::shared_ptr<ArrowInputFile>& source, const ReaderProperties& props,
    const std::shared_ptr<FileMetaData>& metadata) {
  std::unique_ptr<ParquetFileReader::Contents> result(new SerializedFile(source, props));

  // Downcast to SerializedFile so we can reach ParseMetaData() and
  // set_metadata(), which are not part of the Contents interface
  SerializedFile* file = static_cast<SerializedFile*>(result.get());

  if (metadata == nullptr) {
    // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
    file->ParseMetaData();
  } else {
    file->set_metadata(metadata);
  }

  return result;
}

std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    const std::shared_ptr<::arrow::io::RandomAccessFile>& source,
    const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
  auto contents = SerializedFile::Open(source, props, metadata);
  std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
  result->Open(std::move(contents));
  return result;
}

std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
    const std::shared_ptr<FileMetaData>& metadata) {
  auto wrapper = std::make_shared<ParquetInputWrapper>(std::move(source));
  return Open(wrapper, props, metadata);
}

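// Illustrative usage (a sketch, not part of the library): a caller typically
// opens a file, inspects the metadata, then reads columns one row group at a
// time. "example.parquet" is a placeholder path, and the default arguments
// declared in file_reader.h are assumed.
//
//   std::unique_ptr<ParquetFileReader> reader =
//       ParquetFileReader::OpenFile("example.parquet");
//   std::shared_ptr<FileMetaData> file_metadata = reader->metadata();
//   for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
//     std::shared_ptr<RowGroupReader> row_group = reader->RowGroup(r);
//     for (int c = 0; c < file_metadata->num_columns(); ++c) {
//       std::shared_ptr<ColumnReader> col_reader = row_group->Column(c);
//       // ... read values with the appropriate typed reader, e.g. Int64Reader
//     }
//   }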
std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
    const std::string& path, bool memory_map, const ReaderProperties& props,
    const std::shared_ptr<FileMetaData>& metadata) {
  std::shared_ptr<::arrow::io::RandomAccessFile> source;
  if (memory_map) {
    std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
    PARQUET_THROW_NOT_OK(
        ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
    source = handle;
  } else {
    std::shared_ptr<::arrow::io::ReadableFile> handle;
    PARQUET_THROW_NOT_OK(
        ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
    source = handle;
  }

  return Open(source, props, metadata);
}

void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
  contents_ = std::move(contents);
}

void ParquetFileReader::Close() {
  if (contents_) {
    contents_->Close();
  }
}

std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
  return contents_->metadata();
}

std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
  DCHECK(i < metadata()->num_row_groups())
      << "The file only has " << metadata()->num_row_groups()
      << " row groups, requested reader for: " << i;
  return contents_->GetRowGroup(i);
}

// ----------------------------------------------------------------------
// File metadata helpers

std::shared_ptr<FileMetaData> ReadMetaData(
    const std::shared_ptr<::arrow::io::RandomAccessFile>& source) {
  return ParquetFileReader::Open(source)->metadata();
}

// ----------------------------------------------------------------------
// File scanner for performance testing

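// Reads every value of the selected columns (or of all columns when `columns`
// is empty) and returns the number of rows scanned, throwing if the columns
// disagree on the row count.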
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader) {
  std::vector<int16_t> rep_levels(column_batch_size);
  std::vector<int16_t> def_levels(column_batch_size);

  int num_columns = static_cast<int>(columns.size());

  // If columns are not specified explicitly, scan all columns
  if (columns.size() == 0) {
    num_columns = reader->metadata()->num_columns();
    columns.resize(num_columns);
    for (int i = 0; i < num_columns; i++) {
      columns[i] = i;
    }
  }

  std::vector<int64_t> total_rows(num_columns, 0);

  for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
    auto group_reader = reader->RowGroup(r);
    int col = 0;
    for (auto i : columns) {
      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
      size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
      std::vector<uint8_t> values(column_batch_size * value_byte_size);

      int64_t values_read = 0;
      while (col_reader->HasNext()) {
        int64_t levels_read =
            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
                          values.data(), &values_read, col_reader.get());
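        // For a repeated (nested) column, a repetition level of zero marks the
        // start of a new top-level record, so only those entries count as rows.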
        if (col_reader->descr()->max_repetition_level() > 0) {
          for (int64_t i = 0; i < levels_read; i++) {
            if (rep_levels[i] == 0) {
              total_rows[col]++;
            }
          }
        } else {
          total_rows[col] += levels_read;
        }
      }
      col++;
    }
  }

  for (int i = 1; i < num_columns; ++i) {
    if (total_rows[0] != total_rows[i]) {
      throw ParquetException("Parquet error: Total rows among columns do not match");
    }
  }

  return total_rows[0];
}

}  // namespace parquet