1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <gtest/gtest.h>

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "parquet/column_page.h"
#include "parquet/column_scanner.h"
#include "parquet/schema.h"
#include "parquet/test-specialization.h"
#include "parquet/test-util.h"
#include "parquet/types.h"
#include "parquet/util/test-common.h"
34
35using std::shared_ptr;
36using std::string;
37using std::vector;
38
39namespace parquet {
40
41using schema::NodePtr;
42
43namespace test {
44
// Explicit specialization of InitDictValues for bool. It is intentionally empty:
// neither `values` nor `buffer` is touched and all arguments are ignored.
// NOTE(review): presumably BOOLEAN columns are never dictionary-encoded (this file
// has no boolean dictionary test) -- confirm against the primary template.
template <>
void InitDictValues<bool>(int num_values, int dict_per_page, vector<bool>& values,
                          vector<uint8_t>& buffer) {
  // No op for bool
}
50
51template <typename Type>
52class TestFlatScanner : public ::testing::Test {
53 public:
54 typedef typename Type::c_type T;
55
56 void InitScanner(const ColumnDescriptor* d) {
57 std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
58 scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
59 }
60
61 void CheckResults(int batch_size, const ColumnDescriptor* d) {
62 TypedScanner<Type>* scanner = reinterpret_cast<TypedScanner<Type>*>(scanner_.get());
63 T val;
64 bool is_null = false;
65 int16_t def_level;
66 int16_t rep_level;
67 int j = 0;
68 scanner->SetBatchSize(batch_size);
69 for (int i = 0; i < num_levels_; i++) {
70 ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j;
71 if (!is_null) {
72 ASSERT_EQ(values_[j], val) << i << "V" << j;
73 j++;
74 }
75 if (d->max_definition_level() > 0) {
76 ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j;
77 }
78 if (d->max_repetition_level() > 0) {
79 ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j;
80 }
81 }
82 ASSERT_EQ(num_values_, j);
83 ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null));
84 }
85
86 void Clear() {
87 pages_.clear();
88 values_.clear();
89 def_levels_.clear();
90 rep_levels_.clear();
91 }
92
93 void Execute(int num_pages, int levels_per_page, int batch_size,
94 const ColumnDescriptor* d, Encoding::type encoding) {
95 num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
96 values_, data_buffer_, pages_, encoding);
97 num_levels_ = num_pages * levels_per_page;
98 InitScanner(d);
99 CheckResults(batch_size, d);
100 Clear();
101 }
102
103 void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
104 std::shared_ptr<ColumnDescriptor>& d2,
105 std::shared_ptr<ColumnDescriptor>& d3, int length) {
106 NodePtr type;
107 type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num,
108 LogicalType::NONE, length);
109 d1.reset(new ColumnDescriptor(type, 0, 0));
110 type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num,
111 LogicalType::NONE, length);
112 d2.reset(new ColumnDescriptor(type, 4, 0));
113 type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num,
114 LogicalType::NONE, length);
115 d3.reset(new ColumnDescriptor(type, 4, 2));
116 }
117
118 void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
119 Encoding::type encoding = Encoding::PLAIN) {
120 std::shared_ptr<ColumnDescriptor> d1;
121 std::shared_ptr<ColumnDescriptor> d2;
122 std::shared_ptr<ColumnDescriptor> d3;
123 InitDescriptors(d1, d2, d3, type_length);
124 // evaluate REQUIRED pages
125 Execute(num_pages, num_levels, batch_size, d1.get(), encoding);
126 // evaluate OPTIONAL pages
127 Execute(num_pages, num_levels, batch_size, d2.get(), encoding);
128 // evaluate REPEATED pages
129 Execute(num_pages, num_levels, batch_size, d3.get(), encoding);
130 }
131
132 protected:
133 int num_levels_;
134 int num_values_;
135 vector<shared_ptr<Page>> pages_;
136 std::shared_ptr<Scanner> scanner_;
137 vector<T> values_;
138 vector<int16_t> def_levels_;
139 vector<int16_t> rep_levels_;
140 vector<uint8_t> data_buffer_; // For BA and FLBA
141};
142
// Sizing shared by the scanner tests in this file. These are never modified, so they
// are compile-time constants rather than mutable file-level state.
static constexpr int num_levels_per_page = 100;  // levels generated per data page
static constexpr int num_pages = 20;             // data pages generated per column
static constexpr int batch_size = 32;            // batch size handed to the scanner
146
147typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
148 ByteArrayType>
149 TestTypes;
150
151using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
152using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;
153
154TYPED_TEST_CASE(TestFlatScanner, TestTypes);
155
156TYPED_TEST(TestFlatScanner, TestPlainScanner) {
157 ASSERT_NO_FATAL_FAILURE(
158 this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN));
159}
160
161TYPED_TEST(TestFlatScanner, TestDictScanner) {
162 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0,
163 Encoding::RLE_DICTIONARY));
164}
165
166TEST_F(TestBooleanFlatScanner, TestPlainScanner) {
167 ASSERT_NO_FATAL_FAILURE(
168 this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0));
169}
170
171TEST_F(TestFLBAFlatScanner, TestPlainScanner) {
172 ASSERT_NO_FATAL_FAILURE(
173 this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH));
174}
175
176TEST_F(TestFLBAFlatScanner, TestDictScanner) {
177 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
178 FLBA_LENGTH, Encoding::RLE_DICTIONARY));
179}
180
181TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
182 ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
183 FLBA_LENGTH, Encoding::PLAIN_DICTIONARY));
184}
185
186// PARQUET 502
187TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
188 NodePtr type =
189 schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
190 LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
191 const ColumnDescriptor d(type, 0, 0);
192 num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
193 data_buffer_, pages_);
194 num_levels_ = 1 * 100;
195 InitScanner(&d);
196 ASSERT_NO_FATAL_FAILURE(CheckResults(1, &d));
197}
198
199TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
200 NodePtr type =
201 schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
202 LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
203 const ColumnDescriptor d(type, 4, 0);
204 num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
205 data_buffer_, pages_);
206 num_levels_ = 1 * 100;
207 InitScanner(&d);
208 TypedScanner<FLBAType>* scanner =
209 reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
210 ASSERT_EQ(10, scanner->descr()->type_precision());
211 ASSERT_EQ(2, scanner->descr()->type_scale());
212 ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
213}
214
215TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
216 NodePtr type =
217 schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
218 LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
219 const ColumnDescriptor d(type, 4, 0);
220 num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
221 data_buffer_, pages_);
222 num_levels_ = 1 * 100;
223 InitScanner(&d);
224 TypedScanner<FLBAType>* scanner =
225 reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
226 scanner->SetBatchSize(batch_size);
227 std::stringstream ss_fail;
228 for (int i = 0; i < num_levels_; i++) {
229 std::stringstream ss;
230 scanner->PrintNext(ss, 17);
231 std::string result = ss.str();
232 ASSERT_LE(17, result.size()) << i;
233 }
234 ASSERT_THROW(scanner->PrintNext(ss_fail, 17), ParquetException);
235}
236
237} // namespace test
238} // namespace parquet
239