1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <gtest/gtest.h>
19
20#include "parquet/column_reader.h"
21#include "parquet/column_writer.h"
22#include "parquet/file_reader.h"
23#include "parquet/file_writer.h"
24#include "parquet/test-specialization.h"
25#include "parquet/test-util.h"
26#include "parquet/types.h"
27#include "parquet/util/memory.h"
28
29namespace parquet {
30
31using schema::GroupNode;
32using schema::NodePtr;
33using schema::PrimitiveNode;
34
35namespace test {
36
37template <typename TestType>
38class TestSerialize : public PrimitiveTypedTest<TestType> {
39 public:
40 typedef typename TestType::c_type T;
41
42 void SetUp() {
43 num_columns_ = 4;
44 num_rowgroups_ = 4;
45 rows_per_rowgroup_ = 50;
46 rows_per_batch_ = 10;
47 this->SetUpSchema(Repetition::OPTIONAL, num_columns_);
48 }
49
50 protected:
51 int num_columns_;
52 int num_rowgroups_;
53 int rows_per_rowgroup_;
54 int rows_per_batch_;
55
56 void FileSerializeTest(Compression::type codec_type) {
57 std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
58 auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
59
60 WriterProperties::Builder prop_builder;
61
62 for (int i = 0; i < num_columns_; ++i) {
63 prop_builder.compression(this->schema_.Column(i)->name(), codec_type);
64 }
65 std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
66
67 auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
68 this->GenerateData(rows_per_rowgroup_);
69 for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
70 RowGroupWriter* row_group_writer;
71 row_group_writer = file_writer->AppendRowGroup();
72 for (int col = 0; col < num_columns_; ++col) {
73 auto column_writer =
74 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
75 column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
76 this->values_ptr_);
77 column_writer->Close();
78 // Ensure column() API which is specific to BufferedRowGroup cannot be called
79 ASSERT_THROW(row_group_writer->column(col), ParquetException);
80 }
81
82 row_group_writer->Close();
83 }
84 // Write half BufferedRowGroups
85 for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
86 RowGroupWriter* row_group_writer;
87 row_group_writer = file_writer->AppendBufferedRowGroup();
88 for (int batch = 0; batch < (rows_per_rowgroup_ / rows_per_batch_); ++batch) {
89 for (int col = 0; col < num_columns_; ++col) {
90 auto column_writer =
91 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
92 column_writer->WriteBatch(
93 rows_per_batch_, this->def_levels_.data() + (batch * rows_per_batch_),
94 nullptr, this->values_ptr_ + (batch * rows_per_batch_));
95 // Ensure NextColumn() API which is specific to RowGroup cannot be called
96 ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
97 }
98 }
99 for (int col = 0; col < num_columns_; ++col) {
100 auto column_writer =
101 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
102 column_writer->Close();
103 }
104 row_group_writer->Close();
105 }
106 file_writer->Close();
107
108 auto buffer = sink->GetBuffer();
109 int num_rows_ = num_rowgroups_ * rows_per_rowgroup_;
110
111 auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
112 auto file_reader = ParquetFileReader::Open(source);
113 ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns());
114 ASSERT_EQ(num_rowgroups_, file_reader->metadata()->num_row_groups());
115 ASSERT_EQ(num_rows_, file_reader->metadata()->num_rows());
116
117 for (int rg = 0; rg < num_rowgroups_; ++rg) {
118 auto rg_reader = file_reader->RowGroup(rg);
119 ASSERT_EQ(num_columns_, rg_reader->metadata()->num_columns());
120 ASSERT_EQ(rows_per_rowgroup_, rg_reader->metadata()->num_rows());
121 // Check that the specified compression was actually used.
122 ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
123
124 int64_t values_read;
125
126 for (int i = 0; i < num_columns_; ++i) {
127 ASSERT_FALSE(rg_reader->metadata()->ColumnChunk(i)->has_index_page());
128 std::vector<int16_t> def_levels_out(rows_per_rowgroup_);
129 std::vector<int16_t> rep_levels_out(rows_per_rowgroup_);
130 auto col_reader =
131 std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
132 this->SetupValuesOut(rows_per_rowgroup_);
133 col_reader->ReadBatch(rows_per_rowgroup_, def_levels_out.data(),
134 rep_levels_out.data(), this->values_out_ptr_, &values_read);
135 this->SyncValuesOut();
136 ASSERT_EQ(rows_per_rowgroup_, values_read);
137 ASSERT_EQ(this->values_, this->values_out_);
138 ASSERT_EQ(this->def_levels_, def_levels_out);
139 }
140 }
141 }
142
143 void UnequalNumRows(int64_t max_rows, const std::vector<int64_t> rows_per_column) {
144 std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
145 auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
146
147 std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
148
149 auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
150
151 RowGroupWriter* row_group_writer;
152 row_group_writer = file_writer->AppendRowGroup();
153
154 this->GenerateData(max_rows);
155 for (int col = 0; col < num_columns_; ++col) {
156 auto column_writer =
157 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
158 column_writer->WriteBatch(rows_per_column[col], this->def_levels_.data(), nullptr,
159 this->values_ptr_);
160 column_writer->Close();
161 }
162 row_group_writer->Close();
163 file_writer->Close();
164 }
165
166 void UnequalNumRowsBuffered(int64_t max_rows,
167 const std::vector<int64_t> rows_per_column) {
168 std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
169 auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
170
171 std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
172
173 auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
174
175 RowGroupWriter* row_group_writer;
176 row_group_writer = file_writer->AppendBufferedRowGroup();
177
178 this->GenerateData(max_rows);
179 for (int col = 0; col < num_columns_; ++col) {
180 auto column_writer =
181 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
182 column_writer->WriteBatch(rows_per_column[col], this->def_levels_.data(), nullptr,
183 this->values_ptr_);
184 column_writer->Close();
185 }
186 row_group_writer->Close();
187 file_writer->Close();
188 }
189
190 void RepeatedUnequalRows() {
191 // Optional and repeated, so definition and repetition levels
192 this->SetUpSchema(Repetition::REPEATED);
193
194 const int kNumRows = 100;
195 this->GenerateData(kNumRows);
196
197 std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
198 auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
199 std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
200 auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
201
202 RowGroupWriter* row_group_writer;
203 row_group_writer = file_writer->AppendRowGroup();
204
205 this->GenerateData(kNumRows);
206
207 std::vector<int16_t> definition_levels(kNumRows, 1);
208 std::vector<int16_t> repetition_levels(kNumRows, 0);
209
210 {
211 auto column_writer =
212 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
213 column_writer->WriteBatch(kNumRows, definition_levels.data(),
214 repetition_levels.data(), this->values_ptr_);
215 column_writer->Close();
216 }
217
218 definition_levels[1] = 0;
219 repetition_levels[3] = 1;
220
221 {
222 auto column_writer =
223 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
224 column_writer->WriteBatch(kNumRows, definition_levels.data(),
225 repetition_levels.data(), this->values_ptr_);
226 column_writer->Close();
227 }
228 }
229
230 void ZeroRowsRowGroup() {
231 std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
232 auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
233
234 std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
235
236 auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
237
238 RowGroupWriter* row_group_writer;
239
240 row_group_writer = file_writer->AppendRowGroup();
241 for (int col = 0; col < num_columns_; ++col) {
242 auto column_writer =
243 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
244 column_writer->Close();
245 }
246 row_group_writer->Close();
247
248 row_group_writer = file_writer->AppendBufferedRowGroup();
249 for (int col = 0; col < num_columns_; ++col) {
250 auto column_writer =
251 static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
252 column_writer->Close();
253 }
254 row_group_writer->Close();
255
256 file_writer->Close();
257 }
258};
259
260typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
261 BooleanType, ByteArrayType, FLBAType>
262 TestTypes;
263
264TYPED_TEST_CASE(TestSerialize, TestTypes);
265
266TYPED_TEST(TestSerialize, SmallFileUncompressed) {
267 ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::UNCOMPRESSED));
268}
269
270TYPED_TEST(TestSerialize, TooFewRows) {
271 std::vector<int64_t> num_rows = {100, 100, 100, 99};
272 ASSERT_THROW(this->UnequalNumRows(100, num_rows), ParquetException);
273 ASSERT_THROW(this->UnequalNumRowsBuffered(100, num_rows), ParquetException);
274}
275
276TYPED_TEST(TestSerialize, TooManyRows) {
277 std::vector<int64_t> num_rows = {100, 100, 100, 101};
278 ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
279 ASSERT_THROW(this->UnequalNumRowsBuffered(101, num_rows), ParquetException);
280}
281
282TYPED_TEST(TestSerialize, ZeroRows) { ASSERT_NO_THROW(this->ZeroRowsRowGroup()); }
283
284TYPED_TEST(TestSerialize, RepeatedTooFewRows) {
285 ASSERT_THROW(this->RepeatedUnequalRows(), ParquetException);
286}
287
288TYPED_TEST(TestSerialize, SmallFileSnappy) {
289 ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::SNAPPY));
290}
291
292TYPED_TEST(TestSerialize, SmallFileBrotli) {
293 ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::BROTLI));
294}
295
296TYPED_TEST(TestSerialize, SmallFileGzip) {
297 ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::GZIP));
298}
299
300TYPED_TEST(TestSerialize, SmallFileLz4) {
301 ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::LZ4));
302}
303
#ifdef ARROW_WITH_ZSTD
// Only built when Arrow was configured with ZSTD support.
TYPED_TEST(TestSerialize, SmallFileZstd) {
  // Round trip with every column ZSTD-compressed.
  const auto codec = Compression::ZSTD;
  ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(codec));
}
#endif
309
310} // namespace test
311
312} // namespace parquet
313