1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
#include <gtest/gtest.h>

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "parquet/column_page.h"
#include "parquet/column_scanner.h"
#include "parquet/schema.h"
#include "parquet/test-specialization.h"
#include "parquet/test-util.h"
#include "parquet/types.h"
#include "parquet/util/test-common.h"
34 | |
35 | using std::shared_ptr; |
36 | using std::string; |
37 | using std::vector; |
38 | |
39 | namespace parquet { |
40 | |
41 | using schema::NodePtr; |
42 | |
43 | namespace test { |
44 | |
45 | template <> |
46 | void InitDictValues<bool>(int num_values, int dict_per_page, vector<bool>& values, |
47 | vector<uint8_t>& buffer) { |
48 | // No op for bool |
49 | } |
50 | |
51 | template <typename Type> |
52 | class TestFlatScanner : public ::testing::Test { |
53 | public: |
54 | typedef typename Type::c_type T; |
55 | |
56 | void InitScanner(const ColumnDescriptor* d) { |
57 | std::unique_ptr<PageReader> (new test::MockPageReader(pages_)); |
58 | scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager))); |
59 | } |
60 | |
61 | void CheckResults(int batch_size, const ColumnDescriptor* d) { |
62 | TypedScanner<Type>* scanner = reinterpret_cast<TypedScanner<Type>*>(scanner_.get()); |
63 | T val; |
64 | bool is_null = false; |
65 | int16_t def_level; |
66 | int16_t rep_level; |
67 | int j = 0; |
68 | scanner->SetBatchSize(batch_size); |
69 | for (int i = 0; i < num_levels_; i++) { |
70 | ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j; |
71 | if (!is_null) { |
72 | ASSERT_EQ(values_[j], val) << i << "V" << j; |
73 | j++; |
74 | } |
75 | if (d->max_definition_level() > 0) { |
76 | ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j; |
77 | } |
78 | if (d->max_repetition_level() > 0) { |
79 | ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j; |
80 | } |
81 | } |
82 | ASSERT_EQ(num_values_, j); |
83 | ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null)); |
84 | } |
85 | |
86 | void Clear() { |
87 | pages_.clear(); |
88 | values_.clear(); |
89 | def_levels_.clear(); |
90 | rep_levels_.clear(); |
91 | } |
92 | |
93 | void Execute(int num_pages, int levels_per_page, int batch_size, |
94 | const ColumnDescriptor* d, Encoding::type encoding) { |
95 | num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, |
96 | values_, data_buffer_, pages_, encoding); |
97 | num_levels_ = num_pages * levels_per_page; |
98 | InitScanner(d); |
99 | CheckResults(batch_size, d); |
100 | Clear(); |
101 | } |
102 | |
103 | void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1, |
104 | std::shared_ptr<ColumnDescriptor>& d2, |
105 | std::shared_ptr<ColumnDescriptor>& d3, int length) { |
106 | NodePtr type; |
107 | type = schema::PrimitiveNode::Make("c1" , Repetition::REQUIRED, Type::type_num, |
108 | LogicalType::NONE, length); |
109 | d1.reset(new ColumnDescriptor(type, 0, 0)); |
110 | type = schema::PrimitiveNode::Make("c2" , Repetition::OPTIONAL, Type::type_num, |
111 | LogicalType::NONE, length); |
112 | d2.reset(new ColumnDescriptor(type, 4, 0)); |
113 | type = schema::PrimitiveNode::Make("c3" , Repetition::REPEATED, Type::type_num, |
114 | LogicalType::NONE, length); |
115 | d3.reset(new ColumnDescriptor(type, 4, 2)); |
116 | } |
117 | |
118 | void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length, |
119 | Encoding::type encoding = Encoding::PLAIN) { |
120 | std::shared_ptr<ColumnDescriptor> d1; |
121 | std::shared_ptr<ColumnDescriptor> d2; |
122 | std::shared_ptr<ColumnDescriptor> d3; |
123 | InitDescriptors(d1, d2, d3, type_length); |
124 | // evaluate REQUIRED pages |
125 | Execute(num_pages, num_levels, batch_size, d1.get(), encoding); |
126 | // evaluate OPTIONAL pages |
127 | Execute(num_pages, num_levels, batch_size, d2.get(), encoding); |
128 | // evaluate REPEATED pages |
129 | Execute(num_pages, num_levels, batch_size, d3.get(), encoding); |
130 | } |
131 | |
132 | protected: |
133 | int num_levels_; |
134 | int num_values_; |
135 | vector<shared_ptr<Page>> pages_; |
136 | std::shared_ptr<Scanner> scanner_; |
137 | vector<T> values_; |
138 | vector<int16_t> def_levels_; |
139 | vector<int16_t> rep_levels_; |
140 | vector<uint8_t> data_buffer_; // For BA and FLBA |
141 | }; |
142 | |
// Sizing shared by the scanner tests in this file.
static int num_levels_per_page{100};  // levels generated in each page
static int num_pages{20};             // pages generated per column
static int batch_size{32};            // batch size handed to the scanner
146 | |
147 | typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, |
148 | ByteArrayType> |
149 | TestTypes; |
150 | |
151 | using TestBooleanFlatScanner = TestFlatScanner<BooleanType>; |
152 | using TestFLBAFlatScanner = TestFlatScanner<FLBAType>; |
153 | |
154 | TYPED_TEST_CASE(TestFlatScanner, TestTypes); |
155 | |
156 | TYPED_TEST(TestFlatScanner, TestPlainScanner) { |
157 | ASSERT_NO_FATAL_FAILURE( |
158 | this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN)); |
159 | } |
160 | |
161 | TYPED_TEST(TestFlatScanner, TestDictScanner) { |
162 | ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, |
163 | Encoding::RLE_DICTIONARY)); |
164 | } |
165 | |
166 | TEST_F(TestBooleanFlatScanner, TestPlainScanner) { |
167 | ASSERT_NO_FATAL_FAILURE( |
168 | this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0)); |
169 | } |
170 | |
171 | TEST_F(TestFLBAFlatScanner, TestPlainScanner) { |
172 | ASSERT_NO_FATAL_FAILURE( |
173 | this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH)); |
174 | } |
175 | |
176 | TEST_F(TestFLBAFlatScanner, TestDictScanner) { |
177 | ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, |
178 | FLBA_LENGTH, Encoding::RLE_DICTIONARY)); |
179 | } |
180 | |
181 | TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) { |
182 | ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, |
183 | FLBA_LENGTH, Encoding::PLAIN_DICTIONARY)); |
184 | } |
185 | |
186 | // PARQUET 502 |
187 | TEST_F(TestFLBAFlatScanner, TestSmallBatch) { |
188 | NodePtr type = |
189 | schema::PrimitiveNode::Make("c1" , Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, |
190 | LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); |
191 | const ColumnDescriptor d(type, 0, 0); |
192 | num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, |
193 | data_buffer_, pages_); |
194 | num_levels_ = 1 * 100; |
195 | InitScanner(&d); |
196 | ASSERT_NO_FATAL_FAILURE(CheckResults(1, &d)); |
197 | } |
198 | |
199 | TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) { |
200 | NodePtr type = |
201 | schema::PrimitiveNode::Make("c1" , Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, |
202 | LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); |
203 | const ColumnDescriptor d(type, 4, 0); |
204 | num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, |
205 | data_buffer_, pages_); |
206 | num_levels_ = 1 * 100; |
207 | InitScanner(&d); |
208 | TypedScanner<FLBAType>* scanner = |
209 | reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get()); |
210 | ASSERT_EQ(10, scanner->descr()->type_precision()); |
211 | ASSERT_EQ(2, scanner->descr()->type_scale()); |
212 | ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length()); |
213 | } |
214 | |
215 | TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) { |
216 | NodePtr type = |
217 | schema::PrimitiveNode::Make("c1" , Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, |
218 | LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); |
219 | const ColumnDescriptor d(type, 4, 0); |
220 | num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, |
221 | data_buffer_, pages_); |
222 | num_levels_ = 1 * 100; |
223 | InitScanner(&d); |
224 | TypedScanner<FLBAType>* scanner = |
225 | reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get()); |
226 | scanner->SetBatchSize(batch_size); |
227 | std::stringstream ss_fail; |
228 | for (int i = 0; i < num_levels_; i++) { |
229 | std::stringstream ss; |
230 | scanner->PrintNext(ss, 17); |
231 | std::string result = ss.str(); |
232 | ASSERT_LE(17, result.size()) << i; |
233 | } |
234 | ASSERT_THROW(scanner->PrintNext(ss_fail, 17), ParquetException); |
235 | } |
236 | |
237 | } // namespace test |
238 | } // namespace parquet |
239 | |