1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
#include <fcntl.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <string>

#include "arrow/io/file.h"

#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/file_reader.h"
#include "parquet/printer.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
34 | |
35 | using std::string; |
36 | |
37 | namespace parquet { |
38 | |
39 | using ReadableFile = ::arrow::io::ReadableFile; |
40 | |
41 | std::string alltypes_plain() { |
42 | std::string dir_string(test::get_data_dir()); |
43 | std::stringstream ss; |
44 | ss << dir_string << "/" |
45 | << "alltypes_plain.parquet" ; |
46 | return ss.str(); |
47 | } |
48 | |
49 | std::string nation_dict_truncated_data_page() { |
50 | std::string dir_string(test::get_data_dir()); |
51 | std::stringstream ss; |
52 | ss << dir_string << "/" |
53 | << "nation.dict-malformed.parquet" ; |
54 | return ss.str(); |
55 | } |
56 | |
// Fixture that opens the alltypes_plain.parquet test file before each test.
class TestAllTypesPlain : public ::testing::Test {
 public:
  // A fresh reader is opened for every test case.
  void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }

  void TearDown() {}

 protected:
  // Owned reader over the shared test file; valid for the whole test body.
  std::unique_ptr<ParquetFileReader> reader_;
};
66 | |
// Opening and then destroying a reader without any reads must not crash.
TEST_F(TestAllTypesPlain, NoopConstructDestruct) {}
68 | |
69 | TEST_F(TestAllTypesPlain, TestBatchRead) { |
70 | std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0); |
71 | |
72 | // column 0, id |
73 | std::shared_ptr<Int32Reader> col = |
74 | std::dynamic_pointer_cast<Int32Reader>(group->Column(0)); |
75 | |
76 | int16_t def_levels[4]; |
77 | int16_t rep_levels[4]; |
78 | int32_t values[4]; |
79 | |
80 | // This file only has 8 rows |
81 | ASSERT_EQ(8, reader_->metadata()->num_rows()); |
82 | // This file only has 1 row group |
83 | ASSERT_EQ(1, reader_->metadata()->num_row_groups()); |
84 | // Size of the metadata is 730 bytes |
85 | ASSERT_EQ(730, reader_->metadata()->size()); |
86 | // This row group must have 8 rows |
87 | ASSERT_EQ(8, group->metadata()->num_rows()); |
88 | |
89 | ASSERT_TRUE(col->HasNext()); |
90 | int64_t values_read; |
91 | auto levels_read = col->ReadBatch(4, def_levels, rep_levels, values, &values_read); |
92 | ASSERT_EQ(4, levels_read); |
93 | ASSERT_EQ(4, values_read); |
94 | |
95 | // Now read past the end of the file |
96 | ASSERT_TRUE(col->HasNext()); |
97 | levels_read = col->ReadBatch(5, def_levels, rep_levels, values, &values_read); |
98 | ASSERT_EQ(4, levels_read); |
99 | ASSERT_EQ(4, values_read); |
100 | |
101 | ASSERT_FALSE(col->HasNext()); |
102 | } |
103 | |
104 | TEST_F(TestAllTypesPlain, TestFlatScannerInt32) { |
105 | std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0); |
106 | |
107 | // column 0, id |
108 | std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0))); |
109 | int32_t val; |
110 | bool is_null; |
111 | for (int i = 0; i < 8; ++i) { |
112 | ASSERT_TRUE(scanner->HasNext()); |
113 | ASSERT_TRUE(scanner->NextValue(&val, &is_null)); |
114 | ASSERT_FALSE(is_null); |
115 | } |
116 | ASSERT_FALSE(scanner->HasNext()); |
117 | ASSERT_FALSE(scanner->NextValue(&val, &is_null)); |
118 | } |
119 | |
120 | TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) { |
121 | std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0); |
122 | |
123 | // column 0, id |
124 | std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0))); |
125 | |
126 | ASSERT_EQ(128, scanner->batch_size()); |
127 | scanner->SetBatchSize(1024); |
128 | ASSERT_EQ(1024, scanner->batch_size()); |
129 | } |
130 | |
131 | TEST_F(TestAllTypesPlain, DebugPrintWorks) { |
132 | std::stringstream ss; |
133 | |
134 | std::list<int> columns; |
135 | ParquetFilePrinter printer(reader_.get()); |
136 | printer.DebugPrint(ss, columns); |
137 | |
138 | std::string result = ss.str(); |
139 | ASSERT_GT(result.size(), 0); |
140 | } |
141 | |
142 | TEST_F(TestAllTypesPlain, ColumnSelection) { |
143 | std::stringstream ss; |
144 | |
145 | std::list<int> columns; |
146 | columns.push_back(5); |
147 | columns.push_back(0); |
148 | columns.push_back(10); |
149 | ParquetFilePrinter printer(reader_.get()); |
150 | printer.DebugPrint(ss, columns); |
151 | |
152 | std::string result = ss.str(); |
153 | ASSERT_GT(result.size(), 0); |
154 | } |
155 | |
156 | TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { |
157 | std::stringstream ss; |
158 | |
159 | std::list<int> columns; |
160 | columns.push_back(100); |
161 | ParquetFilePrinter printer1(reader_.get()); |
162 | ASSERT_THROW(printer1.DebugPrint(ss, columns), ParquetException); |
163 | |
164 | columns.clear(); |
165 | columns.push_back(-1); |
166 | ParquetFilePrinter printer2(reader_.get()); |
167 | ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException); |
168 | } |
169 | |
170 | class TestLocalFile : public ::testing::Test { |
171 | public: |
172 | void SetUp() { |
173 | std::string dir_string(test::get_data_dir()); |
174 | |
175 | std::stringstream ss; |
176 | ss << dir_string << "/" |
177 | << "alltypes_plain.parquet" ; |
178 | |
179 | PARQUET_THROW_NOT_OK(ReadableFile::Open(ss.str(), &handle)); |
180 | fileno = handle->file_descriptor(); |
181 | } |
182 | |
183 | void TearDown() {} |
184 | |
185 | protected: |
186 | int fileno; |
187 | std::shared_ptr<::arrow::io::ReadableFile> handle; |
188 | }; |
189 | |
// Wraps an Arrow input file so a test can observe whether Close() was
// invoked: Close() only sets the flag pointed to by close_called.
class HelperFileClosed : public ArrowInputFile {
 public:
  explicit HelperFileClosed(
      const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, bool* close_called)
      : ArrowInputFile(file), close_called_(close_called) {}

