1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <gtest/gtest.h> |
19 | |
20 | #include <arrow/test-util.h> |
21 | |
22 | #include "parquet/column_reader.h" |
23 | #include "parquet/column_writer.h" |
24 | #include "parquet/test-specialization.h" |
25 | #include "parquet/test-util.h" |
26 | #include "parquet/thrift.h" |
27 | #include "parquet/types.h" |
28 | #include "parquet/util/comparison.h" |
29 | #include "parquet/util/memory.h" |
30 | |
31 | namespace parquet { |
32 | |
33 | using schema::GroupNode; |
34 | using schema::NodePtr; |
35 | using schema::PrimitiveNode; |
36 | |
37 | namespace test { |
38 | |
// The default size used in most tests.
const int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
// Valgrind builds use much smaller sizes, presumably to keep runtimes manageable.
// Larger size to test some corner cases, only used in some specific cases.
const int LARGE_SIZE = 10000;
// Very large size to test dictionary fallback.
const int VERY_LARGE_SIZE = 40000;
// Reduced dictionary page size to use for testing dictionary fallback with valgrind
const int64_t DICTIONARY_PAGE_SIZE = 1024;
#else
// Larger size to test some corner cases, only used in some specific cases.
const int LARGE_SIZE = 100000;
// Very large size to test dictionary fallback.
const int VERY_LARGE_SIZE = 400000;
// Dictionary page size to use for testing dictionary fallback
const int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024;
#endif
56 | |
57 | template <typename TestType> |
58 | class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> { |
59 | public: |
60 | typedef typename TestType::c_type T; |
61 | |
62 | void SetUp() { |
63 | this->SetupValuesOut(SMALL_SIZE); |
64 | writer_properties_ = default_writer_properties(); |
65 | definition_levels_out_.resize(SMALL_SIZE); |
66 | repetition_levels_out_.resize(SMALL_SIZE); |
67 | |
68 | this->SetUpSchema(Repetition::REQUIRED); |
69 | |
70 | descr_ = this->schema_.Column(0); |
71 | } |
72 | |
73 | Type::type type_num() { return TestType::type_num; } |
74 | |
75 | void BuildReader(int64_t num_rows, |
76 | Compression::type compression = Compression::UNCOMPRESSED) { |
77 | auto buffer = sink_->GetBuffer(); |
78 | std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer)); |
79 | std::unique_ptr<PageReader> page_reader = |
80 | PageReader::Open(std::move(source), num_rows, compression); |
81 | reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader))); |
82 | } |
83 | |
84 | std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter( |
85 | int64_t output_size = SMALL_SIZE, |
86 | const ColumnProperties& column_properties = ColumnProperties(), |
87 | const ParquetVersion::type version = ParquetVersion::PARQUET_1_0) { |
88 | sink_.reset(new InMemoryOutputStream()); |
89 | WriterProperties::Builder wp_builder; |
90 | wp_builder.version(version); |
91 | if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY || |
92 | column_properties.encoding() == Encoding::RLE_DICTIONARY) { |
93 | wp_builder.enable_dictionary(); |
94 | wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE); |
95 | } else { |
96 | wp_builder.disable_dictionary(); |
97 | wp_builder.encoding(column_properties.encoding()); |
98 | } |
99 | wp_builder.max_statistics_size(column_properties.max_statistics_size()); |
100 | writer_properties_ = wp_builder.build(); |
101 | |
102 | metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); |
103 | std::unique_ptr<PageWriter> = |
104 | PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get()); |
105 | std::shared_ptr<ColumnWriter> writer = |
106 | ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); |
107 | return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer); |
108 | } |
109 | |
110 | void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) { |
111 | BuildReader(static_cast<int64_t>(this->values_out_.size()), compression); |
112 | reader_->ReadBatch(static_cast<int>(this->values_out_.size()), |
113 | definition_levels_out_.data(), repetition_levels_out_.data(), |
114 | this->values_out_ptr_, &values_read_); |
115 | this->SyncValuesOut(); |
116 | } |
117 | |
118 | void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED); |
119 | |
120 | void TestRequiredWithEncoding(Encoding::type encoding) { |
121 | return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false); |
122 | } |
123 | |
124 | void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression, |
125 | bool enable_dictionary, bool enable_statistics, |
126 | int64_t num_rows = SMALL_SIZE) { |
127 | this->GenerateData(num_rows); |
128 | |
129 | this->WriteRequiredWithSettings(encoding, compression, enable_dictionary, |
130 | enable_statistics, num_rows); |
131 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows)); |
132 | |
133 | this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary, |
134 | enable_statistics, num_rows); |
135 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows)); |
136 | } |
137 | |
138 | void TestDictionaryFallbackEncoding(ParquetVersion::type version) { |
139 | this->GenerateData(VERY_LARGE_SIZE); |
140 | ColumnProperties column_properties; |
141 | column_properties.set_dictionary_enabled(true); |
142 | |
143 | if (version == ParquetVersion::PARQUET_1_0) { |
144 | column_properties.set_encoding(Encoding::PLAIN_DICTIONARY); |
145 | } else { |
146 | column_properties.set_encoding(Encoding::RLE_DICTIONARY); |
147 | } |
148 | |
149 | auto writer = this->BuildWriter(VERY_LARGE_SIZE, column_properties, version); |
150 | |
151 | writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
152 | writer->Close(); |
153 | |
154 | // Read all rows so we are sure that also the non-dictionary pages are read correctly |
155 | this->SetupValuesOut(VERY_LARGE_SIZE); |
156 | this->ReadColumnFully(); |
157 | ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_); |
158 | this->values_.resize(VERY_LARGE_SIZE); |
159 | ASSERT_EQ(this->values_, this->values_out_); |
160 | std::vector<Encoding::type> encodings = this->metadata_encodings(); |
161 | |
162 | if (this->type_num() == Type::BOOLEAN) { |
163 | // Dictionary encoding is not allowed for boolean type |
164 | // There are 2 encodings (PLAIN, RLE) in a non dictionary encoding case |
165 | std::vector<Encoding::type> expected({Encoding::PLAIN, Encoding::RLE}); |
166 | ASSERT_EQ(encodings, expected); |
167 | } else if (version == ParquetVersion::PARQUET_1_0) { |
168 | // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case |
169 | // for version 1.0 |
170 | std::vector<Encoding::type> expected( |
171 | {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN}); |
172 | ASSERT_EQ(encodings, expected); |
173 | } else { |
174 | // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for |
175 | // version 2.0 |
176 | std::vector<Encoding::type> expected( |
177 | {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN}); |
178 | ASSERT_EQ(encodings, expected); |
179 | } |
180 | } |
181 | |
182 | void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression, |
183 | bool enable_dictionary, bool enable_statistics, |
184 | int64_t num_rows) { |
185 | ColumnProperties column_properties(encoding, compression, enable_dictionary, |
186 | enable_statistics); |
187 | std::shared_ptr<TypedColumnWriter<TestType>> writer = |
188 | this->BuildWriter(num_rows, column_properties); |
189 | writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
190 | // The behaviour should be independent from the number of Close() calls |
191 | writer->Close(); |
192 | writer->Close(); |
193 | } |
194 | |
195 | void WriteRequiredWithSettingsSpaced(Encoding::type encoding, |
196 | Compression::type compression, |
197 | bool enable_dictionary, bool enable_statistics, |
198 | int64_t num_rows) { |
199 | std::vector<uint8_t> valid_bits( |
200 | BitUtil::BytesForBits(static_cast<uint32_t>(this->values_.size())) + 1, 255); |
201 | ColumnProperties column_properties(encoding, compression, enable_dictionary, |
202 | enable_statistics); |
203 | std::shared_ptr<TypedColumnWriter<TestType>> writer = |
204 | this->BuildWriter(num_rows, column_properties); |
205 | writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, |
206 | this->values_ptr_); |
207 | // The behaviour should be independent from the number of Close() calls |
208 | writer->Close(); |
209 | writer->Close(); |
210 | } |
211 | |
212 | void ReadAndCompare(Compression::type compression, int64_t num_rows) { |
213 | this->SetupValuesOut(num_rows); |
214 | this->ReadColumnFully(compression); |
215 | std::shared_ptr<CompareDefault<TestType>> compare; |
216 | compare = std::static_pointer_cast<CompareDefault<TestType>>( |
217 | Comparator::Make(this->descr_)); |
218 | for (size_t i = 0; i < this->values_.size(); i++) { |
219 | if ((*compare)(this->values_[i], this->values_out_[i]) || |
220 | (*compare)(this->values_out_[i], this->values_[i])) { |
221 | std::cout << "Failed at " << i << std::endl; |
222 | } |
223 | ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i])); |
224 | ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i])); |
225 | } |
226 | ASSERT_EQ(this->values_, this->values_out_); |
227 | } |
228 | |
229 | int64_t metadata_num_values() { |
230 | // Metadata accessor must be created lazily. |
231 | // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
232 | // complete (no changes to the metadata buffer can be made after instantiation) |
233 | auto metadata_accessor = |
234 | ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); |
235 | return metadata_accessor->num_values(); |
236 | } |
237 | |
238 | bool metadata_is_stats_set() { |
239 | // Metadata accessor must be created lazily. |
240 | // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
241 | // complete (no changes to the metadata buffer can be made after instantiation) |
242 | ApplicationVersion app_version(this->writer_properties_->created_by()); |
243 | auto metadata_accessor = |
244 | ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version); |
245 | return metadata_accessor->is_stats_set(); |
246 | } |
247 | |
248 | std::vector<Encoding::type> metadata_encodings() { |
249 | // Metadata accessor must be created lazily. |
250 | // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
251 | // complete (no changes to the metadata buffer can be made after instantiation) |
252 | auto metadata_accessor = |
253 | ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); |
254 | return metadata_accessor->encodings(); |
255 | } |
256 | |
257 | protected: |
258 | int64_t values_read_; |
259 | // Keep the reader alive as for ByteArray the lifetime of the ByteArray |
260 | // content is bound to the reader. |
261 | std::unique_ptr<TypedColumnReader<TestType>> reader_; |
262 | |
263 | std::vector<int16_t> definition_levels_out_; |
264 | std::vector<int16_t> repetition_levels_out_; |
265 | |
266 | const ColumnDescriptor* descr_; |
267 | |
268 | private: |
269 | std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; |
270 | std::unique_ptr<InMemoryOutputStream> sink_; |
271 | std::shared_ptr<WriterProperties> writer_properties_; |
272 | std::vector<std::vector<uint8_t>> data_buffer_; |
273 | }; |
274 | |
275 | template <typename TestType> |
276 | void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) { |
277 | int64_t total_values = static_cast<int64_t>(this->values_out_.size()); |
278 | BuildReader(total_values, compression); |
279 | values_read_ = 0; |
280 | while (values_read_ < total_values) { |
281 | int64_t values_read_recently = 0; |
282 | reader_->ReadBatch( |
283 | static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_), |
284 | definition_levels_out_.data() + values_read_, |
285 | repetition_levels_out_.data() + values_read_, |
286 | this->values_out_ptr_ + values_read_, &values_read_recently); |
287 | values_read_ += values_read_recently; |
288 | } |
289 | this->SyncValuesOut(); |
290 | } |
291 | |
292 | template <> |
293 | void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression, |
294 | int64_t num_rows) { |
295 | this->SetupValuesOut(num_rows); |
296 | this->ReadColumnFully(compression); |
297 | std::shared_ptr<CompareDefault<Int96Type>> compare; |
298 | compare = std::make_shared<CompareDefaultInt96>(); |
299 | for (size_t i = 0; i < this->values_.size(); i++) { |
300 | if ((*compare)(this->values_[i], this->values_out_[i]) || |
301 | (*compare)(this->values_out_[i], this->values_[i])) { |
302 | std::cout << "Failed at " << i << std::endl; |
303 | } |
304 | ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i])); |
305 | ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i])); |
306 | } |
307 | ASSERT_EQ(this->values_, this->values_out_); |
308 | } |
309 | |
// FLBA specialization: the FixedLenByteArray values returned by ReadBatch carry
// pointers into reader-owned buffers, which (presumably) may be recycled by a
// later ReadBatch call — so after each batch the bytes are copied into
// data_buffer_ and values_out_ is repointed at the copies before continuing.
template <>
void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
  BuildReader(total_values, compression);
  this->data_buffer_.clear();

  values_read_ = 0;
  while (values_read_ < total_values) {
    int64_t values_read_recently = 0;
    reader_->ReadBatch(
        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
        definition_levels_out_.data() + values_read_,
        repetition_levels_out_.data() + values_read_,
        this->values_out_ptr_ + values_read_, &values_read_recently);

    // Copy contents of the pointers
    std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
    uint8_t* data_ptr = data.data();
    for (int64_t i = 0; i < values_read_recently; i++) {
      memcpy(data_ptr + this->descr_->type_length() * i,
             this->values_out_[i + values_read_].ptr, this->descr_->type_length());
      // Repoint the output value at the stable copy made above.
      this->values_out_[i + values_read_].ptr =
          data_ptr + this->descr_->type_length() * i;
    }
    // Keep the copied bytes alive for the remainder of the test.
    data_buffer_.emplace_back(std::move(data));

    values_read_ += values_read_recently;
  }
  this->SyncValuesOut();
}
340 | |
// All physical Parquet types exercised by the typed writer tests.
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
                         BooleanType, ByteArrayType, FLBAType>
    TestTypes;

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

