1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <gtest/gtest.h>
19
20#include <cstdint>
21#include <cstring>
22#include <memory>
23
24#include "parquet/column_page.h"
25#include "parquet/exception.h"
26#include "parquet/file_reader.h"
27#include "parquet/thrift.h"
28#include "parquet/types.h"
29#include "parquet/util/memory.h"
30#include "parquet/util/test-common.h"
31
32#include "arrow/io/memory.h"
33#include "arrow/status.h"
34#include "arrow/util/compression.h"
35
36namespace parquet {
37
// Evaluates `expr` (which must yield an ::arrow::Status) exactly once and
// raises a fatal gtest failure carrying the status text when it is not OK.
//
// The macro-local status uses a deliberately unusual name: with a short name
// such as `s`, a call like ASSERT_OK(s) would expand to a self-initialization
// of the local instead of reading the caller's variable.
#define ASSERT_OK(expr)                            \
  do {                                             \
    ::arrow::Status assert_ok_status_ = (expr);    \
    if (!assert_ok_status_.ok()) {                 \
      FAIL() << assert_ok_status_.ToString();      \
    }                                              \
  } while (0)
45
46using ::arrow::io::BufferReader;
47
48// Adds page statistics occupying a certain amount of bytes (for testing very
49// large page headers)
50static inline void AddDummyStats(int stat_size, format::DataPageHeader& data_page) {
51 std::vector<uint8_t> stat_bytes(stat_size);
52 // Some non-zero value
53 std::fill(stat_bytes.begin(), stat_bytes.end(), 1);
54 data_page.statistics.__set_max(
55 std::string(reinterpret_cast<const char*>(stat_bytes.data()), stat_size));
56 data_page.__isset.statistics = true;
57}
58
59class TestPageSerde : public ::testing::Test {
60 public:
61 void SetUp() {
62 data_page_header_.encoding = format::Encoding::PLAIN;
63 data_page_header_.definition_level_encoding = format::Encoding::RLE;
64 data_page_header_.repetition_level_encoding = format::Encoding::RLE;
65
66 ResetStream();
67 }
68
69 void InitSerializedPageReader(int64_t num_rows,
70 Compression::type codec = Compression::UNCOMPRESSED) {
71 EndStream();
72 std::unique_ptr<InputStream> stream;
73 stream.reset(new InMemoryInputStream(out_buffer_));
74 page_reader_ = PageReader::Open(std::move(stream), num_rows, codec);
75 }
76
77 void WriteDataPageHeader(int max_serialized_len = 1024, int32_t uncompressed_size = 0,
78 int32_t compressed_size = 0) {
79 // Simplifying writing serialized data page headers which may or may not
80 // have meaningful data associated with them
81
82 // Serialize the Page header
83 page_header_.__set_data_page_header(data_page_header_);
84 page_header_.uncompressed_page_size = uncompressed_size;
85 page_header_.compressed_page_size = compressed_size;
86 page_header_.type = format::PageType::DATA_PAGE;
87
88 ThriftSerializer serializer;
89 ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
90 }
91
92 void ResetStream() { out_stream_.reset(new InMemoryOutputStream); }
93
94 void EndStream() { out_buffer_ = out_stream_->GetBuffer(); }
95
96 protected:
97 std::unique_ptr<InMemoryOutputStream> out_stream_;
98 std::shared_ptr<Buffer> out_buffer_;
99
100 std::unique_ptr<PageReader> page_reader_;
101 format::PageHeader page_header_;
102 format::DataPageHeader data_page_header_;
103};
104
105void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) {
106 ASSERT_EQ(PageType::DATA_PAGE, page->type());
107
108 const DataPage* data_page = static_cast<const DataPage*>(page);
109 ASSERT_EQ(expected.num_values, data_page->num_values());
110 ASSERT_EQ(expected.encoding, data_page->encoding());
111 ASSERT_EQ(expected.definition_level_encoding, data_page->definition_level_encoding());
112 ASSERT_EQ(expected.repetition_level_encoding, data_page->repetition_level_encoding());
113
114 if (expected.statistics.__isset.max) {
115 ASSERT_EQ(expected.statistics.max, data_page->statistics().max());
116 }
117 if (expected.statistics.__isset.min) {
118 ASSERT_EQ(expected.statistics.min, data_page->statistics().min());
119 }
120}
121
122TEST_F(TestPageSerde, DataPage) {
123 format::PageHeader out_page_header;
124
125 int stats_size = 512;
126 const int32_t num_rows = 4444;
127 AddDummyStats(stats_size, data_page_header_);
128 data_page_header_.num_values = num_rows;
129
130 ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader());
131 InitSerializedPageReader(num_rows);
132 std::shared_ptr<Page> current_page = page_reader_->NextPage();
133 ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
134}
135
136TEST_F(TestPageSerde, TestLargePageHeaders) {
137 int stats_size = 256 * 1024; // 256 KB
138 AddDummyStats(stats_size, data_page_header_);
139
140 // Any number to verify metadata roundtrip
141 const int32_t num_rows = 4141;
142 data_page_header_.num_values = num_rows;
143
144 int max_header_size = 512 * 1024; // 512 KB
145 ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(max_header_size));
146 ASSERT_GE(max_header_size, out_stream_->Tell());
147
148 // check header size is between 256 KB to 16 MB
149 ASSERT_LE(stats_size, out_stream_->Tell());
150 ASSERT_GE(kDefaultMaxPageHeaderSize, out_stream_->Tell());
151
152 InitSerializedPageReader(num_rows);
153 std::shared_ptr<Page> current_page = page_reader_->NextPage();
154 ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
155}
156
157TEST_F(TestPageSerde, TestFailLargePageHeaders) {
158 const int32_t num_rows = 1337; // dummy value
159
160 int stats_size = 256 * 1024; // 256 KB
161 AddDummyStats(stats_size, data_page_header_);
162
163 // Serialize the Page header
164 int max_header_size = 512 * 1024; // 512 KB
165 ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(max_header_size));
166 ASSERT_GE(max_header_size, out_stream_->Tell());
167
168 int smaller_max_size = 128 * 1024;
169 ASSERT_LE(smaller_max_size, out_stream_->Tell());
170 InitSerializedPageReader(num_rows);
171
172 // Set the max page header size to 128 KB, which is less than the current
173 // header size
174 page_reader_->set_max_page_header_size(smaller_max_size);
175 ASSERT_THROW(page_reader_->NextPage(), ParquetException);
176}
177
178TEST_F(TestPageSerde, Compression) {
179 std::vector<Compression::type> codec_types = {Compression::GZIP, Compression::SNAPPY,
180 Compression::BROTLI, Compression::LZ4};
181#ifdef ARROW_WITH_ZSTD
182 codec_types.push_back(Compression::ZSTD);
183#endif
184
185 const int32_t num_rows = 32; // dummy value
186 data_page_header_.num_values = num_rows;
187
188 int num_pages = 10;
189
190 std::vector<std::vector<uint8_t>> faux_data;
191 faux_data.resize(num_pages);
192 for (int i = 0; i < num_pages; ++i) {
193 // The pages keep getting larger
194 int page_size = (i + 1) * 64;
195 test::random_bytes(page_size, 0, &faux_data[i]);
196 }
197 for (auto codec_type : codec_types) {
198 auto codec = GetCodecFromArrow(codec_type);
199
200 std::vector<uint8_t> buffer;
201 for (int i = 0; i < num_pages; ++i) {
202 const uint8_t* data = faux_data[i].data();
203 int data_size = static_cast<int>(faux_data[i].size());
204
205 int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data);
206 buffer.resize(max_compressed_size);
207
208 int64_t actual_size;
209 ASSERT_OK(codec->Compress(data_size, data, max_compressed_size, &buffer[0],
210 &actual_size));
211
212 ASSERT_NO_FATAL_FAILURE(
213 WriteDataPageHeader(1024, data_size, static_cast<int32_t>(actual_size)));
214 out_stream_->Write(buffer.data(), actual_size);
215 }
216
217 InitSerializedPageReader(num_rows * num_pages, codec_type);
218
219 std::shared_ptr<Page> page;
220 const DataPage* data_page;
221 for (int i = 0; i < num_pages; ++i) {
222 int data_size = static_cast<int>(faux_data[i].size());
223 page = page_reader_->NextPage();
224 data_page = static_cast<const DataPage*>(page.get());
225 ASSERT_EQ(data_size, data_page->size());
226 ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size));
227 }
228
229 ResetStream();
230 }
231}
232
233TEST_F(TestPageSerde, LZONotSupported) {
234 // Must await PARQUET-530
235 int data_size = 1024;
236 std::vector<uint8_t> faux_data(data_size);
237 ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(1024, data_size, data_size));
238 out_stream_->Write(faux_data.data(), data_size);
239 ASSERT_THROW(InitSerializedPageReader(data_size, Compression::LZO), ParquetException);
240}
241
242// ----------------------------------------------------------------------
243// File structure tests
244
245class TestParquetFileReader : public ::testing::Test {
246 public:
247 void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) {
248 reader_.reset(new ParquetFileReader());
249
250 auto reader = std::make_shared<BufferReader>(buffer);
251 auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader));
252
253 ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(std::move(wrapper))),
254 ParquetException);
255 }
256
257 protected:
258 std::unique_ptr<ParquetFileReader> reader_;
259};
260
261TEST_F(TestParquetFileReader, InvalidHeader) {
262 const char* bad_header = "PAR2";
263
264 auto buffer = Buffer::Wrap(bad_header, strlen(bad_header));
265 ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer));
266}
267
268TEST_F(TestParquetFileReader, InvalidFooter) {
269 // File is smaller than FOOTER_SIZE
270 const char* bad_file = "PAR1PAR";
271 auto buffer = Buffer::Wrap(bad_file, strlen(bad_file));
272 ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer));
273
274 // Magic number incorrect
275 const char* bad_file2 = "PAR1PAR2";
276 buffer = Buffer::Wrap(bad_file2, strlen(bad_file2));
277 ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer));
278}
279
280TEST_F(TestParquetFileReader, IncompleteMetadata) {
281 InMemoryOutputStream stream;
282
283 const char* magic = "PAR1";
284
285 stream.Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic));
286 std::vector<uint8_t> bytes(10);
287 stream.Write(bytes.data(), bytes.size());
288 uint32_t metadata_len = 24;
289 stream.Write(reinterpret_cast<const uint8_t*>(&metadata_len), sizeof(uint32_t));
290 stream.Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic));
291
292 auto buffer = stream.GetBuffer();
293 ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer));
294}
295
296} // namespace parquet
297