1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include "parquet/file_reader.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "arrow/io/file.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"

#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/deprecated_io.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"
41
42namespace parquet {
43
// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
static constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
// Size of the fixed trailer at the end of a Parquet file: a 4-byte metadata
// length followed by the 4-byte magic. Declared int64_t (not uint32_t) so that
// expressions such as `kFooterSize + metadata_len` are evaluated in 64 bits and
// cannot wrap around when a corrupt footer stores a length near UINT32_MAX.
static constexpr int64_t kFooterSize = 8;
// Magic bytes that terminate every Parquet file.
static constexpr uint8_t kParquetMagic[4] = {'P', 'A', 'R', '1'};

// PARQUET-816: upper bound on the padding appended to column chunk lengths of
// files written by affected (older) parquet-mr versions.
static constexpr int64_t kMaxDictHeaderSize = 100;
51
52// ----------------------------------------------------------------------
53// RowGroupReader public API
54
55RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
56 : contents_(std::move(contents)) {}
57
58std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
59 DCHECK(i < metadata()->num_columns())
60 << "The RowGroup only has " << metadata()->num_columns()
61 << "columns, requested column: " << i;
62 const ColumnDescriptor* descr = metadata()->schema()->Column(i);
63
64 std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
65 return ColumnReader::Make(
66 descr, std::move(page_reader),
67 const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
68}
69
70std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
71 DCHECK(i < metadata()->num_columns())
72 << "The RowGroup only has " << metadata()->num_columns()
73 << "columns, requested column: " << i;
74 return contents_->GetColumnPageReader(i);
75}
76
77// Returns the rowgroup metadata
78const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
79
80// RowGroupReader::Contents implementation for the Parquet file specification
81class SerializedRowGroup : public RowGroupReader::Contents {
82 public:
83 SerializedRowGroup(const std::shared_ptr<ArrowInputFile>& source,
84 FileMetaData* file_metadata, int row_group_number,
85 const ReaderProperties& props)
86 : source_(source), file_metadata_(file_metadata), properties_(props) {
87 row_group_metadata_ = file_metadata->RowGroup(row_group_number);
88 }
89
90 const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
91
92 const ReaderProperties* properties() const override { return &properties_; }
93
94 std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
95 // Read column chunk from the file
96 auto col = row_group_metadata_->ColumnChunk(i);
97
98 int64_t col_start = col->data_page_offset();
99 if (col->has_dictionary_page() && col->dictionary_page_offset() > 0 &&
100 col_start > col->dictionary_page_offset()) {
101 col_start = col->dictionary_page_offset();
102 }
103
104 int64_t col_length = col->total_compressed_size();
105
106 // PARQUET-816 workaround for old files created by older parquet-mr
107 const ApplicationVersion& version = file_metadata_->writer_version();
108 if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) {
109 // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
110 // dictionary page header size in total_compressed_size and total_uncompressed_size
111 // (see IMPALA-694). We add padding to compensate.
112 int64_t size = -1;
113 PARQUET_THROW_NOT_OK(source_->GetSize(&size));
114 int64_t bytes_remaining = size - (col_start + col_length);
115 int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
116 col_length += padding;
117 }
118
119 std::shared_ptr<ArrowInputStream> stream =
120 properties_.GetStream(source_, col_start, col_length);
121 return PageReader::Open(stream, col->num_values(), col->compression(),
122 properties_.memory_pool());
123 }
124
125 private:
126 std::shared_ptr<ArrowInputFile> source_;
127 FileMetaData* file_metadata_;
128 std::unique_ptr<RowGroupMetaData> row_group_metadata_;
129 ReaderProperties properties_;
130};
131
132// ----------------------------------------------------------------------
133// SerializedFile: An implementation of ParquetFileReader::Contents that deals
134// with the Parquet file structure, Thrift deserialization, and other internal
135// matters
136
137// This class takes ownership of the provided data source
138class SerializedFile : public ParquetFileReader::Contents {
139 public:
140 SerializedFile(const std::shared_ptr<ArrowInputFile>& source,
141 const ReaderProperties& props = default_reader_properties())
142 : source_(source), properties_(props) {}
143
144 void Close() override {}
145
146 std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
147 std::unique_ptr<SerializedRowGroup> contents(
148 new SerializedRowGroup(source_, file_metadata_.get(), i, properties_));
149 return std::make_shared<RowGroupReader>(std::move(contents));
150 }
151
152 std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
153
154 void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
155 file_metadata_ = metadata;
156 }
157
158 void ParseMetaData() {
159 int64_t file_size = -1;
160 PARQUET_THROW_NOT_OK(source_->GetSize(&file_size));
161
162 if (file_size == 0) {
163 throw ParquetException("Invalid Parquet file size is 0 bytes");
164 } else if (file_size < kFooterSize) {
165 std::stringstream ss;
166 ss << "Invalid Parquet file size is " << file_size
167 << " bytes, smaller than standard file footer (" << kFooterSize << " bytes)";
168 throw ParquetException(ss.str());
169 }
170
171 std::shared_ptr<Buffer> footer_buffer;
172 int64_t footer_read_size = std::min(file_size, kDefaultFooterReadSize);
173 PARQUET_THROW_NOT_OK(
174 source_->ReadAt(file_size - footer_read_size, footer_read_size, &footer_buffer));
175
176 // Check if all bytes are read. Check if last 4 bytes read have the magic bits
177 if (footer_buffer->size() != footer_read_size ||
178 memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0) {
179 throw ParquetException("Invalid parquet file. Corrupt footer.");
180 }
181
182 uint32_t metadata_len = arrow::util::SafeLoadAs<uint32_t>(
183 reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
184 kFooterSize);
185 int64_t metadata_start = file_size - kFooterSize - metadata_len;
186 if (kFooterSize + metadata_len > file_size) {
187 throw ParquetException(
188 "Invalid parquet file. File is less than "
189 "file metadata size.");
190 }
191
192 std::shared_ptr<Buffer> metadata_buffer;
193 // Check if the footer_buffer contains the entire metadata
194 if (footer_read_size >= (metadata_len + kFooterSize)) {
195 metadata_buffer = SliceBuffer(
196 footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len);
197 } else {
198 PARQUET_THROW_NOT_OK(
199 source_->ReadAt(metadata_start, metadata_len, &metadata_buffer));
200 if (metadata_buffer->size() != metadata_len) {
201 throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
202 }
203 }
204 file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
205 }
206
207 private:
208 std::shared_ptr<ArrowInputFile> source_;
209 std::shared_ptr<FileMetaData> file_metadata_;
210 ReaderProperties properties_;
211};
212
213// ----------------------------------------------------------------------
214// ParquetFileReader public API
215
216ParquetFileReader::ParquetFileReader() {}
217
218ParquetFileReader::~ParquetFileReader() {
219 try {
220 Close();
221 } catch (...) {
222 }
223}
224
225// Open the file. If no metadata is passed, it is parsed from the footer of
226// the file
227std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
228 const std::shared_ptr<ArrowInputFile>& source, const ReaderProperties& props,
229 const std::shared_ptr<FileMetaData>& metadata) {
230 std::unique_ptr<ParquetFileReader::Contents> result(new SerializedFile(source, props));
231
232 // Access private methods here, but otherwise unavailable
233 SerializedFile* file = static_cast<SerializedFile*>(result.get());
234
235 if (metadata == nullptr) {
236 // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
237 file->ParseMetaData();
238 } else {
239 file->set_metadata(metadata);
240 }
241
242 return result;
243}
244
245std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
246 const std::shared_ptr<::arrow::io::RandomAccessFile>& source,
247 const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
248 auto contents = SerializedFile::Open(source, props, metadata);
249 std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
250 result->Open(std::move(contents));
251 return result;
252}
253
254std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
255 std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
256 const std::shared_ptr<FileMetaData>& metadata) {
257 auto wrapper = std::make_shared<ParquetInputWrapper>(std::move(source));
258 return Open(wrapper, props, metadata);
259}
260
261std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
262 const std::string& path, bool memory_map, const ReaderProperties& props,
263 const std::shared_ptr<FileMetaData>& metadata) {
264 std::shared_ptr<::arrow::io::RandomAccessFile> source;
265 if (memory_map) {
266 std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
267 PARQUET_THROW_NOT_OK(
268 ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
269 source = handle;
270 } else {
271 std::shared_ptr<::arrow::io::ReadableFile> handle;
272 PARQUET_THROW_NOT_OK(
273 ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
274 source = handle;
275 }
276
277 return Open(source, props, metadata);
278}
279
280void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
281 contents_ = std::move(contents);
282}
283
284void ParquetFileReader::Close() {
285 if (contents_) {
286 contents_->Close();
287 }
288}
289
290std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
291 return contents_->metadata();
292}
293
294std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
295 DCHECK(i < metadata()->num_row_groups())
296 << "The file only has " << metadata()->num_row_groups()
297 << "row groups, requested reader for: " << i;
298 return contents_->GetRowGroup(i);
299}
300
301// ----------------------------------------------------------------------
302// File metadata helpers
303
304std::shared_ptr<FileMetaData> ReadMetaData(
305 const std::shared_ptr<::arrow::io::RandomAccessFile>& source) {
306 return ParquetFileReader::Open(source)->metadata();
307}
308
309// ----------------------------------------------------------------------
310// File scanner for performance testing
311
312int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
313 ParquetFileReader* reader) {
314 std::vector<int16_t> rep_levels(column_batch_size);
315 std::vector<int16_t> def_levels(column_batch_size);
316
317 int num_columns = static_cast<int>(columns.size());
318
319 // columns are not specified explicitly. Add all columns
320 if (columns.size() == 0) {
321 num_columns = reader->metadata()->num_columns();
322 columns.resize(num_columns);
323 for (int i = 0; i < num_columns; i++) {
324 columns[i] = i;
325 }
326 }
327
328 std::vector<int64_t> total_rows(num_columns, 0);
329
330 for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
331 auto group_reader = reader->RowGroup(r);
332 int col = 0;
333 for (auto i : columns) {
334 std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
335 size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
336 std::vector<uint8_t> values(column_batch_size * value_byte_size);
337
338 int64_t values_read = 0;
339 while (col_reader->HasNext()) {
340 int64_t levels_read =
341 ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
342 values.data(), &values_read, col_reader.get());
343 if (col_reader->descr()->max_repetition_level() > 0) {
344 for (int64_t i = 0; i < levels_read; i++) {
345 if (rep_levels[i] == 0) {
346 total_rows[col]++;
347 }
348 }
349 } else {
350 total_rows[col] += levels_read;
351 }
352 }
353 col++;
354 }
355 }
356
357 for (int i = 1; i < num_columns; ++i) {
358 if (total_rows[0] != total_rows[i]) {
359 throw ParquetException("Parquet error: Total rows among columns do not match");
360 }
361 }
362
363 return total_rows[0];
364}
365
366} // namespace parquet
367