// Int32 specialization used by the NULL-values tests below.
using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
348 | |
// Round-trip a REQUIRED column with plain encoding.
TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
  this->TestRequiredWithEncoding(Encoding::PLAIN);
}

// Round-trip a REQUIRED column with dictionary encoding.
TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
  this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
}

// NOTE(review): the encodings below are disabled — presumably unsupported by
// the writer for these types; confirm before re-enabling.
/*
TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
  this->TestRequiredWithEncoding(Encoding::RLE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
  this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
}

TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
  this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}

TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
  this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}

TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
  this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}

TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
  this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
}
*/
382 | |
// Round-trip LARGE_SIZE REQUIRED rows through each supported compression
// codec, first without and then with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, true,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
                                 LARGE_SIZE);
}

// The ExternalProject for zstd does not build on CMake < 3.7, so we do not
// require it here
#ifdef ARROW_WITH_ZSTD
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true,
                                 LARGE_SIZE);
}
#endif
441 | |
TYPED_TEST(TestPrimitiveWriter, Optional) {
  // Optional and non-repeated, with definition levels
  // but no repetition levels
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
  // Mark the second entry as null (definition level 0).
  definition_levels[1] = 0;

  auto writer = this->BuildWriter();
  writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
                     this->values_ptr_);
  writer->Close();

  // PARQUET-703: metadata num_values counts all 100 slots, including the null.
  ASSERT_EQ(100, this->metadata_num_values());

  this->ReadColumn();
  // Only the 99 non-null values are materialized on read.
  ASSERT_EQ(99, this->values_read_);
  this->values_out_.resize(99);
  this->values_.resize(99);
  ASSERT_EQ(this->values_, this->values_out_);
}
465 | |
TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
  // Optional and non-repeated, with definition levels
  // but no repetition levels
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
  std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);

  // Null out the last entry and the second entry, in both the definition
  // levels and the validity bitmap (spaced writes consume both).
  definition_levels[SMALL_SIZE - 1] = 0;
  ::arrow::BitUtil::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
  definition_levels[1] = 0;
  ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);

  auto writer = this->BuildWriter();
  writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
                           valid_bits.data(), 0, this->values_ptr_);
  writer->Close();

  // PARQUET-703: metadata num_values counts all 100 slots, including nulls.
  ASSERT_EQ(100, this->metadata_num_values());

  this->ReadColumn();
  // 100 slots minus the 2 nulls leaves 98 materialized values.
  ASSERT_EQ(98, this->values_read_);
  this->values_out_.resize(98);
  // Drop the two null slots (index 1 and the final index) from the expected
  // values before comparing.
  this->values_.resize(99);
  this->values_.erase(this->values_.begin() + 1);
  ASSERT_EQ(this->values_, this->values_out_);
}
495 | |
496 | TYPED_TEST(TestPrimitiveWriter, Repeated) { |
497 | // Optional and repeated, so definition and repetition levels |
498 | this->SetUpSchema(Repetition::REPEATED); |
499 | |
500 | this->GenerateData(SMALL_SIZE); |
501 | std::vector<int16_t> definition_levels(SMALL_SIZE, 1); |
502 | definition_levels[1] = 0; |
503 | std::vector<int16_t> repetition_levels(SMALL_SIZE, 0); |
504 | |
505 | auto writer = this->BuildWriter(); |
506 | writer->WriteBatch(this->values_.size(), definition_levels.data(), |
507 | repetition_levels.data(), this->values_ptr_); |
508 | writer->Close(); |
509 | |
510 | this->ReadColumn(); |
511 | ASSERT_EQ(SMALL_SIZE - 1, this->values_read_); |
512 | this->values_out_.resize(SMALL_SIZE - 1); |
513 | this->values_.resize(SMALL_SIZE - 1); |
514 | ASSERT_EQ(this->values_, this->values_out_); |
515 | } |
516 | |
// Write LARGE_SIZE rows but only read back the first SMALL_SIZE to check
// partial reads of a large chunk.
TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
  this->GenerateData(LARGE_SIZE);

  // Test case 1: required and non-repeated, so no definition or repetition levels
  auto writer = this->BuildWriter(LARGE_SIZE);
  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
  writer->Close();

  // Just read the first SMALL_SIZE rows to ensure we could read it back in
  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE, this->values_read_);
  this->values_.resize(SMALL_SIZE);
  ASSERT_EQ(this->values_, this->values_out_);
}
531 | |
// Test cases for dictionary fallback encoding
TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0);
}

TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
}
540 | |
// PARQUET-719
// Test case for NULL values
TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(LARGE_SIZE);

  // Every definition level is 0, i.e. every entry is null.
  std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
  std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);

  auto writer = this->BuildWriter(LARGE_SIZE);
  // All values being written are NULL; the values pointer may thus be null.
  writer->WriteBatch(this->values_.size(), definition_levels.data(),
                     repetition_levels.data(), nullptr);
  writer->Close();

  // Just read the first SMALL_SIZE rows to ensure we could read it back in
  this->ReadColumn();
  ASSERT_EQ(0, this->values_read_);
}
561 | |
562 | // PARQUET-764 |
563 | // Correct bitpacking for boolean write at non-byte boundaries |
564 | using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>; |
565 | TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) { |
566 | this->SetUpSchema(Repetition::REQUIRED); |
567 | auto writer = this->BuildWriter(); |
568 | for (int i = 0; i < SMALL_SIZE; i++) { |
569 | bool value = (i % 2 == 0) ? true : false; |
570 | writer->WriteBatch(1, nullptr, nullptr, &value); |
571 | } |
572 | writer->Close(); |
573 | this->ReadColumn(); |
574 | for (int i = 0; i < SMALL_SIZE; i++) { |
575 | ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i; |
576 | } |
577 | } |
578 | |
// PARQUET-979
// Prevent writing large stats
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
TEST_F(TestByteArrayValuesWriter, OmitStats) {
  // With values between 4 KiB and 8 KiB, min/max statistics would be very
  // large, so the writer should omit them entirely.
  int min_len = 1024 * 4;
  int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  auto writer = this->BuildWriter();

  values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_FALSE(this->metadata_is_stats_set());
}
595 | |
// Counterpart to OmitStats: raising max_statistics_size to fit the values
// should keep statistics enabled.
TEST_F(TestByteArrayValuesWriter, LimitStats) {
  int min_len = 1024 * 4;
  int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  ColumnProperties column_properties;
  column_properties.set_max_statistics_size(static_cast<size_t>(max_len));
  auto writer = this->BuildWriter(SMALL_SIZE, column_properties);

  values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
611 | |
// Ordinary-sized values should produce statistics under default properties.
TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
  this->SetUpSchema(Repetition::REQUIRED);
  auto writer = this->BuildWriter();
  this->GenerateData(SMALL_SIZE);

  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
622 | |
623 | TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { |
624 | // In ARROW-3930 we discovered a bug when writing from Arrow when we had data |
625 | // that looks like this: |
626 | // |
627 | // [null, [0, 1, null, 2, 3, 4, null]] |
628 | |
629 | // Create schema |
630 | NodePtr item = schema::Int32("item" ); // optional item |
631 | NodePtr list(GroupNode::Make("b" , Repetition::REPEATED, {item}, LogicalType::LIST)); |
632 | NodePtr bag(GroupNode::Make("bag" , Repetition::OPTIONAL, {list})); // optional list |
633 | std::vector<NodePtr> fields = {bag}; |
634 | NodePtr root = GroupNode::Make("schema" , Repetition::REPEATED, fields); |
635 | |
636 | SchemaDescriptor schema; |
637 | schema.Init(root); |
638 | |
639 | InMemoryOutputStream sink; |
640 | auto props = WriterProperties::Builder().build(); |
641 | |
642 | auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); |
643 | std::unique_ptr<PageWriter> = |
644 | PageWriter::Open(&sink, Compression::UNCOMPRESSED, metadata.get()); |
645 | std::shared_ptr<ColumnWriter> writer = |
646 | ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); |
647 | auto typed_writer = std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer); |
648 | |
649 | std::vector<int16_t> def_levels = {1, 3, 3, 2, 3, 3, 3, 2, 3, 3, 3, 2, 3, 3}; |
650 | std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; |
651 | std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; |
652 | |
653 | // Write the values into uninitialized memory |
654 | std::shared_ptr<Buffer> values_buffer; |
655 | ASSERT_OK(::arrow::AllocateBuffer(64, &values_buffer)); |
656 | memcpy(values_buffer->mutable_data(), values.data(), 13 * sizeof(int32_t)); |
657 | auto values_data = reinterpret_cast<const int32_t*>(values_buffer->data()); |
658 | |
659 | std::shared_ptr<Buffer> valid_bits; |
660 | ASSERT_OK(::arrow::BitUtil::BytesToBits({1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1}, |
661 | ::arrow::default_memory_pool(), &valid_bits)); |
662 | |
663 | // valgrind will warn about out of bounds access into def_levels_data |
664 | typed_writer->WriteBatchSpaced(14, def_levels.data(), rep_levels.data(), |
665 | valid_bits->data(), 0, values_data); |
666 | writer->Close(); |
667 | } |
668 | |
// Appends a deterministic pattern of levels to input_levels (note: the vector
// is not cleared first). For each repeat factor r in
// [min_repeat_factor, max_repeat_factor], every level value in
// {0, 1, 3, 7, 15, ...} up to max_level is appended 2^r times.
void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
                    std::vector<int16_t>& input_levels) {
  for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
    // The run length doubles with each repeat factor.
    const int run_length = 1 << repeat;
    // Walk the level values 0, 1, 3, 7, ... until max_level is exceeded.
    int16_t level = 0;
    int bit_width = 0;
    while (level <= max_level) {
      input_levels.insert(input_levels.end(), run_length, level);
      level = static_cast<int16_t>((2 << bit_width) - 1);
      bit_width++;
    }
  }
}
687 | |
// Encode num_levels level values into `bytes` using the given encoding.
// For RLE, the first 4 bytes of the output are set to the encoded byte length,
// matching the length-prefixed layout that LevelDecoder::SetData consumes.
void EncodeLevels(Encoding::type encoding, int16_t max_level, int num_levels,
                  const int16_t* input_levels, std::vector<uint8_t>& bytes) {
  LevelEncoder encoder;
  int levels_count = 0;
  // 2 bytes per level — presumably a sufficient upper bound for the encoded size.
  bytes.resize(2 * num_levels);
  ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
  // encode levels
  if (encoding == Encoding::RLE) {
    // leave space to write the rle length value
    encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
                 static_cast<int>(bytes.size()));

    levels_count = encoder.Encode(num_levels, input_levels);
    // Store the encoded byte length in the reserved 4-byte prefix.
    (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
  } else {
    encoder.Init(encoding, max_level, num_levels, bytes.data(),
                 static_cast<int>(bytes.size()));
    levels_count = encoder.Encode(num_levels, input_levels);
  }
  // Every input level must have been consumed.
  ASSERT_EQ(num_levels, levels_count);
}
709 | |
// Decode `bytes` back into levels and compare against input_levels, splitting
// the decode across multiple Decode() calls on a single SetData() to exercise
// the decoder's internal position tracking.
void VerifyDecodingLevels(Encoding::type encoding, int16_t max_level,
                          std::vector<int16_t>& input_levels,
                          std::vector<uint8_t>& bytes) {
  LevelDecoder decoder;
  int levels_count = 0;
  std::vector<int16_t> output_levels;
  int num_levels = static_cast<int>(input_levels.size());

  output_levels.resize(num_levels);
  ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));

  // Decode levels and test with multiple decode calls
  decoder.SetData(encoding, max_level, num_levels, bytes.data());
  int decode_count = 4;
  int num_inner_levels = num_levels / decode_count;
  // Try multiple decoding on a single SetData call
  for (int ct = 0; ct < decode_count; ct++) {
    int offset = ct * num_inner_levels;
    levels_count = decoder.Decode(num_inner_levels, output_levels.data());
    ASSERT_EQ(num_inner_levels, levels_count);
    for (int i = 0; i < num_inner_levels; i++) {
      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
    }
  }
  // check the remaining levels (num_levels may not divide evenly by 4)
  int num_levels_completed = decode_count * (num_levels / decode_count);
  int num_remaining_levels = num_levels - num_levels_completed;
  if (num_remaining_levels > 0) {
    levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
    ASSERT_EQ(num_remaining_levels, levels_count);
    for (int i = 0; i < num_remaining_levels; i++) {
      EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
    }
  }
  // Test zero Decode values: a fully-drained decoder must return 0.
  ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
}
747 | |
748 | void VerifyDecodingMultipleSetData(Encoding::type encoding, int16_t max_level, |
749 | std::vector<int16_t>& input_levels, |
750 | std::vector<std::vector<uint8_t>>& bytes) { |
751 | LevelDecoder decoder; |
752 | int levels_count = 0; |
753 | std::vector<int16_t> output_levels; |
754 | |
755 | // Decode levels and test with multiple SetData calls |
756 | int setdata_count = static_cast<int>(bytes.size()); |
757 | int num_levels = static_cast<int>(input_levels.size()) / setdata_count; |
758 | output_levels.resize(num_levels); |
759 | // Try multiple SetData |
760 | for (int ct = 0; ct < setdata_count; ct++) { |
761 | int offset = ct * num_levels; |
762 | ASSERT_EQ(num_levels, static_cast<int>(output_levels.size())); |
763 | decoder.SetData(encoding, max_level, num_levels, bytes[ct].data()); |
764 | levels_count = decoder.Decode(num_levels, output_levels.data()); |
765 | ASSERT_EQ(num_levels, levels_count); |
766 | for (int i = 0; i < num_levels; i++) { |
767 | EXPECT_EQ(input_levels[i + offset], output_levels[i]); |
768 | } |
769 | } |
770 | } |
771 | |
772 | // Test levels with maximum bit-width from 1 to 8 |
773 | // increase the repetition count for each iteration by a factor of 2 |
774 | TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) { |
775 | int min_repeat_factor = 0; |
776 | int max_repeat_factor = 7; // 128 |
777 | int max_bit_width = 8; |
778 | std::vector<int16_t> input_levels; |
779 | std::vector<uint8_t> bytes; |
780 | Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; |
781 | |
782 | // for each encoding |
783 | for (int encode = 0; encode < 2; encode++) { |
784 | Encoding::type encoding = encodings[encode]; |
785 | // BIT_PACKED requires a sequence of atleast 8 |
786 | if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3; |
787 | // for each maximum bit-width |
788 | for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) { |
789 | // find the maximum level for the current bit_width |
790 | int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1); |
791 | // Generate levels |
792 | GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels); |
793 | ASSERT_NO_FATAL_FAILURE(EncodeLevels(encoding, max_level, |
794 | static_cast<int>(input_levels.size()), |
795 | input_levels.data(), bytes)); |
796 | ASSERT_NO_FATAL_FAILURE( |
797 | VerifyDecodingLevels(encoding, max_level, input_levels, bytes)); |
798 | input_levels.clear(); |
799 | } |
800 | } |
801 | } |
802 | |
803 | // Test multiple decoder SetData calls |
804 | TEST(TestLevels, TestLevelsDecodeMultipleSetData) { |
805 | int min_repeat_factor = 3; |
806 | int max_repeat_factor = 7; // 128 |
807 | int bit_width = 8; |
808 | int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1); |
809 | std::vector<int16_t> input_levels; |
810 | std::vector<std::vector<uint8_t>> bytes; |
811 | Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; |
812 | GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels); |
813 | int num_levels = static_cast<int>(input_levels.size()); |
814 | int setdata_factor = 8; |
815 | int split_level_size = num_levels / setdata_factor; |
816 | bytes.resize(setdata_factor); |
817 | |
818 | // for each encoding |
819 | for (int encode = 0; encode < 2; encode++) { |
820 | Encoding::type encoding = encodings[encode]; |
821 | for (int rf = 0; rf < setdata_factor; rf++) { |
822 | int offset = rf * split_level_size; |
823 | ASSERT_NO_FATAL_FAILURE(EncodeLevels( |
824 | encoding, max_level, split_level_size, |
825 | reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf])); |
826 | } |
827 | ASSERT_NO_FATAL_FAILURE( |
828 | VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes)); |
829 | } |
830 | } |
831 | |
832 | TEST(TestLevelEncoder, MinimumBufferSize) { |
833 | // PARQUET-676, PARQUET-698 |
834 | const int kNumToEncode = 1024; |
835 | |
836 | std::vector<int16_t> levels; |
837 | for (int i = 0; i < kNumToEncode; ++i) { |
838 | if (i % 9 == 0) { |
839 | levels.push_back(0); |
840 | } else { |
841 | levels.push_back(1); |
842 | } |
843 | } |
844 | |
845 | std::vector<uint8_t> output( |
846 | LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode)); |
847 | |
848 | LevelEncoder encoder; |
849 | encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(), |
850 | static_cast<int>(output.size())); |
851 | int encode_count = encoder.Encode(kNumToEncode, levels.data()); |
852 | |
853 | ASSERT_EQ(kNumToEncode, encode_count); |
854 | } |
855 | |
856 | TEST(TestLevelEncoder, MinimumBufferSize2) { |
857 | // PARQUET-708 |
858 | // Test the worst case for bit_width=2 consisting of |
859 | // LiteralRun(size=8) |
860 | // RepeatedRun(size=8) |
861 | // LiteralRun(size=8) |
862 | // ... |
863 | const int kNumToEncode = 1024; |
864 | |
865 | std::vector<int16_t> levels; |
866 | for (int i = 0; i < kNumToEncode; ++i) { |
867 | // This forces a literal run of 00000001 |
868 | // followed by eight 1s |
869 | if ((i % 16) < 7) { |
870 | levels.push_back(0); |
871 | } else { |
872 | levels.push_back(1); |
873 | } |
874 | } |
875 | |
876 | for (int16_t bit_width = 1; bit_width <= 8; bit_width++) { |
877 | std::vector<uint8_t> output( |
878 | LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode)); |
879 | |
880 | LevelEncoder encoder; |
881 | encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(), |
882 | static_cast<int>(output.size())); |
883 | int encode_count = encoder.Encode(kNumToEncode, levels.data()); |
884 | |
885 | ASSERT_EQ(kNumToEncode, encode_count); |
886 | } |
887 | } |
888 | |
889 | } // namespace test |
890 | } // namespace parquet |
891 | |