1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <gtest/gtest.h> |
19 | |
20 | #include <algorithm> |
21 | #include <cstdint> |
22 | #include <cstdlib> |
23 | #include <limits> |
24 | #include <memory> |
25 | #include <string> |
26 | #include <vector> |
27 | |
28 | #include "parquet/column_page.h" |
29 | #include "parquet/column_reader.h" |
30 | #include "parquet/schema.h" |
31 | #include "parquet/test-util.h" |
32 | #include "parquet/types.h" |
33 | #include "parquet/util/test-common.h" |
34 | |
35 | using std::shared_ptr; |
36 | using std::string; |
37 | using std::vector; |
38 | |
39 | namespace parquet { |
40 | |
41 | using schema::NodePtr; |
42 | |
43 | namespace test { |
44 | |
// Compares dense values `left` against spaced values `right` (as produced by
// ReadBatchSpaced), using `def_levels` to decide which slots of `right` hold
// real values.
//
// For each definition level:
//  * == max_def_levels:     the slot is defined; left[i_left] must equal
//                           right[i_right] and both cursors advance.
//  * == max_def_levels - 1: null on the lowest nesting level; only `right`
//                           has a slot for it.
//  * <  max_def_levels - 1: null on a higher nesting level; `right` has a slot
//                           for it only when max_rep_levels == 0.
//
// Returns true when every defined slot matches. Returns false (after writing a
// diagnostic to std::cerr) on the first mismatch, or if either vector is too
// short for the slots implied by `def_levels`.
template <typename T>
static inline bool vector_equal_with_def_levels(const std::vector<T>& left,
                                                const std::vector<int16_t>& def_levels,
                                                int16_t max_def_levels,
                                                int16_t max_rep_levels,
                                                const std::vector<T>& right) {
  size_t i_left = 0;
  size_t i_right = 0;
  for (size_t i = 0; i < def_levels.size(); i++) {
    if (def_levels[i] == max_def_levels) {
      // Guard against out-of-range reads when the inputs disagree with def_levels.
      if (i_left >= left.size() || i_right >= right.size()) {
        std::cerr << "index " << i << " out of range: left size " << left.size()
                  << ", right size " << right.size() << std::endl;
        return false;
      }
      // Compare
      if (left[i_left] != right[i_right]) {
        // Report the element actually compared; right[i] may be a different slot
        // (or out of range) once nulls have been skipped.
        std::cerr << "index " << i << " left was " << left[i_left] << " right was "
                  << right[i_right] << std::endl;
        return false;
      }
      i_left++;
      i_right++;
    } else if (def_levels[i] == (max_def_levels - 1)) {
      // Null entry on the lowest nested level
      i_right++;
    } else if (def_levels[i] < (max_def_levels - 1)) {
      // Null entry on a higher nesting level, only supported for non-repeating data
      if (max_rep_levels == 0) {
        i_right++;
      }
    }
  }

  return true;
}
76 | |
77 | class TestPrimitiveReader : public ::testing::Test { |
78 | public: |
79 | void InitReader(const ColumnDescriptor* d) { |
80 | std::unique_ptr<PageReader> ; |
81 | pager_.reset(new test::MockPageReader(pages_)); |
82 | reader_ = ColumnReader::Make(d, std::move(pager_)); |
83 | } |
84 | |
85 | void CheckResults() { |
86 | vector<int32_t> vresult(num_values_, -1); |
87 | vector<int16_t> dresult(num_levels_, -1); |
88 | vector<int16_t> rresult(num_levels_, -1); |
89 | int64_t values_read = 0; |
90 | int total_values_read = 0; |
91 | int batch_actual = 0; |
92 | |
93 | Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); |
94 | int32_t batch_size = 8; |
95 | int batch = 0; |
96 | // This will cover both the cases |
97 | // 1) batch_size < page_size (multiple ReadBatch from a single page) |
98 | // 2) batch_size > page_size (BatchRead limits to a single page) |
99 | do { |
100 | batch = static_cast<int>(reader->ReadBatch( |
101 | batch_size, &dresult[0] + batch_actual, &rresult[0] + batch_actual, |
102 | &vresult[0] + total_values_read, &values_read)); |
103 | total_values_read += static_cast<int>(values_read); |
104 | batch_actual += batch; |
105 | batch_size = std::min(1 << 24, std::max(batch_size * 2, 4096)); |
106 | } while (batch > 0); |
107 | |
108 | ASSERT_EQ(num_levels_, batch_actual); |
109 | ASSERT_EQ(num_values_, total_values_read); |
110 | ASSERT_TRUE(vector_equal(values_, vresult)); |
111 | if (max_def_level_ > 0) { |
112 | ASSERT_TRUE(vector_equal(def_levels_, dresult)); |
113 | } |
114 | if (max_rep_level_ > 0) { |
115 | ASSERT_TRUE(vector_equal(rep_levels_, rresult)); |
116 | } |
117 | // catch improper writes at EOS |
118 | batch_actual = |
119 | static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read)); |
120 | ASSERT_EQ(0, batch_actual); |
121 | ASSERT_EQ(0, values_read); |
122 | } |
123 | |
124 | void CheckResultsSpaced() { |
125 | vector<int32_t> vresult(num_levels_, -1); |
126 | vector<int16_t> dresult(num_levels_, -1); |
127 | vector<int16_t> rresult(num_levels_, -1); |
128 | vector<uint8_t> valid_bits(num_levels_, 255); |
129 | int total_values_read = 0; |
130 | int batch_actual = 0; |
131 | int levels_actual = 0; |
132 | int64_t null_count = -1; |
133 | int64_t levels_read = 0; |
134 | int64_t values_read; |
135 | |
136 | Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); |
137 | int32_t batch_size = 8; |
138 | int batch = 0; |
139 | // This will cover both the cases |
140 | // 1) batch_size < page_size (multiple ReadBatch from a single page) |
141 | // 2) batch_size > page_size (BatchRead limits to a single page) |
142 | do { |
143 | batch = static_cast<int>(reader->ReadBatchSpaced( |
144 | batch_size, dresult.data() + levels_actual, rresult.data() + levels_actual, |
145 | vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0, |
146 | &levels_read, &values_read, &null_count)); |
147 | total_values_read += batch - static_cast<int>(null_count); |
148 | batch_actual += batch; |
149 | levels_actual += static_cast<int>(levels_read); |
150 | batch_size = std::min(1 << 24, std::max(batch_size * 2, 4096)); |
151 | } while ((batch > 0) || (levels_read > 0)); |
152 | |
153 | ASSERT_EQ(num_levels_, levels_actual); |
154 | ASSERT_EQ(num_values_, total_values_read); |
155 | if (max_def_level_ > 0) { |
156 | ASSERT_TRUE(vector_equal(def_levels_, dresult)); |
157 | ASSERT_TRUE(vector_equal_with_def_levels(values_, dresult, max_def_level_, |
158 | max_rep_level_, vresult)); |
159 | } else { |
160 | ASSERT_TRUE(vector_equal(values_, vresult)); |
161 | } |
162 | if (max_rep_level_ > 0) { |
163 | ASSERT_TRUE(vector_equal(rep_levels_, rresult)); |
164 | } |
165 | // catch improper writes at EOS |
166 | batch_actual = static_cast<int>( |
167 | reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr, valid_bits.data(), 0, |
168 | &levels_read, &values_read, &null_count)); |
169 | ASSERT_EQ(0, batch_actual); |
170 | ASSERT_EQ(0, null_count); |
171 | } |
172 | |
173 | void Clear() { |
174 | values_.clear(); |
175 | def_levels_.clear(); |
176 | rep_levels_.clear(); |
177 | pages_.clear(); |
178 | reader_.reset(); |
179 | } |
180 | |
181 | void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) { |
182 | num_values_ = |
183 | MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, |
184 | values_, data_buffer_, pages_, Encoding::PLAIN); |
185 | num_levels_ = num_pages * levels_per_page; |
186 | InitReader(d); |
187 | CheckResults(); |
188 | Clear(); |
189 | |
190 | num_values_ = |
191 | MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, |
192 | values_, data_buffer_, pages_, Encoding::PLAIN); |
193 | num_levels_ = num_pages * levels_per_page; |
194 | InitReader(d); |
195 | CheckResultsSpaced(); |
196 | Clear(); |
197 | } |
198 | |
199 | void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) { |
200 | num_values_ = |
201 | MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, |
202 | values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY); |
203 | num_levels_ = num_pages * levels_per_page; |
204 | InitReader(d); |
205 | CheckResults(); |
206 | Clear(); |
207 | |
208 | num_values_ = |
209 | MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, |
210 | values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY); |
211 | num_levels_ = num_pages * levels_per_page; |
212 | InitReader(d); |
213 | CheckResultsSpaced(); |
214 | Clear(); |
215 | } |
216 | |
217 | protected: |
218 | int num_levels_; |
219 | int num_values_; |
220 | int16_t max_def_level_; |
221 | int16_t max_rep_level_; |
222 | vector<shared_ptr<Page>> pages_; |
223 | std::shared_ptr<ColumnReader> reader_; |
224 | vector<int32_t> values_; |
225 | vector<int16_t> def_levels_; |
226 | vector<int16_t> rep_levels_; |
227 | vector<uint8_t> data_buffer_; // For BA and FLBA |
228 | }; |
229 | |
230 | TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { |
231 | int levels_per_page = 100; |
232 | int num_pages = 50; |
233 | max_def_level_ = 0; |
234 | max_rep_level_ = 0; |
235 | NodePtr type = schema::Int32("a" , Repetition::REQUIRED); |
236 | const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); |
237 | ASSERT_NO_FATAL_FAILURE(ExecutePlain(num_pages, levels_per_page, &descr)); |
238 | ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr)); |
239 | } |
240 | |
241 | TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { |
242 | int levels_per_page = 100; |
243 | int num_pages = 50; |
244 | max_def_level_ = 4; |
245 | max_rep_level_ = 0; |
246 | NodePtr type = schema::Int32("b" , Repetition::OPTIONAL); |
247 | const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); |
248 | ASSERT_NO_FATAL_FAILURE(ExecutePlain(num_pages, levels_per_page, &descr)); |
249 | ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr)); |
250 | } |
251 | |
252 | TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { |
253 | int levels_per_page = 100; |
254 | int num_pages = 50; |
255 | max_def_level_ = 4; |
256 | max_rep_level_ = 2; |
257 | NodePtr type = schema::Int32("c" , Repetition::REPEATED); |
258 | const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); |
259 | ASSERT_NO_FATAL_FAILURE(ExecutePlain(num_pages, levels_per_page, &descr)); |
260 | ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr)); |
261 | } |
262 | |
263 | TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { |
264 | int levels_per_page = 100; |
265 | int num_pages = 5; |
266 | max_def_level_ = 0; |
267 | max_rep_level_ = 0; |
268 | NodePtr type = schema::Int32("b" , Repetition::REQUIRED); |
269 | const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); |
270 | MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_, |
271 | values_, data_buffer_, pages_, Encoding::PLAIN); |
272 | InitReader(&descr); |
273 | vector<int32_t> vresult(levels_per_page / 2, -1); |
274 | vector<int16_t> dresult(levels_per_page / 2, -1); |
275 | vector<int16_t> rresult(levels_per_page / 2, -1); |
276 | |
277 | Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); |
278 | int64_t values_read = 0; |
279 | |
280 | // 1) skip_size > page_size (multiple pages skipped) |
281 | // Skip first 2 pages |
282 | int64_t levels_skipped = reader->Skip(2 * levels_per_page); |
283 | ASSERT_EQ(2 * levels_per_page, levels_skipped); |
284 | // Read half a page |
285 | reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), |
286 | &values_read); |
287 | vector<int32_t> sub_values( |
288 | values_.begin() + 2 * levels_per_page, |
289 | values_.begin() + static_cast<int>(2.5 * static_cast<double>(levels_per_page))); |
290 | ASSERT_TRUE(vector_equal(sub_values, vresult)); |
291 | |
292 | // 2) skip_size == page_size (skip across two pages) |
293 | levels_skipped = reader->Skip(levels_per_page); |
294 | ASSERT_EQ(levels_per_page, levels_skipped); |
295 | // Read half a page |
296 | reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), |
297 | &values_read); |
298 | sub_values.clear(); |
299 | sub_values.insert( |
300 | sub_values.end(), |
301 | values_.begin() + static_cast<int>(3.5 * static_cast<double>(levels_per_page)), |
302 | values_.begin() + 4 * levels_per_page); |
303 | ASSERT_TRUE(vector_equal(sub_values, vresult)); |
304 | |
305 | // 3) skip_size < page_size (skip limited to a single page) |
306 | // Skip half a page |
307 | levels_skipped = reader->Skip(levels_per_page / 2); |
308 | ASSERT_EQ(0.5 * levels_per_page, levels_skipped); |
309 | // Read half a page |
310 | reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), |
311 | &values_read); |
312 | sub_values.clear(); |
313 | sub_values.insert( |
314 | sub_values.end(), |
315 | values_.begin() + static_cast<int>(4.5 * static_cast<double>(levels_per_page)), |
316 | values_.end()); |
317 | ASSERT_TRUE(vector_equal(sub_values, vresult)); |
318 | |
319 | values_.clear(); |
320 | def_levels_.clear(); |
321 | rep_levels_.clear(); |
322 | pages_.clear(); |
323 | reader_.reset(); |
324 | } |
325 | |
326 | TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) { |
327 | max_def_level_ = 0; |
328 | max_rep_level_ = 0; |
329 | NodePtr type = schema::Int32("a" , Repetition::REQUIRED); |
330 | const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); |
331 | shared_ptr<ResizableBuffer> dummy = AllocateBuffer(); |
332 | |
333 | shared_ptr<DictionaryPage> dict_page = |
334 | std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN); |
335 | shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>( |
336 | &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0); |
337 | pages_.push_back(dict_page); |
338 | pages_.push_back(data_page); |
339 | InitReader(&descr); |
340 | // Tests Dict : PLAIN, Data : RLE_DICTIONARY |
341 | ASSERT_NO_THROW(reader_->HasNext()); |
342 | pages_.clear(); |
343 | |
344 | dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY); |
345 | data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0, |
346 | {}, 0, {}, 0); |
347 | pages_.push_back(dict_page); |
348 | pages_.push_back(data_page); |
349 | InitReader(&descr); |
350 | // Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY |
351 | ASSERT_NO_THROW(reader_->HasNext()); |
352 | pages_.clear(); |
353 | |
354 | data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, |
355 | 0, {}, 0); |
356 | pages_.push_back(data_page); |
357 | InitReader(&descr); |
358 | // Tests dictionary page must occur before data page |
359 | ASSERT_THROW(reader_->HasNext(), ParquetException); |
360 | pages_.clear(); |
361 | |
362 | dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::DELTA_BYTE_ARRAY); |
363 | pages_.push_back(dict_page); |
364 | InitReader(&descr); |
365 | // Tests only RLE_DICTIONARY is supported |
366 | ASSERT_THROW(reader_->HasNext(), ParquetException); |
367 | pages_.clear(); |
368 | |
369 | shared_ptr<DictionaryPage> dict_page1 = |
370 | std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY); |
371 | shared_ptr<DictionaryPage> dict_page2 = |
372 | std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN); |
373 | pages_.push_back(dict_page1); |
374 | pages_.push_back(dict_page2); |
375 | InitReader(&descr); |
376 | // Column cannot have more than one dictionary |
377 | ASSERT_THROW(reader_->HasNext(), ParquetException); |
378 | pages_.clear(); |
379 | |
380 | data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0, |
381 | {}, 0, {}, 0); |
382 | pages_.push_back(data_page); |
383 | InitReader(&descr); |
384 | // unsupported encoding |
385 | ASSERT_THROW(reader_->HasNext(), ParquetException); |
386 | pages_.clear(); |
387 | } |
388 | |
389 | TEST(TestColumnReader, DefinitionLevelsToBitmap) { |
390 | // Bugs in this function were exposed in ARROW-3930 |
391 | std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3}; |
392 | std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1}; |
393 | |
394 | std::vector<uint8_t> valid_bits(2, 0); |
395 | |
396 | const int max_def_level = 3; |
397 | const int max_rep_level = 1; |
398 | |
399 | int64_t values_read = -1; |
400 | int64_t null_count = 0; |
401 | internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level, |
402 | &values_read, &null_count, valid_bits.data(), |
403 | 0 /* valid_bits_offset */); |
404 | ASSERT_EQ(9, values_read); |
405 | ASSERT_EQ(1, null_count); |
406 | |
407 | // Call again with 0 definition levels, make sure that valid_bits is unmodifed |
408 | const uint8_t current_byte = valid_bits[1]; |
409 | null_count = 0; |
410 | internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level, |
411 | &values_read, &null_count, valid_bits.data(), |
412 | 9 /* valid_bits_offset */); |
413 | ASSERT_EQ(0, values_read); |
414 | ASSERT_EQ(0, null_count); |
415 | ASSERT_EQ(current_byte, valid_bits[1]); |
416 | } |
417 | |
418 | } // namespace test |
419 | } // namespace parquet |
420 | |