1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/file_reader.h"
19
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
28
29#include "arrow/io/file.h"
30#include "arrow/util/logging.h"
31
32#include "parquet/column_page.h"
33#include "parquet/column_reader.h"
34#include "parquet/column_scanner.h"
35#include "parquet/exception.h"
36#include "parquet/metadata.h"
37#include "parquet/properties.h"
38#include "parquet/thrift.h"
39#include "parquet/types.h"
40#include "parquet/util/memory.h"
41
42using std::string;
43
44namespace parquet {
45
// PARQUET-978: fetch up to 64 KiB from the end of the file in a single read so
// that, when the serialized metadata is small, no second read is required.
static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = int64_t{64} * 1024;

// Trailing bytes of every file: a 4-byte metadata length followed by the
// 4-byte magic below.
static constexpr uint32_t FOOTER_SIZE = 8;

// Magic bytes that terminate a Parquet file.
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};

// PARQUET-816: upper bound on the padding added to column chunk ranges written
// by old parquet-mr versions that omitted the dictionary page header size.
static constexpr int64_t kMaxDictHeaderSize = 100;
53
54// ----------------------------------------------------------------------
55// RowGroupReader public API
56
57RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
58 : contents_(std::move(contents)) {}
59
60std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
61 DCHECK(i < metadata()->num_columns())
62 << "The RowGroup only has " << metadata()->num_columns()
63 << "columns, requested column: " << i;
64 const ColumnDescriptor* descr = metadata()->schema()->Column(i);
65
66 std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
67 return ColumnReader::Make(
68 descr, std::move(page_reader),
69 const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
70}
71
72std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
73 DCHECK(i < metadata()->num_columns())
74 << "The RowGroup only has " << metadata()->num_columns()
75 << "columns, requested column: " << i;
76 return contents_->GetColumnPageReader(i);
77}
78
79// Returns the rowgroup metadata
80const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
81
82// RowGroupReader::Contents implementation for the Parquet file specification
83class SerializedRowGroup : public RowGroupReader::Contents {
84 public:
85 SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
86 int row_group_number, const ReaderProperties& props)
87 : source_(source), file_metadata_(file_metadata), properties_(props) {
88 row_group_metadata_ = file_metadata->RowGroup(row_group_number);
89 }
90
91 const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
92
93 const ReaderProperties* properties() const override { return &properties_; }
94
95 std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
96 // Read column chunk from the file
97 auto col = row_group_metadata_->ColumnChunk(i);
98
99 int64_t col_start = col->data_page_offset();
100 if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
101 col_start = col->dictionary_page_offset();
102 }
103
104 int64_t col_length = col->total_compressed_size();
105 std::unique_ptr<InputStream> stream;
106
107 // PARQUET-816 workaround for old files created by older parquet-mr
108 const ApplicationVersion& version = file_metadata_->writer_version();
109 if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) {
110 // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
111 // dictionary page header size in total_compressed_size and total_uncompressed_size
112 // (see IMPALA-694). We add padding to compensate.
113 int64_t bytes_remaining = source_->Size() - (col_start + col_length);
114 int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
115 col_length += padding;
116 }
117
118 stream = properties_.GetStream(source_, col_start, col_length);
119
120 return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
121 properties_.memory_pool());
122 }
123
124 private:
125 RandomAccessSource* source_;
126 FileMetaData* file_metadata_;
127 std::unique_ptr<RowGroupMetaData> row_group_metadata_;
128 ReaderProperties properties_;
129};
130
131// ----------------------------------------------------------------------
132// SerializedFile: An implementation of ParquetFileReader::Contents that deals
133// with the Parquet file structure, Thrift deserialization, and other internal
134// matters
135
136// This class takes ownership of the provided data source
137class SerializedFile : public ParquetFileReader::Contents {
138 public:
139 SerializedFile(std::unique_ptr<RandomAccessSource> source,
140 const ReaderProperties& props = default_reader_properties())
141 : source_(std::move(source)), properties_(props) {}
142
143 ~SerializedFile() override {
144 try {
145 Close();
146 } catch (...) {
147 }
148 }
149
150 void Close() override { source_->Close(); }
151
152 std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
153 std::unique_ptr<SerializedRowGroup> contents(
154 new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
155 return std::make_shared<RowGroupReader>(std::move(contents));
156 }
157
158 std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
159
160 void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
161 file_metadata_ = metadata;
162 }
163
164 void ParseMetaData() {
165 int64_t file_size = source_->Size();
166
167 if (file_size < FOOTER_SIZE) {
168 throw ParquetException("Corrupted file, smaller than file footer");
169 }
170
171 uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
172 int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
173 int64_t bytes_read =
174 source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
175
176 // Check if all bytes are read. Check if last 4 bytes read have the magic bits
177 if (bytes_read != footer_read_size ||
178 memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
179 throw ParquetException("Invalid parquet file. Corrupt footer.");
180 }
181
182 uint32_t metadata_len =
183 *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
184 int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
185 if (FOOTER_SIZE + metadata_len > file_size) {
186 throw ParquetException(
187 "Invalid parquet file. File is less than "
188 "file metadata size.");
189 }
190
191 std::shared_ptr<ResizableBuffer> metadata_buffer =
192 AllocateBuffer(properties_.memory_pool(), metadata_len);
193
194 // Check if the footer_buffer contains the entire metadata
195 if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
196 memcpy(metadata_buffer->mutable_data(),
197 footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
198 metadata_len);
199 } else {
200 bytes_read =
201 source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
202 if (bytes_read != metadata_len) {
203 throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
204 }
205 }
206
207 file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
208 }
209
210 private:
211 std::unique_ptr<RandomAccessSource> source_;
212 std::shared_ptr<FileMetaData> file_metadata_;
213 ReaderProperties properties_;
214};
215
216// ----------------------------------------------------------------------
217// ParquetFileReader public API
218
219ParquetFileReader::ParquetFileReader() {}
220
221ParquetFileReader::~ParquetFileReader() {
222 try {
223 Close();
224 } catch (...) {
225 }
226}
227
228// Open the file. If no metadata is passed, it is parsed from the footer of
229// the file
230std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
231 std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
232 const std::shared_ptr<FileMetaData>& metadata) {
233 std::unique_ptr<ParquetFileReader::Contents> result(
234 new SerializedFile(std::move(source), props));
235
236 // Access private methods here, but otherwise unavailable
237 SerializedFile* file = static_cast<SerializedFile*>(result.get());
238
239 if (metadata == nullptr) {
240 // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
241 file->ParseMetaData();
242 } else {
243 file->set_metadata(metadata);
244 }
245
246 return result;
247}
248
249std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
250 const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
251 const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
252 std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
253 return Open(std::move(io_wrapper), props, metadata);
254}
255
256std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
257 std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
258 const std::shared_ptr<FileMetaData>& metadata) {
259 auto contents = SerializedFile::Open(std::move(source), props, metadata);
260 std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
261 result->Open(std::move(contents));
262 return result;
263}
264
265std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
266 const std::string& path, bool memory_map, const ReaderProperties& props,
267 const std::shared_ptr<FileMetaData>& metadata) {
268 std::shared_ptr<::arrow::io::ReadableFileInterface> source;
269 if (memory_map) {
270 std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
271 PARQUET_THROW_NOT_OK(
272 ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
273 source = handle;
274 } else {
275 std::shared_ptr<::arrow::io::ReadableFile> handle;
276 PARQUET_THROW_NOT_OK(
277 ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
278 source = handle;
279 }
280
281 return Open(source, props, metadata);
282}
283
284void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
285 contents_ = std::move(contents);
286}
287
288void ParquetFileReader::Close() {
289 if (contents_) {
290 contents_->Close();
291 }
292}
293
// Returns the file metadata held by the underlying contents.
// NOTE(review): contents_ is dereferenced without a null check, so this assumes
// the reader has been opened.
std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
  return contents_->metadata();
}
297
298std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
299 DCHECK(i < metadata()->num_row_groups())
300 << "The file only has " << metadata()->num_row_groups()
301 << "row groups, requested reader for: " << i;
302 return contents_->GetRowGroup(i);
303}
304
305// ----------------------------------------------------------------------
306// File metadata helpers
307
308std::shared_ptr<FileMetaData> ReadMetaData(
309 const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) {
310 return ParquetFileReader::Open(source)->metadata();
311}
312
313// ----------------------------------------------------------------------
314// File scanner for performance testing
315
316int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
317 ParquetFileReader* reader) {
318 std::vector<int16_t> rep_levels(column_batch_size);
319 std::vector<int16_t> def_levels(column_batch_size);
320
321 int num_columns = static_cast<int>(columns.size());
322
323 // columns are not specified explicitly. Add all columns
324 if (columns.size() == 0) {
325 num_columns = reader->metadata()->num_columns();
326 columns.resize(num_columns);
327 for (int i = 0; i < num_columns; i++) {
328 columns[i] = i;
329 }
330 }
331
332 std::vector<int64_t> total_rows(num_columns, 0);
333
334 for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
335 auto group_reader = reader->RowGroup(r);
336 int col = 0;
337 for (auto i : columns) {
338 std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
339 size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
340 std::vector<uint8_t> values(column_batch_size * value_byte_size);
341
342 int64_t values_read = 0;
343 while (col_reader->HasNext()) {
344 int64_t levels_read =
345 ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
346 values.data(), &values_read, col_reader.get());
347 if (col_reader->descr()->max_repetition_level() > 0) {
348 for (int64_t i = 0; i < levels_read; i++) {
349 if (rep_levels[i] == 0) {
350 total_rows[col]++;
351 }
352 }
353 } else {
354 total_rows[col] += levels_read;
355 }
356 }
357 col++;
358 }
359 }
360
361 for (int i = 1; i < num_columns; ++i) {
362 if (total_rows[0] != total_rows[i]) {
363 throw ParquetException("Parquet error: Total rows among columns do not match");
364 }
365 }
366
367 return total_rows[0];
368}
369
370} // namespace parquet
371