  // Records the call; deliberately does not close the underlying file.
  void Close() override { *close_called_ = true; }

 private:
  bool* close_called_;  // Not owned; must outlive this object.
};
201 | |
// Destroying a ParquetFileReader must close its underlying input file.
TEST_F(TestLocalFile, FileClosedOnDestruction) {
  bool close_called = false;
  {
    // Wrap the handle so Close() is observable; the reader takes ownership
    // of the wrapping source via Contents::Open.
    auto contents = ParquetFileReader::Contents::Open(
        std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called)));
    std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
    result->Open(std::move(contents));
  }
  // Leaving the scope destroys the reader, which must have called Close().
  ASSERT_TRUE(close_called);
}
212 | |
213 | TEST_F(TestLocalFile, OpenWithMetadata) { |
214 | // PARQUET-808 |
215 | std::stringstream ss; |
216 | std::shared_ptr<FileMetaData> metadata = ReadMetaData(handle); |
217 | |
218 | auto reader = ParquetFileReader::Open(handle, default_reader_properties(), metadata); |
219 | |
220 | // Compare pointers |
221 | ASSERT_EQ(metadata.get(), reader->metadata().get()); |
222 | |
223 | std::list<int> columns; |
224 | ParquetFilePrinter printer(reader.get()); |
225 | printer.DebugPrint(ss, columns, true); |
226 | |
227 | // Make sure OpenFile passes on the external metadata, too |
228 | auto reader2 = ParquetFileReader::OpenFile(alltypes_plain(), false, |
229 | default_reader_properties(), metadata); |
230 | |
231 | // Compare pointers |
232 | ASSERT_EQ(metadata.get(), reader2->metadata().get()); |
233 | } |
234 | |
235 | TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) { |
236 | // PARQUET-816. Some files generated by older Parquet implementations may |
237 | // contain malformed data page metadata, and we can successfully decode them |
238 | // if we optimistically proceed to decoding, even if there is not enough data |
239 | // available in the stream. Before, we had quite aggressive checking of |
240 | // stream reads, which are not found e.g. in Impala's Parquet implementation |
241 | auto reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), false); |
242 | std::stringstream ss; |
243 | |
244 | // empty list means print all |
245 | std::list<int> columns; |
246 | ParquetFilePrinter printer1(reader.get()); |
247 | printer1.DebugPrint(ss, columns, true); |
248 | |
249 | reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), true); |
250 | std::stringstream ss2; |
251 | ParquetFilePrinter printer2(reader.get()); |
252 | printer2.DebugPrint(ss2, columns, true); |
253 | |
254 | // The memory-mapped reads runs over the end of the column chunk and succeeds |
255 | // by accident |
256 | ASSERT_EQ(ss2.str(), ss.str()); |
257 | } |
258 | |
// Golden test: JSONPrint over alltypes_plain.parquet must reproduce this
// exact JSON document (file/row-group/column metadata). Any change to the
// printer's JSON layout will show up as a diff against this literal.
TEST(TestJSONWithLocalFile, JSONOutput) {
  // Expected output, byte for byte. Do not reformat this literal.
  std::string jsonOutput = R"###({
  "FileName": "alltypes_plain.parquet",
  "Version": "0",
  "CreatedBy": "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)",
  "TotalRows": "8",
  "NumberOfRowGroups": "1",
  "NumberOfRealColumns": "11",
  "NumberOfColumns": "11",
  "Columns": [
     { "Id": "0", "Name": "id", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "1", "Name": "bool_col", "PhysicalType": "BOOLEAN", "LogicalType": "NONE" },
     { "Id": "2", "Name": "tinyint_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "3", "Name": "smallint_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "4", "Name": "int_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "5", "Name": "bigint_col", "PhysicalType": "INT64", "LogicalType": "NONE" },
     { "Id": "6", "Name": "float_col", "PhysicalType": "FLOAT", "LogicalType": "NONE" },
     { "Id": "7", "Name": "double_col", "PhysicalType": "DOUBLE", "LogicalType": "NONE" },
     { "Id": "8", "Name": "date_string_col", "PhysicalType": "BYTE_ARRAY", "LogicalType": "NONE" },
     { "Id": "9", "Name": "string_col", "PhysicalType": "BYTE_ARRAY", "LogicalType": "NONE" },
     { "Id": "10", "Name": "timestamp_col", "PhysicalType": "INT96", "LogicalType": "NONE" }
  ],
  "RowGroups": [
     {
       "Id": "0", "TotalBytes": "671", "Rows": "8",
       "ColumnChunks": [
          {"Id": "0", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "73", "CompressedSize": "73" },
          {"Id": "1", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "24", "CompressedSize": "24" },
          {"Id": "2", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "3", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "4", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "5", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" },
          {"Id": "6", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "7", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" },
          {"Id": "8", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "88", "CompressedSize": "88" },
          {"Id": "9", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "49", "CompressedSize": "49" },
          {"Id": "10", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "139", "CompressedSize": "139" }
        ]
     }
  ]
}
)###" ;

  std::stringstream ss;
  // empty list means print all
  std::list<int> columns;

  auto reader =
      ParquetFileReader::OpenFile(alltypes_plain(), false, default_reader_properties());
  ParquetFilePrinter printer(reader.get());
  printer.JSONPrint(ss, columns, "alltypes_plain.parquet" );

  ASSERT_EQ(jsonOutput, ss.str());
}
324 | |
325 | } // namespace parquet |
326 | |