1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <gtest/gtest.h>

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/schema.h"
#include "parquet/test-util.h"
#include "parquet/types.h"
#include "parquet/util/test-common.h"
34
35using std::shared_ptr;
36using std::string;
37using std::vector;
38
39namespace parquet {
40
41using schema::NodePtr;
42
43namespace test {
44
/// Compares densely packed values against a "spaced" value buffer, using the
/// definition levels to decide which slots of the spaced buffer hold values.
///
/// For each entry of `def_levels`:
///  - == max_def_levels: the next `left` value must equal the next `right` slot;
///  - == max_def_levels - 1: a null on the lowest nesting level; it occupies one
///    `right` slot whose contents are ignored;
///  - <  max_def_levels - 1: a null on a higher nesting level; it occupies a
///    `right` slot (contents ignored) only when max_rep_levels == 0.
///
/// @return true if every defined value matches. Returns false on the first
///         mismatch, or if `left`/`right` are too short for the given levels.
///         In both cases a diagnostic is written to stderr.
template <typename T>
static inline bool vector_equal_with_def_levels(const std::vector<T>& left,
                                                const std::vector<int16_t>& def_levels,
                                                int16_t max_def_levels,
                                                int16_t max_rep_levels,
                                                const std::vector<T>& right) {
  size_t i_left = 0;
  size_t i_right = 0;
  for (size_t i = 0; i < def_levels.size(); i++) {
    const int16_t level = def_levels[i];
    if (level == max_def_levels) {
      // Guard against reading past either buffer when the inputs disagree.
      if (i_left >= left.size() || i_right >= right.size()) {
        std::cerr << "index " << i << " is out of range of the compared vectors"
                  << '\n';
        return false;
      }
      if (left[i_left] != right[i_right]) {
        // Report the element actually compared (right[i_right], not right[i]).
        std::cerr << "index " << i << " left was " << left[i_left] << " right was "
                  << right[i_right] << '\n';
        return false;
      }
      i_left++;
      i_right++;
    } else if (level == max_def_levels - 1) {
      // Null entry on the lowest nested level: skip its slot.
      i_right++;
    } else if (level < max_def_levels - 1 && max_rep_levels == 0) {
      // Null on a higher nesting level only takes a slot for non-repeated data.
      i_right++;
    }
  }
  return true;
}
76
77class TestPrimitiveReader : public ::testing::Test {
78 public:
79 void InitReader(const ColumnDescriptor* d) {
80 std::unique_ptr<PageReader> pager_;
81 pager_.reset(new test::MockPageReader(pages_));
82 reader_ = ColumnReader::Make(d, std::move(pager_));
83 }
84
85 void CheckResults() {
86 vector<int32_t> vresult(num_values_, -1);
87 vector<int16_t> dresult(num_levels_, -1);
88 vector<int16_t> rresult(num_levels_, -1);
89 int64_t values_read = 0;
90 int total_values_read = 0;
91 int batch_actual = 0;
92
93 Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
94 int32_t batch_size = 8;
95 int batch = 0;
96 // This will cover both the cases
97 // 1) batch_size < page_size (multiple ReadBatch from a single page)
98 // 2) batch_size > page_size (BatchRead limits to a single page)
99 do {
100 batch = static_cast<int>(reader->ReadBatch(
101 batch_size, &dresult[0] + batch_actual, &rresult[0] + batch_actual,
102 &vresult[0] + total_values_read, &values_read));
103 total_values_read += static_cast<int>(values_read);
104 batch_actual += batch;
105 batch_size = std::min(1 << 24, std::max(batch_size * 2, 4096));
106 } while (batch > 0);
107
108 ASSERT_EQ(num_levels_, batch_actual);
109 ASSERT_EQ(num_values_, total_values_read);
110 ASSERT_TRUE(vector_equal(values_, vresult));
111 if (max_def_level_ > 0) {
112 ASSERT_TRUE(vector_equal(def_levels_, dresult));
113 }
114 if (max_rep_level_ > 0) {
115 ASSERT_TRUE(vector_equal(rep_levels_, rresult));
116 }
117 // catch improper writes at EOS
118 batch_actual =
119 static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read));
120 ASSERT_EQ(0, batch_actual);
121 ASSERT_EQ(0, values_read);
122 }
123
124 void CheckResultsSpaced() {
125 vector<int32_t> vresult(num_levels_, -1);
126 vector<int16_t> dresult(num_levels_, -1);
127 vector<int16_t> rresult(num_levels_, -1);
128 vector<uint8_t> valid_bits(num_levels_, 255);
129 int total_values_read = 0;
130 int batch_actual = 0;
131 int levels_actual = 0;
132 int64_t null_count = -1;
133 int64_t levels_read = 0;
134 int64_t values_read;
135
136 Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
137 int32_t batch_size = 8;
138 int batch = 0;
139 // This will cover both the cases
140 // 1) batch_size < page_size (multiple ReadBatch from a single page)
141 // 2) batch_size > page_size (BatchRead limits to a single page)
142 do {
143 batch = static_cast<int>(reader->ReadBatchSpaced(
144 batch_size, dresult.data() + levels_actual, rresult.data() + levels_actual,
145 vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0,
146 &levels_read, &values_read, &null_count));
147 total_values_read += batch - static_cast<int>(null_count);
148 batch_actual += batch;
149 levels_actual += static_cast<int>(levels_read);
150 batch_size = std::min(1 << 24, std::max(batch_size * 2, 4096));
151 } while ((batch > 0) || (levels_read > 0));
152
153 ASSERT_EQ(num_levels_, levels_actual);
154 ASSERT_EQ(num_values_, total_values_read);
155 if (max_def_level_ > 0) {
156 ASSERT_TRUE(vector_equal(def_levels_, dresult));
157 ASSERT_TRUE(vector_equal_with_def_levels(values_, dresult, max_def_level_,
158 max_rep_level_, vresult));
159 } else {
160 ASSERT_TRUE(vector_equal(values_, vresult));
161 }
162 if (max_rep_level_ > 0) {
163 ASSERT_TRUE(vector_equal(rep_levels_, rresult));
164 }
165 // catch improper writes at EOS
166 batch_actual = static_cast<int>(
167 reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr, valid_bits.data(), 0,
168 &levels_read, &values_read, &null_count));
169 ASSERT_EQ(0, batch_actual);
170 ASSERT_EQ(0, null_count);
171 }
172
173 void Clear() {
174 values_.clear();
175 def_levels_.clear();
176 rep_levels_.clear();
177 pages_.clear();
178 reader_.reset();
179 }
180
181 void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
182 num_values_ =
183 MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
184 values_, data_buffer_, pages_, Encoding::PLAIN);
185 num_levels_ = num_pages * levels_per_page;
186 InitReader(d);
187 CheckResults();
188 Clear();
189
190 num_values_ =
191 MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
192 values_, data_buffer_, pages_, Encoding::PLAIN);
193 num_levels_ = num_pages * levels_per_page;
194 InitReader(d);
195 CheckResultsSpaced();
196 Clear();
197 }
198
199 void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
200 num_values_ =
201 MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
202 values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
203 num_levels_ = num_pages * levels_per_page;
204 InitReader(d);
205 CheckResults();
206 Clear();
207
208 num_values_ =
209 MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
210 values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
211 num_levels_ = num_pages * levels_per_page;
212 InitReader(d);
213 CheckResultsSpaced();
214 Clear();
215 }
216
217 protected:
218 int num_levels_;
219 int num_values_;
220 int16_t max_def_level_;
221 int16_t max_rep_level_;
222 vector<shared_ptr<Page>> pages_;
223 std::shared_ptr<ColumnReader> reader_;
224 vector<int32_t> values_;
225 vector<int16_t> def_levels_;
226 vector<int16_t> rep_levels_;
227 vector<uint8_t> data_buffer_; // For BA and FLBA
228};
229
230TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
231 int levels_per_page = 100;
232 int num_pages = 50;
233 max_def_level_ = 0;
234 max_rep_level_ = 0;
235 NodePtr type = schema::Int32("a", Repetition::REQUIRED);
236 const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
237 ASSERT_NO_FATAL_FAILURE(ExecutePlain(num_pages, levels_per_page, &descr));
238 ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr));
239}
240
241TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
242 int levels_per_page = 100;
243 int num_pages = 50;
244 max_def_level_ = 4;
245 max_rep_level_ = 0;
246 NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
247 const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
248 ASSERT_NO_FATAL_FAILURE(ExecutePlain(num_pages, levels_per_page, &descr));
249 ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr));
250}
251
252TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
253 int levels_per_page = 100;
254 int num_pages = 50;
255 max_def_level_ = 4;
256 max_rep_level_ = 2;
257 NodePtr type = schema::Int32("c", Repetition::REPEATED);
258 const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
259 ASSERT_NO_FATAL_FAILURE(ExecutePlain(num_pages, levels_per_page, &descr));
260 ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr));
261}
262
263TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
264 int levels_per_page = 100;
265 int num_pages = 5;
266 max_def_level_ = 0;
267 max_rep_level_ = 0;
268 NodePtr type = schema::Int32("b", Repetition::REQUIRED);
269 const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
270 MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
271 values_, data_buffer_, pages_, Encoding::PLAIN);
272 InitReader(&descr);
273 vector<int32_t> vresult(levels_per_page / 2, -1);
274 vector<int16_t> dresult(levels_per_page / 2, -1);
275 vector<int16_t> rresult(levels_per_page / 2, -1);
276
277 Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
278 int64_t values_read = 0;
279
280 // 1) skip_size > page_size (multiple pages skipped)
281 // Skip first 2 pages
282 int64_t levels_skipped = reader->Skip(2 * levels_per_page);
283 ASSERT_EQ(2 * levels_per_page, levels_skipped);
284 // Read half a page
285 reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(),
286 &values_read);
287 vector<int32_t> sub_values(
288 values_.begin() + 2 * levels_per_page,
289 values_.begin() + static_cast<int>(2.5 * static_cast<double>(levels_per_page)));
290 ASSERT_TRUE(vector_equal(sub_values, vresult));
291
292 // 2) skip_size == page_size (skip across two pages)
293 levels_skipped = reader->Skip(levels_per_page);
294 ASSERT_EQ(levels_per_page, levels_skipped);
295 // Read half a page
296 reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(),
297 &values_read);
298 sub_values.clear();
299 sub_values.insert(
300 sub_values.end(),
301 values_.begin() + static_cast<int>(3.5 * static_cast<double>(levels_per_page)),
302 values_.begin() + 4 * levels_per_page);
303 ASSERT_TRUE(vector_equal(sub_values, vresult));
304
305 // 3) skip_size < page_size (skip limited to a single page)
306 // Skip half a page
307 levels_skipped = reader->Skip(levels_per_page / 2);
308 ASSERT_EQ(0.5 * levels_per_page, levels_skipped);
309 // Read half a page
310 reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(),
311 &values_read);
312 sub_values.clear();
313 sub_values.insert(
314 sub_values.end(),
315 values_.begin() + static_cast<int>(4.5 * static_cast<double>(levels_per_page)),
316 values_.end());
317 ASSERT_TRUE(vector_equal(sub_values, vresult));
318
319 values_.clear();
320 def_levels_.clear();
321 rep_levels_.clear();
322 pages_.clear();
323 reader_.reset();
324}
325
326TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
327 max_def_level_ = 0;
328 max_rep_level_ = 0;
329 NodePtr type = schema::Int32("a", Repetition::REQUIRED);
330 const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
331 shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
332
333 shared_ptr<DictionaryPage> dict_page =
334 std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
335 shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
336 &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
337 pages_.push_back(dict_page);
338 pages_.push_back(data_page);
339 InitReader(&descr);
340 // Tests Dict : PLAIN, Data : RLE_DICTIONARY
341 ASSERT_NO_THROW(reader_->HasNext());
342 pages_.clear();
343
344 dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
345 data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0,
346 {}, 0, {}, 0);
347 pages_.push_back(dict_page);
348 pages_.push_back(data_page);
349 InitReader(&descr);
350 // Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY
351 ASSERT_NO_THROW(reader_->HasNext());
352 pages_.clear();
353
354 data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {},
355 0, {}, 0);
356 pages_.push_back(data_page);
357 InitReader(&descr);
358 // Tests dictionary page must occur before data page
359 ASSERT_THROW(reader_->HasNext(), ParquetException);
360 pages_.clear();
361
362 dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::DELTA_BYTE_ARRAY);
363 pages_.push_back(dict_page);
364 InitReader(&descr);
365 // Tests only RLE_DICTIONARY is supported
366 ASSERT_THROW(reader_->HasNext(), ParquetException);
367 pages_.clear();
368
369 shared_ptr<DictionaryPage> dict_page1 =
370 std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
371 shared_ptr<DictionaryPage> dict_page2 =
372 std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
373 pages_.push_back(dict_page1);
374 pages_.push_back(dict_page2);
375 InitReader(&descr);
376 // Column cannot have more than one dictionary
377 ASSERT_THROW(reader_->HasNext(), ParquetException);
378 pages_.clear();
379
380 data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0,
381 {}, 0, {}, 0);
382 pages_.push_back(data_page);
383 InitReader(&descr);
384 // unsupported encoding
385 ASSERT_THROW(reader_->HasNext(), ParquetException);
386 pages_.clear();
387}
388
389TEST(TestColumnReader, DefinitionLevelsToBitmap) {
390 // Bugs in this function were exposed in ARROW-3930
391 std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3};
392 std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1};
393
394 std::vector<uint8_t> valid_bits(2, 0);
395
396 const int max_def_level = 3;
397 const int max_rep_level = 1;
398
399 int64_t values_read = -1;
400 int64_t null_count = 0;
401 internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level,
402 &values_read, &null_count, valid_bits.data(),
403 0 /* valid_bits_offset */);
404 ASSERT_EQ(9, values_read);
405 ASSERT_EQ(1, null_count);
406
407 // Call again with 0 definition levels, make sure that valid_bits is unmodifed
408 const uint8_t current_byte = valid_bits[1];
409 null_count = 0;
410 internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level,
411 &values_read, &null_count, valid_bits.data(),
412 9 /* valid_bits_offset */);
413 ASSERT_EQ(0, values_read);
414 ASSERT_EQ(0, null_count);
415 ASSERT_EQ(current_byte, valid_bits[1]);
416}
417
418} // namespace test
419} // namespace parquet
420