1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <fcntl.h>

#include <gtest/gtest.h>

#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <string>

#include "arrow/io/file.h"

#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/file_reader.h"
#include "parquet/printer.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
34
35using std::string;
36
37namespace parquet {
38
39using ReadableFile = ::arrow::io::ReadableFile;
40
41std::string alltypes_plain() {
42 std::string dir_string(test::get_data_dir());
43 std::stringstream ss;
44 ss << dir_string << "/"
45 << "alltypes_plain.parquet";
46 return ss.str();
47}
48
49std::string nation_dict_truncated_data_page() {
50 std::string dir_string(test::get_data_dir());
51 std::stringstream ss;
52 ss << dir_string << "/"
53 << "nation.dict-malformed.parquet";
54 return ss.str();
55}
56
57class TestAllTypesPlain : public ::testing::Test {
58 public:
59 void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }
60
61 void TearDown() {}
62
63 protected:
64 std::unique_ptr<ParquetFileReader> reader_;
65};
66
// Verify the fixture's SetUp/TearDown (open + destroy the file reader) run
// cleanly even when the test body does nothing.
TEST_F(TestAllTypesPlain, NoopConstructDestruct) {}
68
// Read column 0 (id) in batches: the file has 8 rows, so a first batch of 4
// succeeds in full and a second request of 5 is clamped to the remaining 4.
TEST_F(TestAllTypesPlain, TestBatchRead) {
  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);

  // column 0, id
  std::shared_ptr<Int32Reader> col =
      std::dynamic_pointer_cast<Int32Reader>(group->Column(0));

  // Scratch buffers for one batch of up to 4 values.
  int16_t def_levels[4];
  int16_t rep_levels[4];
  int32_t values[4];

  // This file only has 8 rows
  ASSERT_EQ(8, reader_->metadata()->num_rows());
  // This file only has 1 row group
  ASSERT_EQ(1, reader_->metadata()->num_row_groups());
  // Size of the metadata is 730 bytes
  ASSERT_EQ(730, reader_->metadata()->size());
  // This row group must have 8 rows
  ASSERT_EQ(8, group->metadata()->num_rows());

  ASSERT_TRUE(col->HasNext());
  int64_t values_read;
  auto levels_read = col->ReadBatch(4, def_levels, rep_levels, values, &values_read);
  ASSERT_EQ(4, levels_read);
  ASSERT_EQ(4, values_read);

  // Now read past the end of the file
  ASSERT_TRUE(col->HasNext());
  levels_read = col->ReadBatch(5, def_levels, rep_levels, values, &values_read);
  // Only the 4 remaining values come back, not the 5 requested.
  ASSERT_EQ(4, levels_read);
  ASSERT_EQ(4, values_read);

  ASSERT_FALSE(col->HasNext());
}
103
104TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
105 std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
106
107 // column 0, id
108 std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
109 int32_t val;
110 bool is_null;
111 for (int i = 0; i < 8; ++i) {
112 ASSERT_TRUE(scanner->HasNext());
113 ASSERT_TRUE(scanner->NextValue(&val, &is_null));
114 ASSERT_FALSE(is_null);
115 }
116 ASSERT_FALSE(scanner->HasNext());
117 ASSERT_FALSE(scanner->NextValue(&val, &is_null));
118}
119
120TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
121 std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
122
123 // column 0, id
124 std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
125
126 ASSERT_EQ(128, scanner->batch_size());
127 scanner->SetBatchSize(1024);
128 ASSERT_EQ(1024, scanner->batch_size());
129}
130
131TEST_F(TestAllTypesPlain, DebugPrintWorks) {
132 std::stringstream ss;
133
134 std::list<int> columns;
135 ParquetFilePrinter printer(reader_.get());
136 printer.DebugPrint(ss, columns);
137
138 std::string result = ss.str();
139 ASSERT_GT(result.size(), 0);
140}
141
142TEST_F(TestAllTypesPlain, ColumnSelection) {
143 std::stringstream ss;
144
145 std::list<int> columns;
146 columns.push_back(5);
147 columns.push_back(0);
148 columns.push_back(10);
149 ParquetFilePrinter printer(reader_.get());
150 printer.DebugPrint(ss, columns);
151
152 std::string result = ss.str();
153 ASSERT_GT(result.size(), 0);
154}
155
156TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
157 std::stringstream ss;
158
159 std::list<int> columns;
160 columns.push_back(100);
161 ParquetFilePrinter printer1(reader_.get());
162 ASSERT_THROW(printer1.DebugPrint(ss, columns), ParquetException);
163
164 columns.clear();
165 columns.push_back(-1);
166 ParquetFilePrinter printer2(reader_.get());
167 ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException);
168}
169
170class TestLocalFile : public ::testing::Test {
171 public:
172 void SetUp() {
173 std::string dir_string(test::get_data_dir());
174
175 std::stringstream ss;
176 ss << dir_string << "/"
177 << "alltypes_plain.parquet";
178
179 PARQUET_THROW_NOT_OK(ReadableFile::Open(ss.str(), &handle));
180 fileno = handle->file_descriptor();
181 }
182
183 void TearDown() {}
184
185 protected:
186 int fileno;
187 std::shared_ptr<::arrow::io::ReadableFile> handle;
188};
189
// Input-file wrapper that records (instead of performing) Close(), letting
// tests observe whether the reader closed its input source on destruction.
class HelperFileClosed : public ArrowInputFile {
 public:
  // `close_called` must outlive this object; Close() sets it to true.
  explicit HelperFileClosed(
      const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, bool* close_called)
      : ArrowInputFile(file), close_called_(close_called) {}

