1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/file_reader.h" |
19 | |
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
28 | |
29 | #include "arrow/io/file.h" |
30 | #include "arrow/util/logging.h" |
31 | |
32 | #include "parquet/column_page.h" |
33 | #include "parquet/column_reader.h" |
34 | #include "parquet/column_scanner.h" |
35 | #include "parquet/exception.h" |
36 | #include "parquet/metadata.h" |
37 | #include "parquet/properties.h" |
38 | #include "parquet/thrift.h" |
39 | #include "parquet/types.h" |
40 | #include "parquet/util/memory.h" |
41 | |
42 | using std::string; |
43 | |
44 | namespace parquet { |
45 | |
// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
// Size of the fixed file tail: a 4-byte metadata length followed by the 4-byte
// magic number below.
static constexpr uint32_t FOOTER_SIZE = 8;
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};

// For PARQUET-816: upper bound on the padding added to the byte range of column
// chunks written by affected old writers (see the PARQUET-816 workaround in
// SerializedRowGroup::GetColumnPageReader).
static constexpr int64_t kMaxDictHeaderSize = 100;
53 | |
54 | // ---------------------------------------------------------------------- |
55 | // RowGroupReader public API |
56 | |
57 | RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents) |
58 | : contents_(std::move(contents)) {} |
59 | |
60 | std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) { |
61 | DCHECK(i < metadata()->num_columns()) |
62 | << "The RowGroup only has " << metadata()->num_columns() |
63 | << "columns, requested column: " << i; |
64 | const ColumnDescriptor* descr = metadata()->schema()->Column(i); |
65 | |
66 | std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i); |
67 | return ColumnReader::Make( |
68 | descr, std::move(page_reader), |
69 | const_cast<ReaderProperties*>(contents_->properties())->memory_pool()); |
70 | } |
71 | |
72 | std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) { |
73 | DCHECK(i < metadata()->num_columns()) |
74 | << "The RowGroup only has " << metadata()->num_columns() |
75 | << "columns, requested column: " << i; |
76 | return contents_->GetColumnPageReader(i); |
77 | } |
78 | |
79 | // Returns the rowgroup metadata |
80 | const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); } |
81 | |
82 | // RowGroupReader::Contents implementation for the Parquet file specification |
83 | class SerializedRowGroup : public RowGroupReader::Contents { |
84 | public: |
85 | SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata, |
86 | int row_group_number, const ReaderProperties& props) |
87 | : source_(source), file_metadata_(file_metadata), properties_(props) { |
88 | row_group_metadata_ = file_metadata->RowGroup(row_group_number); |
89 | } |
90 | |
91 | const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); } |
92 | |
93 | const ReaderProperties* properties() const override { return &properties_; } |
94 | |
95 | std::unique_ptr<PageReader> GetColumnPageReader(int i) override { |
96 | // Read column chunk from the file |
97 | auto col = row_group_metadata_->ColumnChunk(i); |
98 | |
99 | int64_t col_start = col->data_page_offset(); |
100 | if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) { |
101 | col_start = col->dictionary_page_offset(); |
102 | } |
103 | |
104 | int64_t col_length = col->total_compressed_size(); |
105 | std::unique_ptr<InputStream> stream; |
106 | |
107 | // PARQUET-816 workaround for old files created by older parquet-mr |
108 | const ApplicationVersion& version = file_metadata_->writer_version(); |
109 | if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) { |
110 | // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the |
111 | // dictionary page header size in total_compressed_size and total_uncompressed_size |
112 | // (see IMPALA-694). We add padding to compensate. |
113 | int64_t bytes_remaining = source_->Size() - (col_start + col_length); |
114 | int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining); |
115 | col_length += padding; |
116 | } |
117 | |
118 | stream = properties_.GetStream(source_, col_start, col_length); |
119 | |
120 | return PageReader::Open(std::move(stream), col->num_values(), col->compression(), |
121 | properties_.memory_pool()); |
122 | } |
123 | |
124 | private: |
125 | RandomAccessSource* source_; |
126 | FileMetaData* file_metadata_; |
127 | std::unique_ptr<RowGroupMetaData> row_group_metadata_; |
128 | ReaderProperties properties_; |
129 | }; |
130 | |
131 | // ---------------------------------------------------------------------- |
132 | // SerializedFile: An implementation of ParquetFileReader::Contents that deals |
133 | // with the Parquet file structure, Thrift deserialization, and other internal |
134 | // matters |
135 | |
136 | // This class takes ownership of the provided data source |
137 | class SerializedFile : public ParquetFileReader::Contents { |
138 | public: |
139 | SerializedFile(std::unique_ptr<RandomAccessSource> source, |
140 | const ReaderProperties& props = default_reader_properties()) |
141 | : source_(std::move(source)), properties_(props) {} |
142 | |
143 | ~SerializedFile() override { |
144 | try { |
145 | Close(); |
146 | } catch (...) { |
147 | } |
148 | } |
149 | |
150 | void Close() override { source_->Close(); } |
151 | |
152 | std::shared_ptr<RowGroupReader> GetRowGroup(int i) override { |
153 | std::unique_ptr<SerializedRowGroup> contents( |
154 | new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_)); |
155 | return std::make_shared<RowGroupReader>(std::move(contents)); |
156 | } |
157 | |
158 | std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; } |
159 | |
160 | void set_metadata(const std::shared_ptr<FileMetaData>& metadata) { |
161 | file_metadata_ = metadata; |
162 | } |
163 | |
164 | void ParseMetaData() { |
165 | int64_t file_size = source_->Size(); |
166 | |
167 | if (file_size < FOOTER_SIZE) { |
168 | throw ParquetException("Corrupted file, smaller than file footer" ); |
169 | } |
170 | |
171 | uint8_t [DEFAULT_FOOTER_READ_SIZE]; |
172 | int64_t = std::min(file_size, DEFAULT_FOOTER_READ_SIZE); |
173 | int64_t bytes_read = |
174 | source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer); |
175 | |
176 | // Check if all bytes are read. Check if last 4 bytes read have the magic bits |
177 | if (bytes_read != footer_read_size || |
178 | memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) { |
179 | throw ParquetException("Invalid parquet file. Corrupt footer." ); |
180 | } |
181 | |
182 | uint32_t metadata_len = |
183 | *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE); |
184 | int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len; |
185 | if (FOOTER_SIZE + metadata_len > file_size) { |
186 | throw ParquetException( |
187 | "Invalid parquet file. File is less than " |
188 | "file metadata size." ); |
189 | } |
190 | |
191 | std::shared_ptr<ResizableBuffer> metadata_buffer = |
192 | AllocateBuffer(properties_.memory_pool(), metadata_len); |
193 | |
194 | // Check if the footer_buffer contains the entire metadata |
195 | if (footer_read_size >= (metadata_len + FOOTER_SIZE)) { |
196 | memcpy(metadata_buffer->mutable_data(), |
197 | footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), |
198 | metadata_len); |
199 | } else { |
200 | bytes_read = |
201 | source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data()); |
202 | if (bytes_read != metadata_len) { |
203 | throw ParquetException("Invalid parquet file. Could not read metadata bytes." ); |
204 | } |
205 | } |
206 | |
207 | file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len); |
208 | } |
209 | |
210 | private: |
211 | std::unique_ptr<RandomAccessSource> source_; |
212 | std::shared_ptr<FileMetaData> file_metadata_; |
213 | ReaderProperties properties_; |
214 | }; |
215 | |
216 | // ---------------------------------------------------------------------- |
217 | // ParquetFileReader public API |
218 | |
219 | ParquetFileReader::ParquetFileReader() {} |
220 | |
221 | ParquetFileReader::~ParquetFileReader() { |
222 | try { |
223 | Close(); |
224 | } catch (...) { |
225 | } |
226 | } |
227 | |
228 | // Open the file. If no metadata is passed, it is parsed from the footer of |
229 | // the file |
230 | std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open( |
231 | std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props, |
232 | const std::shared_ptr<FileMetaData>& metadata) { |
233 | std::unique_ptr<ParquetFileReader::Contents> result( |
234 | new SerializedFile(std::move(source), props)); |
235 | |
236 | // Access private methods here, but otherwise unavailable |
237 | SerializedFile* file = static_cast<SerializedFile*>(result.get()); |
238 | |
239 | if (metadata == nullptr) { |
240 | // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor |
241 | file->ParseMetaData(); |
242 | } else { |
243 | file->set_metadata(metadata); |
244 | } |
245 | |
246 | return result; |
247 | } |
248 | |
249 | std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( |
250 | const std::shared_ptr<::arrow::io::ReadableFileInterface>& source, |
251 | const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) { |
252 | std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source)); |
253 | return Open(std::move(io_wrapper), props, metadata); |
254 | } |
255 | |
256 | std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( |
257 | std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props, |
258 | const std::shared_ptr<FileMetaData>& metadata) { |
259 | auto contents = SerializedFile::Open(std::move(source), props, metadata); |
260 | std::unique_ptr<ParquetFileReader> result(new ParquetFileReader()); |
261 | result->Open(std::move(contents)); |
262 | return result; |
263 | } |
264 | |
265 | std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile( |
266 | const std::string& path, bool memory_map, const ReaderProperties& props, |
267 | const std::shared_ptr<FileMetaData>& metadata) { |
268 | std::shared_ptr<::arrow::io::ReadableFileInterface> source; |
269 | if (memory_map) { |
270 | std::shared_ptr<::arrow::io::MemoryMappedFile> handle; |
271 | PARQUET_THROW_NOT_OK( |
272 | ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle)); |
273 | source = handle; |
274 | } else { |
275 | std::shared_ptr<::arrow::io::ReadableFile> handle; |
276 | PARQUET_THROW_NOT_OK( |
277 | ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle)); |
278 | source = handle; |
279 | } |
280 | |
281 | return Open(source, props, metadata); |
282 | } |
283 | |
284 | void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) { |
285 | contents_ = std::move(contents); |
286 | } |
287 | |
288 | void ParquetFileReader::Close() { |
289 | if (contents_) { |
290 | contents_->Close(); |
291 | } |
292 | } |
293 | |
294 | std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const { |
295 | return contents_->metadata(); |
296 | } |
297 | |
298 | std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) { |
299 | DCHECK(i < metadata()->num_row_groups()) |
300 | << "The file only has " << metadata()->num_row_groups() |
301 | << "row groups, requested reader for: " << i; |
302 | return contents_->GetRowGroup(i); |
303 | } |
304 | |
305 | // ---------------------------------------------------------------------- |
306 | // File metadata helpers |
307 | |
308 | std::shared_ptr<FileMetaData> ReadMetaData( |
309 | const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) { |
310 | return ParquetFileReader::Open(source)->metadata(); |
311 | } |
312 | |
313 | // ---------------------------------------------------------------------- |
314 | // File scanner for performance testing |
315 | |
316 | int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size, |
317 | ParquetFileReader* reader) { |
318 | std::vector<int16_t> rep_levels(column_batch_size); |
319 | std::vector<int16_t> def_levels(column_batch_size); |
320 | |
321 | int num_columns = static_cast<int>(columns.size()); |
322 | |
323 | // columns are not specified explicitly. Add all columns |
324 | if (columns.size() == 0) { |
325 | num_columns = reader->metadata()->num_columns(); |
326 | columns.resize(num_columns); |
327 | for (int i = 0; i < num_columns; i++) { |
328 | columns[i] = i; |
329 | } |
330 | } |
331 | |
332 | std::vector<int64_t> total_rows(num_columns, 0); |
333 | |
334 | for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) { |
335 | auto group_reader = reader->RowGroup(r); |
336 | int col = 0; |
337 | for (auto i : columns) { |
338 | std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i); |
339 | size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type()); |
340 | std::vector<uint8_t> values(column_batch_size * value_byte_size); |
341 | |
342 | int64_t values_read = 0; |
343 | while (col_reader->HasNext()) { |
344 | int64_t levels_read = |
345 | ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(), |
346 | values.data(), &values_read, col_reader.get()); |
347 | if (col_reader->descr()->max_repetition_level() > 0) { |
348 | for (int64_t i = 0; i < levels_read; i++) { |
349 | if (rep_levels[i] == 0) { |
350 | total_rows[col]++; |
351 | } |
352 | } |
353 | } else { |
354 | total_rows[col] += levels_read; |
355 | } |
356 | } |
357 | col++; |
358 | } |
359 | } |
360 | |
361 | for (int i = 1; i < num_columns; ++i) { |
362 | if (total_rows[0] != total_rows[i]) { |
363 | throw ParquetException("Parquet error: Total rows among columns do not match" ); |
364 | } |
365 | } |
366 | |
367 | return total_rows[0]; |
368 | } |
369 | |
370 | } // namespace parquet |
371 | |