1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <gtest/gtest.h> |
19 | |
20 | #include <cstdint> |
21 | #include <cstring> |
22 | #include <memory> |
23 | |
24 | #include "parquet/column_page.h" |
25 | #include "parquet/exception.h" |
26 | #include "parquet/file_reader.h" |
27 | #include "parquet/thrift.h" |
28 | #include "parquet/types.h" |
29 | #include "parquet/util/memory.h" |
30 | #include "parquet/util/test-common.h" |
31 | |
32 | #include "arrow/io/memory.h" |
33 | #include "arrow/status.h" |
34 | #include "arrow/util/compression.h" |
35 | |
36 | namespace parquet { |
37 | |
// Fails the current test (via gtest FAIL()) with the Status message when
// `expr` evaluates to a non-OK ::arrow::Status. The temporary deliberately has
// an unusual name: a short name such as `s` would shadow any same-named
// variable used inside `expr`, making the Status initialize from itself.
#define ASSERT_OK(expr)                                 \
  do {                                                  \
    ::arrow::Status _parquet_assert_ok_st = (expr);     \
    if (!_parquet_assert_ok_st.ok()) {                  \
      FAIL() << _parquet_assert_ok_st.ToString();       \
    }                                                   \
  } while (0)
45 | |
46 | using ::arrow::io::BufferReader; |
47 | |
48 | // Adds page statistics occupying a certain amount of bytes (for testing very |
49 | // large page headers) |
50 | static inline void (int stat_size, format::DataPageHeader& data_page) { |
51 | std::vector<uint8_t> stat_bytes(stat_size); |
52 | // Some non-zero value |
53 | std::fill(stat_bytes.begin(), stat_bytes.end(), 1); |
54 | data_page.statistics.__set_max( |
55 | std::string(reinterpret_cast<const char*>(stat_bytes.data()), stat_size)); |
56 | data_page.__isset.statistics = true; |
57 | } |
58 | |
59 | class TestPageSerde : public ::testing::Test { |
60 | public: |
61 | void SetUp() { |
62 | data_page_header_.encoding = format::Encoding::PLAIN; |
63 | data_page_header_.definition_level_encoding = format::Encoding::RLE; |
64 | data_page_header_.repetition_level_encoding = format::Encoding::RLE; |
65 | |
66 | ResetStream(); |
67 | } |
68 | |
69 | void (int64_t num_rows, |
70 | Compression::type codec = Compression::UNCOMPRESSED) { |
71 | EndStream(); |
72 | std::unique_ptr<InputStream> stream; |
73 | stream.reset(new InMemoryInputStream(out_buffer_)); |
74 | page_reader_ = PageReader::Open(std::move(stream), num_rows, codec); |
75 | } |
76 | |
77 | void (int max_serialized_len = 1024, int32_t uncompressed_size = 0, |
78 | int32_t compressed_size = 0) { |
79 | // Simplifying writing serialized data page headers which may or may not |
80 | // have meaningful data associated with them |
81 | |
82 | // Serialize the Page header |
83 | page_header_.__set_data_page_header(data_page_header_); |
84 | page_header_.uncompressed_page_size = uncompressed_size; |
85 | page_header_.compressed_page_size = compressed_size; |
86 | page_header_.type = format::PageType::DATA_PAGE; |
87 | |
88 | ThriftSerializer serializer; |
89 | ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get())); |
90 | } |
91 | |
92 | void ResetStream() { out_stream_.reset(new InMemoryOutputStream); } |
93 | |
94 | void EndStream() { out_buffer_ = out_stream_->GetBuffer(); } |
95 | |
96 | protected: |
97 | std::unique_ptr<InMemoryOutputStream> out_stream_; |
98 | std::shared_ptr<Buffer> out_buffer_; |
99 | |
100 | std::unique_ptr<PageReader> page_reader_; |
101 | format::PageHeader ; |
102 | format::DataPageHeader ; |
103 | }; |
104 | |
105 | void (const format::DataPageHeader expected, const Page* page) { |
106 | ASSERT_EQ(PageType::DATA_PAGE, page->type()); |
107 | |
108 | const DataPage* data_page = static_cast<const DataPage*>(page); |
109 | ASSERT_EQ(expected.num_values, data_page->num_values()); |
110 | ASSERT_EQ(expected.encoding, data_page->encoding()); |
111 | ASSERT_EQ(expected.definition_level_encoding, data_page->definition_level_encoding()); |
112 | ASSERT_EQ(expected.repetition_level_encoding, data_page->repetition_level_encoding()); |
113 | |
114 | if (expected.statistics.__isset.max) { |
115 | ASSERT_EQ(expected.statistics.max, data_page->statistics().max()); |
116 | } |
117 | if (expected.statistics.__isset.min) { |
118 | ASSERT_EQ(expected.statistics.min, data_page->statistics().min()); |
119 | } |
120 | } |
121 | |
122 | TEST_F(TestPageSerde, DataPage) { |
123 | format::PageHeader ; |
124 | |
125 | int stats_size = 512; |
126 | const int32_t num_rows = 4444; |
127 | AddDummyStats(stats_size, data_page_header_); |
128 | data_page_header_.num_values = num_rows; |
129 | |
130 | ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader()); |
131 | InitSerializedPageReader(num_rows); |
132 | std::shared_ptr<Page> current_page = page_reader_->NextPage(); |
133 | ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get())); |
134 | } |
135 | |
136 | TEST_F(TestPageSerde, TestLargePageHeaders) { |
137 | int stats_size = 256 * 1024; // 256 KB |
138 | AddDummyStats(stats_size, data_page_header_); |
139 | |
140 | // Any number to verify metadata roundtrip |
141 | const int32_t num_rows = 4141; |
142 | data_page_header_.num_values = num_rows; |
143 | |
144 | int = 512 * 1024; // 512 KB |
145 | ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(max_header_size)); |
146 | ASSERT_GE(max_header_size, out_stream_->Tell()); |
147 | |
148 | // check header size is between 256 KB to 16 MB |
149 | ASSERT_LE(stats_size, out_stream_->Tell()); |
150 | ASSERT_GE(kDefaultMaxPageHeaderSize, out_stream_->Tell()); |
151 | |
152 | InitSerializedPageReader(num_rows); |
153 | std::shared_ptr<Page> current_page = page_reader_->NextPage(); |
154 | ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get())); |
155 | } |
156 | |
157 | TEST_F(TestPageSerde, TestFailLargePageHeaders) { |
158 | const int32_t num_rows = 1337; // dummy value |
159 | |
160 | int stats_size = 256 * 1024; // 256 KB |
161 | AddDummyStats(stats_size, data_page_header_); |
162 | |
163 | // Serialize the Page header |
164 | int = 512 * 1024; // 512 KB |
165 | ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(max_header_size)); |
166 | ASSERT_GE(max_header_size, out_stream_->Tell()); |
167 | |
168 | int smaller_max_size = 128 * 1024; |
169 | ASSERT_LE(smaller_max_size, out_stream_->Tell()); |
170 | InitSerializedPageReader(num_rows); |
171 | |
172 | // Set the max page header size to 128 KB, which is less than the current |
173 | // header size |
174 | page_reader_->set_max_page_header_size(smaller_max_size); |
175 | ASSERT_THROW(page_reader_->NextPage(), ParquetException); |
176 | } |
177 | |
178 | TEST_F(TestPageSerde, Compression) { |
179 | std::vector<Compression::type> codec_types = {Compression::GZIP, Compression::SNAPPY, |
180 | Compression::BROTLI, Compression::LZ4}; |
181 | #ifdef ARROW_WITH_ZSTD |
182 | codec_types.push_back(Compression::ZSTD); |
183 | #endif |
184 | |
185 | const int32_t num_rows = 32; // dummy value |
186 | data_page_header_.num_values = num_rows; |
187 | |
188 | int num_pages = 10; |
189 | |
190 | std::vector<std::vector<uint8_t>> faux_data; |
191 | faux_data.resize(num_pages); |
192 | for (int i = 0; i < num_pages; ++i) { |
193 | // The pages keep getting larger |
194 | int page_size = (i + 1) * 64; |
195 | test::random_bytes(page_size, 0, &faux_data[i]); |
196 | } |
197 | for (auto codec_type : codec_types) { |
198 | auto codec = GetCodecFromArrow(codec_type); |
199 | |
200 | std::vector<uint8_t> buffer; |
201 | for (int i = 0; i < num_pages; ++i) { |
202 | const uint8_t* data = faux_data[i].data(); |
203 | int data_size = static_cast<int>(faux_data[i].size()); |
204 | |
205 | int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data); |
206 | buffer.resize(max_compressed_size); |
207 | |
208 | int64_t actual_size; |
209 | ASSERT_OK(codec->Compress(data_size, data, max_compressed_size, &buffer[0], |
210 | &actual_size)); |
211 | |
212 | ASSERT_NO_FATAL_FAILURE( |
213 | WriteDataPageHeader(1024, data_size, static_cast<int32_t>(actual_size))); |
214 | out_stream_->Write(buffer.data(), actual_size); |
215 | } |
216 | |
217 | InitSerializedPageReader(num_rows * num_pages, codec_type); |
218 | |
219 | std::shared_ptr<Page> page; |
220 | const DataPage* data_page; |
221 | for (int i = 0; i < num_pages; ++i) { |
222 | int data_size = static_cast<int>(faux_data[i].size()); |
223 | page = page_reader_->NextPage(); |
224 | data_page = static_cast<const DataPage*>(page.get()); |
225 | ASSERT_EQ(data_size, data_page->size()); |
226 | ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size)); |
227 | } |
228 | |
229 | ResetStream(); |
230 | } |
231 | } |
232 | |
233 | TEST_F(TestPageSerde, LZONotSupported) { |
234 | // Must await PARQUET-530 |
235 | int data_size = 1024; |
236 | std::vector<uint8_t> faux_data(data_size); |
237 | ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(1024, data_size, data_size)); |
238 | out_stream_->Write(faux_data.data(), data_size); |
239 | ASSERT_THROW(InitSerializedPageReader(data_size, Compression::LZO), ParquetException); |
240 | } |
241 | |
242 | // ---------------------------------------------------------------------- |
243 | // File structure tests |
244 | |
245 | class TestParquetFileReader : public ::testing::Test { |
246 | public: |
247 | void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) { |
248 | reader_.reset(new ParquetFileReader()); |
249 | |
250 | auto reader = std::make_shared<BufferReader>(buffer); |
251 | auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader)); |
252 | |
253 | ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(std::move(wrapper))), |
254 | ParquetException); |
255 | } |
256 | |
257 | protected: |
258 | std::unique_ptr<ParquetFileReader> reader_; |
259 | }; |
260 | |
261 | TEST_F(TestParquetFileReader, InvalidHeader) { |
262 | const char* = "PAR2" ; |
263 | |
264 | auto buffer = Buffer::Wrap(bad_header, strlen(bad_header)); |
265 | ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); |
266 | } |
267 | |
268 | TEST_F(TestParquetFileReader, InvalidFooter) { |
269 | // File is smaller than FOOTER_SIZE |
270 | const char* bad_file = "PAR1PAR" ; |
271 | auto buffer = Buffer::Wrap(bad_file, strlen(bad_file)); |
272 | ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); |
273 | |
274 | // Magic number incorrect |
275 | const char* bad_file2 = "PAR1PAR2" ; |
276 | buffer = Buffer::Wrap(bad_file2, strlen(bad_file2)); |
277 | ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); |
278 | } |
279 | |
280 | TEST_F(TestParquetFileReader, IncompleteMetadata) { |
281 | InMemoryOutputStream stream; |
282 | |
283 | const char* magic = "PAR1" ; |
284 | |
285 | stream.Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic)); |
286 | std::vector<uint8_t> bytes(10); |
287 | stream.Write(bytes.data(), bytes.size()); |
288 | uint32_t metadata_len = 24; |
289 | stream.Write(reinterpret_cast<const uint8_t*>(&metadata_len), sizeof(uint32_t)); |
290 | stream.Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic)); |
291 | |
292 | auto buffer = stream.GetBuffer(); |
293 | ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); |
294 | } |
295 | |
296 | } // namespace parquet |
297 | |