  void Close() override { *close_called_ = true; }

 private:
  bool* close_called_;  // not owned
};
201
202TEST_F(TestLocalFile, FileClosedOnDestruction) {
203 bool close_called = false;
204 {
205 auto contents = ParquetFileReader::Contents::Open(
206 std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called)));
207 std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
208 result->Open(std::move(contents));
209 }
210 ASSERT_TRUE(close_called);
211}
212
213TEST_F(TestLocalFile, OpenWithMetadata) {
214 // PARQUET-808
215 std::stringstream ss;
216 std::shared_ptr<FileMetaData> metadata = ReadMetaData(handle);
217
218 auto reader = ParquetFileReader::Open(handle, default_reader_properties(), metadata);
219
220 // Compare pointers
221 ASSERT_EQ(metadata.get(), reader->metadata().get());
222
223 std::list<int> columns;
224 ParquetFilePrinter printer(reader.get());
225 printer.DebugPrint(ss, columns, true);
226
227 // Make sure OpenFile passes on the external metadata, too
228 auto reader2 = ParquetFileReader::OpenFile(alltypes_plain(), false,
229 default_reader_properties(), metadata);
230
231 // Compare pointers
232 ASSERT_EQ(metadata.get(), reader2->metadata().get());
233}
234
// PARQUET-816: decoding must proceed optimistically past malformed data page
// metadata; both buffered and memory-mapped reads must yield the same dump.
TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
  // PARQUET-816. Some files generated by older Parquet implementations may
  // contain malformed data page metadata, and we can successfully decode them
  // if we optimistically proceed to decoding, even if there is not enough data
  // available in the stream. Before, we had quite aggressive checking of
  // stream reads, which are not found e.g. in Impala's Parquet implementation
  auto reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), false);
  std::stringstream ss;

  // empty list means print all
  std::list<int> columns;
  ParquetFilePrinter printer1(reader.get());
  printer1.DebugPrint(ss, columns, true);

  // Re-open with memory mapping enabled and print again.
  reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), true);
  std::stringstream ss2;
  ParquetFilePrinter printer2(reader.get());
  printer2.DebugPrint(ss2, columns, true);

  // The memory-mapped reads runs over the end of the column chunk and succeeds
  // by accident
  ASSERT_EQ(ss2.str(), ss.str());
}
258
// JSONPrint on alltypes_plain.parquet must reproduce this exact golden JSON
// dump (schema, row-group, and column-chunk metadata). The raw string below
// is the expected output byte-for-byte; do not reformat it.
TEST(TestJSONWithLocalFile, JSONOutput) {
  std::string jsonOutput = R"###({
  "FileName": "alltypes_plain.parquet",
  "Version": "0",
  "CreatedBy": "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)",
  "TotalRows": "8",
  "NumberOfRowGroups": "1",
  "NumberOfRealColumns": "11",
  "NumberOfColumns": "11",
  "Columns": [
     { "Id": "0", "Name": "id", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "1", "Name": "bool_col", "PhysicalType": "BOOLEAN", "LogicalType": "NONE" },
     { "Id": "2", "Name": "tinyint_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "3", "Name": "smallint_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "4", "Name": "int_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
     { "Id": "5", "Name": "bigint_col", "PhysicalType": "INT64", "LogicalType": "NONE" },
     { "Id": "6", "Name": "float_col", "PhysicalType": "FLOAT", "LogicalType": "NONE" },
     { "Id": "7", "Name": "double_col", "PhysicalType": "DOUBLE", "LogicalType": "NONE" },
     { "Id": "8", "Name": "date_string_col", "PhysicalType": "BYTE_ARRAY", "LogicalType": "NONE" },
     { "Id": "9", "Name": "string_col", "PhysicalType": "BYTE_ARRAY", "LogicalType": "NONE" },
     { "Id": "10", "Name": "timestamp_col", "PhysicalType": "INT96", "LogicalType": "NONE" }
  ],
  "RowGroups": [
     {
       "Id": "0", "TotalBytes": "671", "Rows": "8",
       "ColumnChunks": [
          {"Id": "0", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "73", "CompressedSize": "73" },
          {"Id": "1", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "24", "CompressedSize": "24" },
          {"Id": "2", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "3", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "4", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "5", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" },
          {"Id": "6", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
          {"Id": "7", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" },
          {"Id": "8", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "88", "CompressedSize": "88" },
          {"Id": "9", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "49", "CompressedSize": "49" },
          {"Id": "10", "Values": "8", "StatsSet": "False",
           "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "139", "CompressedSize": "139" }
        ]
     }
  ]
}
)###";

  std::stringstream ss;
  // empty list means print all
  std::list<int> columns;

  auto reader =
      ParquetFileReader::OpenFile(alltypes_plain(), false, default_reader_properties());
  ParquetFilePrinter printer(reader.get());
  printer.JSONPrint(ss, columns, "alltypes_plain.parquet");

  // Exact match against the golden output above.
  ASSERT_EQ(jsonOutput, ss.str());
}
324
325} // namespace parquet
326